/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.decoder;

import com.github.mizosoft.methanol.BodyDecoder;
import com.github.mizosoft.methanol.decoder.AsyncDecoder;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Prefetcher;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class AsyncBodyDecoder<T>
implements BodyDecoder<T> {
    // Used to report an upstream error that arrives after this subscriber has already completed.
    private static final System.Logger logger = System.getLogger(AsyncBodyDecoder.class.getName());
    // Name of the system property read by getBufferSize() to size newly allocated sink buffers.
    private static final String BUFFER_SIZE_PROP = "com.github.mizosoft.methanol.decoder.AsyncBodyDecoder.bufferSize";
    // Default sink buffer capacity in bytes (getBufferSize() falls back to this same value).
    private static final int DEFAULT_BUFFER_SIZE = 8192;
    // Capacity of each buffer allocated by StackByteSink; resolved once at class initialization.
    private static final int BUFFER_SIZE = AsyncBodyDecoder.getBufferSize();
    // Sentinel batch offered to decodedBuffers by onComplete(); SubscriptionImpl.emit() recognizes it
    // by identity (==) and completes downstream instead of forwarding it.
    private static final List<ByteBuffer> COMPLETE = List.of(ByteBuffer.allocate(0));
    // Decoder that transforms bytes from `source` into `sink`; closed on completion or abort.
    private final AsyncDecoder decoder;
    // Subscriber receiving the decoded batches.
    private final HttpResponse.BodySubscriber<T> downstream;
    // Executor handed to SubscriptionImpl (see its constructor).
    private final Executor executor;
    // True only when the executor was supplied by the caller; controls what executor() reports.
    private final boolean userExecutor;
    // Holder for the upstream subscription (set in onSubscribe, cleared/cancelled on termination).
    private final Upstream upstream;
    // Initialized with `upstream` in onSubscribe and updated after every decoded onNext;
    // presumably manages upstream demand — confirm against Prefetcher.
    private final Prefetcher prefetcher;
    // Undecoded input buffers received via onNext.
    private final QueueByteSource source;
    // Decoded output buffers written by the decoder, flushed into decodedBuffers.
    private final StackByteSink sink;
    // Batches of decoded buffers waiting to be emitted downstream by SubscriptionImpl.emit().
    private final ConcurrentLinkedQueue<List<ByteBuffer>> decodedBuffers;
    // Assigned in onSubscribe once the upstream subscription is accepted; null before that.
    private volatile @MonotonicNonNull SubscriptionImpl downstreamSubscription;
    // Set by onError/onComplete; once true, later onNext/onComplete signals are ignored.
    // NOTE(review): not volatile — presumably only touched by serialized upstream signals; confirm.
    private boolean completed;

    /**
     * Creates a decoder that signals {@code downstream} using {@code FlowSupport.SYNC_EXECUTOR}.
     * Since no executor is supplied by the caller, {@link #executor()} returns an empty optional.
     *
     * @param decoder the decoder used to transform the response body
     * @param downstream the subscriber receiving the decoded bytes
     * @throws NullPointerException if any argument is {@code null}
     */
    public AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream) {
        this(decoder, downstream, FlowSupport.SYNC_EXECUTOR, false);
    }

    /**
     * Creates a decoder that signals {@code downstream} using the given executor, which is
     * subsequently reported by {@link #executor()}.
     *
     * @param decoder the decoder used to transform the response body
     * @param downstream the subscriber receiving the decoded bytes
     * @param executor the executor passed to the downstream subscription
     * @throws NullPointerException if any argument is {@code null}
     */
    public AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream, Executor executor) {
        this(decoder, downstream, executor, true);
    }

    /**
     * Shared initializer for the public constructors.
     *
     * @param userExecutor whether {@code executor} was supplied by the caller (and hence should be
     *     exposed through {@link #executor()})
     */
    private AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream, Executor executor, boolean userExecutor) {
        this.decoder = Objects.requireNonNull(decoder, "decoder");
        this.downstream = Objects.requireNonNull(downstream, "downstream");
        this.executor = Objects.requireNonNull(executor, "executor");
        this.userExecutor = userExecutor;
        this.upstream = new Upstream();
        this.prefetcher = new Prefetcher();
        this.source = new QueueByteSource();
        this.sink = new StackByteSink();
        // Diamond instead of a raw type: keeps the assignment type-checked (no unchecked warning).
        this.decodedBuffers = new ConcurrentLinkedQueue<>();
    }

    /** Returns the {@link AsyncDecoder} this subscriber decodes the body with. */
    public AsyncDecoder asyncDecoder() {
        return decoder;
    }

    /** Returns the encoding reported by the underlying {@link AsyncDecoder}. */
    @Override
    public String encoding() {
        final AsyncDecoder delegate = decoder;
        return delegate.encoding();
    }

    /**
     * Returns the executor given at construction, or an empty optional if this decoder was
     * created without a caller-supplied executor.
     */
    @Override
    public Optional<Executor> executor() {
        if (!userExecutor) {
            return Optional.empty();
        }
        return Optional.of(executor);
    }

    /** Returns the subscriber that receives the decoded body. */
    @Override
    public HttpResponse.BodySubscriber<T> downstream() {
        return downstream;
    }

    /**
     * Accepts the upstream subscription. If {@code upstream.setOrCancel} reports success, a new
     * downstream subscription is published, signalled, and the prefetcher is initialized;
     * otherwise nothing else happens here.
     *
     * @throws NullPointerException if {@code upstreamSubscription} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription upstreamSubscription) {
        Objects.requireNonNull(upstreamSubscription);
        if (!upstream.setOrCancel(upstreamSubscription)) {
            // Not accepted (presumably an upstream was already set — see Upstream.setOrCancel).
            return;
        }
        SubscriptionImpl subscription = new SubscriptionImpl();
        downstreamSubscription = subscription;
        subscription.signal(true);
        prefetcher.initialize(upstream);
    }

    /**
     * Feeds the received buffers to the decoder and queues whatever output is ready.
     *
     * <p>Signals after termination are ignored. If decoding throws, upstream is cancelled and the
     * failure is routed through {@link #onError(Throwable)}.
     *
     * @throws NullPointerException if {@code buffers} is {@code null}
     */
    @Override
    public void onNext(List<ByteBuffer> buffers) {
        Objects.requireNonNull(buffers);
        if (completed) {
            return;
        }

        source.push(buffers);
        try {
            decoder.decode(source, sink);
        } catch (Throwable t) {
            upstream.cancel();
            onError(t);
            return;
        }

        prefetcher.update(upstream);
        SubscriptionImpl subscription = downstreamSubscription;
        // Flush unconditionally; only wake the subscription when a new batch was actually queued.
        boolean queuedBatch = sink.flush(decodedBuffers, false);
        if (queuedBatch && subscription != null) {
            subscription.signal(false);
        }
    }

    /**
     * Terminates this subscriber with the given error and forwards it to the downstream
     * subscription, if one exists. An error arriving after termination is only logged.
     *
     * @throws NullPointerException if {@code throwable} is {@code null}
     */
    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        if (!completed) {
            completed = true;
            upstream.clear();
            SubscriptionImpl subscription = downstreamSubscription;
            if (subscription != null) {
                subscription.signalError(throwable);
            }
            return;
        }
        logger.log(System.Logger.Level.WARNING, "upstream error received after completion", throwable);
    }

    /**
     * Handles upstream completion: marks the source as final, runs a last decode round, flushes
     * all remaining output followed by the {@code COMPLETE} sentinel, and signals downstream.
     *
     * <p>The decoder is closed when this method finishes (try-with-resources). Any failure,
     * including input left unconsumed after the final round, is forwarded to the downstream
     * subscription if one exists. Repeated calls after termination are ignored.
     */
    @Override
    public void onComplete() {
        if (completed) {
            return;
        }
        completed = true;
        upstream.clear();
        SubscriptionImpl subscription = downstreamSubscription;
        try (decoder) {
            source.onComplete();
            decoder.decode(source, sink);
            if (source.hasRemaining()) {
                throw new IOException("unexhausted bytes after final source: " + source.remaining());
            }
            sink.flush(decodedBuffers, true);
            decodedBuffers.offer(COMPLETE);
            if (subscription != null) {
                subscription.signal(true);
            }
        } catch (Throwable t) {
            if (subscription != null) {
                subscription.signalError(t);
            }
        }
    }

    /**
     * Resolves the sink buffer size from the {@code BUFFER_SIZE_PROP} system property.
     *
     * @return the configured size, or 8192 (same value as {@code DEFAULT_BUFFER_SIZE}) when the
     *     property is absent, not an integer, or not positive
     */
    private static int getBufferSize() {
        final int configured = Integer.getInteger(BUFFER_SIZE_PROP, 8192);
        return configured > 0 ? configured : 8192;
    }

    /**
     * A {@code ByteSource} backed by a FIFO list of input buffers. Exhausted buffers are dropped
     * lazily from the head when the current source is requested.
     */
    private static final class QueueByteSource
    implements AsyncDecoder.ByteSource {
        /** Shared empty buffer returned when there is no input; compared by identity. */
        private static final ByteBuffer NO_INPUT = ByteBuffer.allocate(0);
        private final List<ByteBuffer> sourceBuffers = new ArrayList<>();
        private boolean completed;

        QueueByteSource() {
        }

        /** Returns the first buffer that still has bytes, discarding drained ones on the way. */
        @Override
        public ByteBuffer currentSource() {
            while (!sourceBuffers.isEmpty()) {
                ByteBuffer head = sourceBuffers.get(0);
                if (!head.hasRemaining()) {
                    sourceBuffers.remove(0);
                    continue;
                }
                return head;
            }
            return NO_INPUT;
        }

        /** Returns the total number of unread bytes across all queued buffers. */
        @Override
        public long remaining() {
            return sourceBuffers.stream().mapToLong(ByteBuffer::remaining).sum();
        }

        /** Returns whether any queued buffer still has unread bytes. */
        @Override
        public boolean hasRemaining() {
            ByteBuffer current = currentSource();
            return current != NO_INPUT;
        }

        /** Returns whether {@link #onComplete()} has been called, i.e. no more input will come. */
        @Override
        public boolean finalSource() {
            return completed;
        }

        /** Appends the given buffers to the end of the queue. */
        void push(List<ByteBuffer> buffers) {
            sourceBuffers.addAll(buffers);
        }

        /** Marks this source as final. */
        void onComplete() {
            completed = true;
        }
    }

    /**
     * A {@code ByteSink} that accumulates decoder output in a growing list of fixed-size buffers.
     * The last buffer in the list is the one currently being written to.
     */
    private static final class StackByteSink
    implements AsyncDecoder.ByteSink {
        private final List<ByteBuffer> sinkBuffers = new ArrayList<>();

        StackByteSink() {
        }

        /**
         * Returns the buffer to write into: the most recently allocated one if it still has
         * space, otherwise a freshly allocated buffer of {@code BUFFER_SIZE} bytes.
         */
        @Override
        public ByteBuffer currentSink() {
            int size = sinkBuffers.size();
            // Typed as ByteBuffer (not Buffer) so it can be returned and stored without casts.
            ByteBuffer last = size > 0 ? sinkBuffers.get(size - 1) : null;
            if (last == null || !last.hasRemaining()) {
                last = ByteBuffer.allocate(BUFFER_SIZE);
                sinkBuffers.add(last);
            }
            return last;
        }

        /**
         * Moves the buffers that are ready into {@code queue} as a single batch.
         *
         * @param queue the queue receiving the batch
         * @param finished whether no more output will be written, so a partially filled last
         *     buffer must be flushed as well
         * @return {@code true} if a non-empty batch was queued
         */
        boolean flush(ConcurrentLinkedQueue<List<ByteBuffer>> queue, boolean finished) {
            List<ByteBuffer> batch = slice(finished);
            if (batch.isEmpty()) {
                return false;
            }
            queue.offer(batch);
            return true;
        }

        /**
         * Detaches the flushable buffers and returns them as read-only, flipped views.
         *
         * <p>The last buffer is held back while it still has free space, unless {@code finished}
         * is set and it contains at least one byte. An untouched last buffer is never emitted.
         */
        private List<ByteBuffer> slice(boolean finished) {
            if (sinkBuffers.isEmpty()) {
                return List.of();
            }
            int size = sinkBuffers.size();
            int snapshotSize = size;
            ByteBuffer last = sinkBuffers.get(size - 1);
            if (last.hasRemaining() && (!finished || last.position() == 0)) {
                --snapshotSize;
            }
            List<ByteBuffer> slice = sinkBuffers.subList(0, snapshotSize);
            List<ByteBuffer> snapshot = slice.stream()
                    .map(ByteBuffer::asReadOnlyBuffer)
                    .collect(Collectors.toUnmodifiableList());
            // Flip the views so consumers read from the start up to what was written.
            snapshot.forEach(ByteBuffer::flip);
            // Clearing the sub-list view removes the emitted buffers from sinkBuffers.
            slice.clear();
            return snapshot;
        }
    }

    /**
     * Downstream subscription that drains {@code decodedBuffers} into the downstream subscriber.
     */
    private final class SubscriptionImpl
    extends AbstractSubscription<List<ByteBuffer>> {
        /** A batch polled from the queue but not yet submitted; {@code null} if none is pending. */
        private @Nullable List<ByteBuffer> currentBatch;

        SubscriptionImpl() {
            super(AsyncBodyDecoder.this.downstream, AsyncBodyDecoder.this.executor);
        }

        /**
         * Submits queued batches to {@code downstream} until {@code emit} batches were submitted
         * or the queue runs dry.
         *
         * <p>A batch is polled before the demand check, so the {@code COMPLETE} sentinel is
         * noticed even when nothing more may be submitted; a polled batch that cannot be
         * submitted yet is parked in {@code currentBatch} for the next call.
         *
         * @return the number of submitted batches, or 0 after completing downstream or when
         *     {@code submitOnNext} reports failure
         */
        @Override
        protected long emit(Flow.Subscriber<? super List<ByteBuffer>> downstream, long emit) {
            final ConcurrentLinkedQueue<List<ByteBuffer>> queue = AsyncBodyDecoder.this.decodedBuffers;
            List<ByteBuffer> batch = currentBatch;
            currentBatch = null;
            if (batch == null) {
                batch = queue.poll();
            }
            long submitted = 0L;
            for (;;) {
                if (batch == COMPLETE) {
                    cancelOnComplete(downstream);
                    return 0L;
                } else if (submitted >= emit || batch == null) {
                    // Out of demand or out of batches; keep any polled batch for later.
                    currentBatch = batch;
                    return submitted;
                } else if (submitOnNext(downstream, batch)) {
                    submitted++;
                    batch = queue.poll();
                } else {
                    return 0L;
                }
            }
        }

        /**
         * Releases resources when the subscription is aborted: cancels upstream if
         * {@code flowInterrupted}, otherwise just clears it; then closes the decoder and drops
         * all queued batches.
         */
        @Override
        protected void abort(boolean flowInterrupted) {
            if (!flowInterrupted) {
                AsyncBodyDecoder.this.upstream.clear();
            } else {
                AsyncBodyDecoder.this.upstream.cancel();
            }
            AsyncBodyDecoder.this.decoder.close();
            AsyncBodyDecoder.this.decodedBuffers.clear();
        }
    }
}

