package com.oath.cyclops.internal.stream;

import com.oath.cyclops.types.stream.PausableConnectable;
import cyclops.companion.Eithers;
import cyclops.function.FluentFunctions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Stream;

/* loaded from: input_file:META-INF/jars/cyclops-10.4.1.jar:com/oath/cyclops/internal/stream/PausableConnectableImpl.class */
/**
 * {@link PausableConnectable} implementation that pushes the elements of a wrapped
 * {@link Stream} to the registered connections from a task running on a caller-supplied
 * {@link Executor}.
 *
 * <p>Before each element is emitted the producer task joins on the future currently held in
 * the inherited {@code pause} reference, so emission stalls for as long as that future is
 * incomplete. NOTE(review): {@code pause}, {@code open}, {@code connected}, {@code connections}
 * and {@code stream} are inherited state declared outside this file; their exact types and
 * the way {@link #pause()} / {@link #unpause()} manipulate them should be confirmed against
 * the superclasses.
 *
 * @param <T> element type of the wrapped stream
 */
public class PausableConnectableImpl<T> extends BaseConnectableImpl<T> implements PausableConnectable<T> {

    /**
     * Creates a connectable over the given stream.
     *
     * @param stream source of the elements to publish; handed straight to the superclass
     */
    public PausableConnectableImpl(Stream<T> stream) {
        super(stream);
    }

    /**
     * Starts the producer: schedules a task on {@code executor} that walks the wrapped stream
     * and hands every element to each connection.
     *
     * <p>For each element the task
     * <ol>
     *   <li>joins on the current pause future,</li>
     *   <li>takes a snapshot of {@code connected} and visits connections {@code 0..snapshot-1},</li>
     *   <li>uses {@code put} on blocking queues and {@code offer} on all other queues (the
     *       result of {@code offer} is ignored).</li>
     * </ol>
     *
     * <p>{@code open} is set to {@code false} when the task ends, whether the stream was
     * exhausted or the traversal failed with an exception. The future returned by
     * {@link CompletableFuture#runAsync(Runnable, Executor)} is not retained, so a failure is
     * not reported to the caller of this method.
     *
     * @param executor executor on which the producer task runs
     * @return this instance
     */
    @Override
    public PausableConnectable<T> init(Executor executor) {
        CompletableFuture.runAsync(() -> {
            try {
                this.stream.forEach(element -> {
                    // Stall here while the pause future is incomplete.
                    this.pause.get().join();
                    // Snapshot the connection count once so the loop bound is stable for this element.
                    final int subscribers = this.connected;
                    for (int index = 0; index < subscribers; index++) {
                        Eithers.blocking(this.connections.get(index)).fold(FluentFunctions.ofChecked(blockingQueue -> {
                            blockingQueue.put(element);
                            return true;
                        }), queue -> queue.offer(element));
                    }
                });
            } finally {
                // Always clear the flag: without this a failing stream would leave `open` set
                // to true forever. Presumably consumers rely on it to detect completion - confirm
                // against the base class.
                this.open.set(false);
            }
        }, executor);
        return this;
    }

    /**
     * Delegates to the superclass {@code paused(Executor)} and returns this instance, so the
     * call can be chained with the {@link PausableConnectable} type.
     *
     * @param executor executor forwarded to the superclass
     * @return this instance
     */
    @Override
    public PausableConnectable<T> paused(Executor executor) {
        super.paused(executor);
        return this;
    }

    /** Delegates to the superclass {@code unpause()} implementation. */
    @Override
    public void unpause() {
        super.unpause();
    }

    /** Delegates to the superclass {@code pause()} implementation. */
    @Override
    public void pause() {
        super.pause();
    }
}
