/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.extensions;

import com.github.mizosoft.methanol.internal.Utils;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Prefetcher;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class ByteChannelSubscriber
implements HttpResponse.BodySubscriber<ReadableByteChannel> {
    private static final ByteBuffer TOMBSTONE = ByteBuffer.allocate(0);
    private static final List<ByteBuffer> TOMBSTONE_LIST = List.of(TOMBSTONE);
    private final Upstream upstream = new Upstream();
    private final Prefetcher prefetcher = new Prefetcher();
    private final BlockingQueue<List<ByteBuffer>> upstreamBuffers = new ArrayBlockingQueue<List<ByteBuffer>>(FlowSupport.prefetch() + 1);
    private volatile @Nullable Throwable pendingError;

    @Override
    public CompletionStage<ReadableByteChannel> getBody() {
        return CompletableFuture.completedFuture(new ChannelView());
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (this.upstream.setOrCancel(subscription)) {
            this.prefetcher.initialize(this.upstream);
        }
    }

    @Override
    public void onNext(List<ByteBuffer> item) {
        Objects.requireNonNull(item);
        if (this.upstreamBuffers.remainingCapacity() > 1) {
            this.upstreamBuffers.offer(item);
        } else {
            this.upstream.cancel();
            this.signalCompletion(new IllegalStateException("missing back-pressure: queue is overflowed"));
        }
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        this.complete(throwable);
    }

    @Override
    public void onComplete() {
        this.complete(null);
    }

    private void complete(@Nullable Throwable error) {
        this.upstream.clear();
        this.signalCompletion(error);
    }

    private void signalCompletion(@Nullable Throwable error) {
        if (error != null) {
            this.pendingError = error;
            this.upstreamBuffers.clear();
        }
        try {
            this.upstreamBuffers.add(TOMBSTONE_LIST);
        }
        catch (IllegalStateException e) {
            throw new AssertionError("no space for TOMBSTONE_LIST", e);
        }
    }

    private final class ChannelView
    extends AbstractInterruptibleChannel
    implements ReadableByteChannel {
        private final List<ByteBuffer> cached = new ArrayList<ByteBuffer>();

        ChannelView() {
        }

        private @Nullable ByteBuffer pollNext() {
            ByteBuffer next;
            while ((next = this.nextCached()) == null) {
                List buffers = (List)ByteChannelSubscriber.this.upstreamBuffers.poll();
                if (buffers == null) {
                    return null;
                }
                this.cached.addAll(buffers);
                ByteChannelSubscriber.this.prefetcher.update(ByteChannelSubscriber.this.upstream);
            }
            return next;
        }

        private @Nullable ByteBuffer takeNext() {
            ByteBuffer next;
            while ((next = this.nextCached()) == null) {
                try {
                    List<ByteBuffer> buffers = ByteChannelSubscriber.this.upstreamBuffers.take();
                    this.cached.addAll(buffers);
                    ByteChannelSubscriber.this.prefetcher.update(ByteChannelSubscriber.this.upstream);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return next;
        }

        private @Nullable ByteBuffer nextCached() {
            while (this.cached.size() > 0) {
                ByteBuffer peek = this.cached.get(0);
                if (peek.hasRemaining() || peek == TOMBSTONE) {
                    return peek;
                }
                this.cached.remove(0);
            }
            return null;
        }

        private void checkOpen() throws IOException {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            }
        }

        private void throwIfPending() throws IOException {
            Throwable error = ByteChannelSubscriber.this.pendingError;
            if (error != null) {
                throw new IOException("upstream error", error);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private int readBytes(ByteBuffer dst) throws IOException {
            int read = 0;
            try {
                this.begin();
                while (dst.hasRemaining() && this.isOpen()) {
                    ByteBuffer next = read > 0 ? this.pollNext() : this.takeNext();
                    this.throwIfPending();
                    if (next == TOMBSTONE) {
                        if (read != 0) break;
                        read = -1;
                        break;
                    }
                    if (next == null) break;
                    read += Utils.copyRemaining(next, dst);
                }
                this.end(read > 0);
            }
            catch (Throwable throwable) {
                this.end(read > 0);
                throw throwable;
            }
            return read;
        }

        @Override
        public synchronized int read(ByteBuffer dst) throws IOException {
            this.checkOpen();
            this.throwIfPending();
            return this.readBytes(dst);
        }

        @Override
        protected void implCloseChannel() {
            ByteChannelSubscriber.this.upstream.cancel();
            ByteChannelSubscriber.this.upstreamBuffers.clear();
            ByteChannelSubscriber.this.signalCompletion(null);
        }
    }
}

