package org.threadly.concurrent.processing;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.threadly.util.AbstractService;
import org.threadly.util.ArgumentVerifier;
import org.threadly.util.ExceptionHandler;
import org.threadly.util.ExceptionUtils;

/* loaded from: input_file:org/threadly/concurrent/processing/BlockingQueueConsumer.class */
public abstract class BlockingQueueConsumer<T> extends AbstractService {
    protected final ThreadFactory threadFactory;
    protected final BlockingQueue<? extends T> queue;
    protected volatile Thread runningThread = null;

    /* loaded from: input_file:org/threadly/concurrent/processing/BlockingQueueConsumer$ConsumerRunnable.class */
    protected final class ConsumerRunnable implements Runnable {
        protected ConsumerRunnable() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            while (BlockingQueueConsumer.this.runningThread != null) {
                try {
                    BlockingQueueConsumer.this.accept(BlockingQueueConsumer.this.getNext());
                } catch (InterruptedException e) {
                    BlockingQueueConsumer.this.stopIfRunning();
                } catch (Throwable th) {
                    BlockingQueueConsumer.this.handleException(th);
                }
            }
        }
    }

    public static <T> BlockingQueueConsumer<T> makeForHandlers(ThreadFactory threadFactory, BlockingQueue<? extends T> blockingQueue, final Consumer<T> consumer, ExceptionHandler exceptionHandler) {
        ArgumentVerifier.assertNotNull(consumer, "consumer");
        final ExceptionHandler exceptionHandler2 = exceptionHandler == null ? ExceptionUtils::handleException : exceptionHandler;
        return new BlockingQueueConsumer<T>(threadFactory, blockingQueue) { // from class: org.threadly.concurrent.processing.BlockingQueueConsumer.1
            @Override // org.threadly.concurrent.processing.BlockingQueueConsumer
            protected void accept(T t) throws Exception {
                consumer.accept(t);
            }

            @Override // org.threadly.concurrent.processing.BlockingQueueConsumer
            protected void handleException(Throwable th) {
                exceptionHandler2.handleException(th);
            }
        };
    }

    public BlockingQueueConsumer(ThreadFactory threadFactory, BlockingQueue<? extends T> blockingQueue) {
        ArgumentVerifier.assertNotNull(threadFactory, "threadFactory");
        ArgumentVerifier.assertNotNull(blockingQueue, "queue");
        this.threadFactory = threadFactory;
        this.queue = blockingQueue;
    }

    @Override // org.threadly.util.AbstractService
    protected void startupService() {
        this.runningThread = this.threadFactory.newThread(new ConsumerRunnable());
        if (this.runningThread.isAlive()) {
            throw new IllegalThreadStateException();
        }
        this.runningThread.setDaemon(true);
        this.runningThread.start();
    }

    @Override // org.threadly.util.AbstractService
    protected void shutdownService() {
        Thread thread = this.runningThread;
        this.runningThread = null;
        thread.interrupt();
    }

    protected T getNext() throws InterruptedException {
        return this.queue.take();
    }

    protected abstract void accept(T t) throws Exception;

    protected abstract void handleException(Throwable th);
}
