package com.oath.cyclops.internal.stream.spliterators.push;

import com.oath.cyclops.types.reactive.BufferOverflowPolicy;
import cyclops.control.Option;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:META-INF/jars/cyclops-10.4.1.jar:com/oath/cyclops/internal/stream/spliterators/push/BufferingSinkOperator.class */
/**
 * {@link Operator} that receives elements from an externally driven producer, parks them on a
 * {@link Queue}, and drains that queue into the downstream consumer.
 *
 * <p>The producer is handed a {@link Subscriber} through the {@code Consumer} supplied at
 * construction time. When {@link Queue#offer(Object)} rejects an element, the
 * {@link BufferOverflowPolicy} is consulted: if {@code policy.match(element)} yields a value, the
 * calling thread retries the offer (yielding and draining between attempts) until it succeeds;
 * if it yields nothing, the element is not enqueued.
 *
 * <p>Draining is guarded by an {@link AtomicBoolean} so that only one thread at a time polls the
 * queue and invokes the downstream consumer.
 *
 * @param <T> element type
 */
public class BufferingSinkOperator<T> implements Operator<T> {
    /** Buffer between the producer and the downstream consumer. */
    private final Queue<T> q;
    /** Drain guard: only the thread that flips this from {@code false} to {@code true} may poll. */
    private final AtomicBoolean active = new AtomicBoolean(false);
    /** Callback that hands our {@link Subscriber} to the producer. */
    private final Consumer<? super Subscriber<T>> sub;
    /** Consulted when the queue rejects an element. */
    private final BufferOverflowPolicy policy;

    /**
     * @param queue                buffer that emitted elements are offered to
     * @param consumer             receives the {@link Subscriber} the producer should signal
     * @param bufferOverflowPolicy consulted whenever {@code queue.offer} returns {@code false}
     */
    public BufferingSinkOperator(Queue<T> queue, Consumer<? super Subscriber<T>> consumer, BufferOverflowPolicy bufferOverflowPolicy) {
        this.q = queue;
        this.sub = consumer;
        this.policy = bufferOverflowPolicy;
    }

    /**
     * Subscribes with downstream-controlled demand.
     *
     * <p>Each {@code request(n)} on the returned subscription is recorded locally, forwarded to
     * the producer's {@link Subscription}, and followed by a drain attempt. Demand requested
     * before the producer has called {@code onSubscribe} is held back and replayed once the
     * producer's subscription arrives, instead of failing with a {@link NullPointerException}.
     *
     * @param consumer  receives each buffered element
     * @param consumer2 receives any error signalled by the producer
     * @param runnable  run when the producer signals completion
     * @return the subscription through which downstream signals demand
     */
    @Override // com.oath.cyclops.internal.stream.spliterators.push.Operator
    public StreamSubscription subscribe(final Consumer<? super T> consumer, final Consumer<? super Throwable> consumer2, final Runnable runnable) {
        // Single-element holders so the anonymous classes below can share mutable state.
        // The upstream array doubles as the lock protecting both holders.
        final Subscription[] upstream = {null};
        final long[] deferredDemand = {0L};

        final StreamSubscription downstream = new StreamSubscription() {
            @Override // com.oath.cyclops.internal.stream.spliterators.push.StreamSubscription, org.reactivestreams.Subscription
            public void request(long j) {
                super.request(j);
                Subscription target;
                synchronized (upstream) {
                    target = upstream[0];
                    if (target == null && j > 0) {
                        // Producer has not subscribed yet: remember the demand, saturating on overflow.
                        long total = deferredDemand[0] + j;
                        deferredDemand[0] = total < 0 ? Long.MAX_VALUE : total;
                    }
                }
                if (target != null) {
                    // Forward outside the lock: request may synchronously re-enter onNext.
                    target.request(j);
                }
                BufferingSinkOperator.this.processQueue(this, consumer);
            }
        };
        final Runnable drain = () -> processQueue(downstream, consumer);

        this.sub.accept(new Subscriber<T>() {
            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                long replay;
                synchronized (upstream) {
                    upstream[0] = subscription;
                    replay = deferredDemand[0];
                    deferredDemand[0] = 0L;
                }
                if (replay > 0) {
                    // Hand over demand that was requested before the producer subscribed.
                    subscription.request(replay);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(T t) {
                enqueue(t, drain);
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                consumer2.accept(th);
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                runnable.run();
            }
        });
        return downstream;
    }

    /**
     * Offers {@code value} to the queue, applies the overflow policy if the queue rejects it, and
     * finally triggers a drain.
     *
     * @param value element emitted by the producer
     * @param drain drain action to run after enqueuing and between retry attempts
     */
    private void enqueue(T value, Runnable drain) {
        if (!q.offer(value)) {
            // The mapping function only runs when the policy yields a value for this element;
            // its result is irrelevant, map is used purely for the conditional side effect.
            policy.match(value).map(retained -> {
                while (!q.offer(value)) {
                    Thread.yield();
                    // Help empty the queue ourselves so the retry cannot depend solely on
                    // another thread still holding the drain guard.
                    drain.run();
                }
                return retained;
            });
        }
        drain.run();
    }

    /**
     * Drains the queue into {@code consumer} for as long as {@code streamSubscription.isActive()}
     * holds, decrementing the subscription's {@code requested} counter once per delivered element.
     *
     * <p>Returns immediately if another thread currently holds the drain guard. After releasing
     * the guard the queue is re-checked, so elements offered while the guard was being released
     * are picked up by another pass. The guard is released in a {@code finally} block so that an
     * exception thrown by {@code consumer} cannot leave it permanently set.
     *
     * @param streamSubscription downstream subscription whose state gates delivery
     * @param consumer           receives each polled element
     */
    public void processQueue(StreamSubscription streamSubscription, Consumer<? super T> consumer) {
        do {
            if (!this.active.compareAndSet(false, true)) {
                return;
            }
            try {
                T next;
                while (streamSubscription.isActive() && (next = this.q.poll()) != null) {
                    consumer.accept(next);
                    streamSubscription.requested.decrementAndGet();
                }
            } finally {
                this.active.set(false);
            }
        } while (!this.q.isEmpty() && streamSubscription.isActive());
    }

    /**
     * Drains the queue into {@code consumer} until it is empty, with no demand gating.
     *
     * <p>Returns immediately if another thread currently holds the drain guard. The guard is
     * released in a {@code finally} block, and the queue is re-checked afterwards so late
     * arrivals trigger another pass.
     *
     * @param consumer receives each polled element
     */
    public void processQueue(Consumer<? super T> consumer) {
        do {
            if (!this.active.compareAndSet(false, true)) {
                return;
            }
            try {
                T next;
                while ((next = this.q.poll()) != null) {
                    consumer.accept(next);
                }
            } finally {
                this.active.set(false);
            }
        } while (!this.q.isEmpty());
    }

    /**
     * Subscribes with unbounded demand: {@code Long.MAX_VALUE} is requested from the producer as
     * soon as it calls {@code onSubscribe}, and every element is drained without demand gating.
     *
     * @param consumer  receives each buffered element
     * @param consumer2 receives any error signalled by the producer
     * @param runnable  run when the producer signals completion
     */
    @Override // com.oath.cyclops.internal.stream.spliterators.push.Operator
    public void subscribeAll(final Consumer<? super T> consumer, final Consumer<? super Throwable> consumer2, final Runnable runnable) {
        final Runnable drain = () -> processQueue(consumer);

        this.sub.accept(new Subscriber<T>() {
            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(T t) {
                enqueue(t, drain);
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                consumer2.accept(th);
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                runnable.run();
            }
        });
    }
}
