package org.threadly.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.threadly.concurrent.future.FutureCallback;
import org.threadly.concurrent.future.FutureUtils;
import org.threadly.concurrent.future.ImmediateResultListenableFuture;
import org.threadly.concurrent.future.ListenableFuture;
import org.threadly.concurrent.future.ListenableFutureAdapterTask;
import org.threadly.concurrent.future.ListenableFutureTask;
import org.threadly.concurrent.future.ListenableRunnableFuture;
import org.threadly.concurrent.future.SettableListenableFuture;
import org.threadly.concurrent.future.watchdog.ConstantTimeWatchdog;
import org.threadly.util.ExceptionHandler;
import org.threadly.util.ExceptionUtils;
import org.threadly.util.Pair;

/* loaded from: input_file:org/threadly/concurrent/Poller.class */
public class Poller {
    protected final SubmitterScheduler scheduler;
    private final ConstantTimeWatchdog futureWatchdog;
    private final PollRunner runner;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/threadly/concurrent/Poller$PollRunner.class */
    public static class PollRunner extends ReschedulingOperation {
        private final Collection<Pair<ListenableRunnableFuture<?>, Supplier<Boolean>>> polls;

        public PollRunner(SubmitterScheduler submitterScheduler, long j) {
            super(submitterScheduler, j);
            this.polls = new ConcurrentLinkedQueue();
        }

        public ListenableFuture<?> watch(Supplier<Boolean> supplier) {
            if (supplier.get().booleanValue()) {
                return ImmediateResultListenableFuture.NULL_RESULT;
            }
            ListenableFutureTask listenableFutureTask = new ListenableFutureTask(false, DoNothingRunnable.instance(), null, this.executor);
            this.polls.add(new Pair<>(listenableFutureTask, supplier));
            signalToRun();
            return listenableFutureTask;
        }

        public <T> ListenableFuture<T> watch(Future<? extends T> future) {
            if (!future.isDone()) {
                ListenableFutureAdapterTask listenableFutureAdapterTask = new ListenableFutureAdapterTask(future);
                Collection<Pair<ListenableRunnableFuture<?>, Supplier<Boolean>>> collection = this.polls;
                future.getClass();
                collection.add(new Pair<>(listenableFutureAdapterTask, future::isDone));
                signalToRun();
                return listenableFutureAdapterTask;
            }
            if (future.isCancelled()) {
                SettableListenableFuture settableListenableFuture = new SettableListenableFuture();
                settableListenableFuture.cancel(false);
                return settableListenableFuture;
            }
            try {
                return FutureUtils.immediateResultFuture(future.get());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                return FutureUtils.immediateFailureFuture(e2.getCause());
            }
        }

        @Override // org.threadly.concurrent.ReschedulingOperation
        public void run() {
            Iterator<Pair<ListenableRunnableFuture<?>, Supplier<Boolean>>> it = this.polls.iterator();
            boolean z = false;
            while (it.hasNext()) {
                Pair<ListenableRunnableFuture<?>, Supplier<Boolean>> next = it.next();
                if (next.getLeft().isDone()) {
                    it.remove();
                } else if (next.getRight().get().booleanValue()) {
                    it.remove();
                    next.getLeft().run();
                } else {
                    z = true;
                }
            }
            if (z) {
                signalToRun();
            }
        }
    }

    public Poller(SubmitterScheduler submitterScheduler, long j) {
        this(submitterScheduler, j, -1L);
    }

    public Poller(SubmitterScheduler submitterScheduler, long j, long j2) {
        this.scheduler = submitterScheduler;
        if (j2 <= 0 || j2 == Long.MAX_VALUE) {
            this.futureWatchdog = null;
        } else {
            this.futureWatchdog = new ConstantTimeWatchdog(submitterScheduler, j2, false);
        }
        this.runner = new PollRunner(submitterScheduler, j);
    }

    public ListenableFuture<?> watch(Supplier<Boolean> supplier) {
        ListenableFuture<?> watch = this.runner.watch(supplier);
        if (this.futureWatchdog != null) {
            this.futureWatchdog.watch(watch);
        }
        return watch;
    }

    public <T> ListenableFuture<T> watch(Future<? extends T> future) {
        if ((this.futureWatchdog == null || future.isDone()) && (future instanceof ListenableFuture)) {
            return (ListenableFuture) future;
        }
        ListenableFuture<T> watch = this.runner.watch(future);
        if (this.futureWatchdog != null) {
            this.futureWatchdog.watch(watch);
        }
        return watch;
    }

    public <T> ListenableFuture<?> consumeQueue(Queue<? extends T> queue, Consumer<? super T> consumer) {
        return consumeQueue(queue, consumer, ExceptionUtils::handleException);
    }

    public <T> ListenableFuture<?> consumeQueue(final Queue<? extends T> queue, final Consumer<? super T> consumer, final ExceptionHandler exceptionHandler) {
        final SettableListenableFuture settableListenableFuture = new SettableListenableFuture(false);
        final Supplier<Boolean> supplier = () -> {
            return Boolean.valueOf(!queue.isEmpty() || settableListenableFuture.isDone());
        };
        this.runner.watch(supplier).callback(new FutureCallback<Object>() { // from class: org.threadly.concurrent.Poller.1
            @Override // org.threadly.concurrent.future.FutureCallback
            public void handleResult(Object obj) {
                ListenableFuture<?> watch;
                do {
                    try {
                        if (settableListenableFuture.isDone()) {
                            return;
                        }
                        Object remove = queue.remove();
                        watch = Poller.this.runner.watch(supplier);
                        consumer.accept(remove);
                    } catch (Throwable th) {
                        handleFailure(th);
                        return;
                    }
                } while (watch.isDone());
                watch.callback(this, Poller.this.scheduler, ListenableFuture.ListenerOptimizationStrategy.InvokingThreadIfDone);
            }

            @Override // org.threadly.concurrent.future.FutureCallback
            public void handleFailure(Throwable th) {
                if (exceptionHandler == null) {
                    settableListenableFuture.setFailure(th);
                    return;
                }
                try {
                    exceptionHandler.handleException(th);
                } finally {
                    Poller.this.runner.watch(supplier).callback(this, Poller.this.scheduler, ListenableFuture.ListenerOptimizationStrategy.InvokingThreadIfDone);
                }
            }
        }, this.scheduler);
        return settableListenableFuture;
    }
}
