package com.github.mizosoft.methanol.decoder;

import com.github.mizosoft.methanol.BodyDecoder;
import com.github.mizosoft.methanol.decoder.AsyncDecoder;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Prefetcher;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.io.IOException;
import java.lang.System;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/mizosoft/methanol/decoder/AsyncBodyDecoder.class */
public final class AsyncBodyDecoder<T> implements BodyDecoder<T> {
    // Name of the system property that getBufferSize() reads to size sink buffers.
    private static final String BUFFER_SIZE_PROP = "com.github.mizosoft.methanol.decoder.AsyncBodyDecoder.bufferSize";
    // Buffer size used when the property is absent or not positive.
    private static final int DEFAULT_BUFFER_SIZE = 8192;
    // Decoder that turns pushed source bytes into bytes written to the sink.
    private final AsyncDecoder decoder;
    // Subscriber that receives the decoded buffers.
    private final HttpResponse.BodySubscriber<T> downstream;
    // Executor handed to the downstream subscription.
    private final Executor executor;
    // True when the executor was supplied through the public three-argument constructor;
    // controls whether executor() exposes it.
    private final boolean userExecutor;
    // Holder for the upstream subscription (set in onSubscribe, cleared on completion/error).
    private final Upstream upstream;
    // Initialized in onSubscribe and updated after each onNext; presumably requests more
    // items from upstream — TODO confirm against Prefetcher.
    private final Prefetcher prefetcher;
    // Buffers received from upstream and not yet consumed by the decoder.
    private final QueueByteSource source;
    // Buffers the decoder writes into before they are flushed to decodedBuffers.
    private final StackByteSink sink;
    // Batches of decoded buffers waiting to be emitted downstream; terminated by COMPLETE.
    private final ConcurrentLinkedQueue<List<ByteBuffer>> decodedBuffers;
    // Downstream subscription; null until onSubscribe has accepted an upstream subscription.
    private volatile AsyncBodyDecoder<T>.SubscriptionImpl downstreamSubscription;
    // Set once onError or onComplete has been handled; later signals are ignored or logged.
    private boolean completed;
    private static final System.Logger logger = System.getLogger(AsyncBodyDecoder.class.getName());
    // Effective sink buffer size, resolved once at class initialization.
    private static final int BUFFER_SIZE = getBufferSize();
    // Sentinel batch offered to decodedBuffers in onComplete; emit() compares against it by identity.
    private static final List<ByteBuffer> COMPLETE = List.of(ByteBuffer.allocate(0));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/mizosoft/methanol/decoder/AsyncBodyDecoder$QueueByteSource.class */
    /**
     * {@code ByteSource} backed by an ordered list of buffers pushed from upstream.
     *
     * <p>Buffers are consumed front to back; a buffer is dropped from the list once it has no
     * remaining bytes. The class performs no synchronization of its own.
     */
    public static final class QueueByteSource implements AsyncDecoder.ByteSource {
        /** Sentinel returned by {@link #currentSource()} when no readable bytes are queued. */
        private static final ByteBuffer NO_INPUT = ByteBuffer.allocate(0);
        private final List<ByteBuffer> sourceBuffers = new ArrayList<>();
        private boolean completed;

        QueueByteSource() {
        }

        /**
         * Returns the first queued buffer that still has bytes, discarding exhausted buffers
         * at the head of the queue along the way.
         *
         * @return a buffer with remaining bytes, or the empty {@code NO_INPUT} sentinel
         */
        @Override
        public ByteBuffer currentSource() {
            while (!this.sourceBuffers.isEmpty()) {
                ByteBuffer head = this.sourceBuffers.get(0);
                if (head.hasRemaining()) {
                    return head;
                }
                this.sourceBuffers.remove(0);
            }
            return NO_INPUT;
        }

        /**
         * Returns the total number of unread bytes across all queued buffers.
         *
         * @return the sum of {@code remaining()} over every queued buffer
         */
        @Override
        public long remaining() {
            long total = 0;
            // Iterate once over the list; the decompiled form re-created the iterator on every
            // loop test and therefore could never advance.
            for (ByteBuffer buffer : this.sourceBuffers) {
                total += buffer.remaining();
            }
            return total;
        }

        /**
         * Tells whether any queued buffer still has unread bytes.
         *
         * @return {@code true} if {@link #currentSource()} yields something other than the
         *     empty sentinel
         */
        @Override
        public boolean hasRemaining() {
            // Identity comparison is intentional: NO_INPUT is only returned when the queue is drained.
            return currentSource() != NO_INPUT;
        }

        /**
         * Tells whether {@link #onComplete()} has been called, i.e. no more buffers will be pushed.
         *
         * @return {@code true} once the source has been marked complete
         */
        @Override
        public boolean finalSource() {
            return this.completed;
        }

        /** Appends the given buffers to the end of the queue, preserving their order. */
        void push(List<ByteBuffer> list) {
            this.sourceBuffers.addAll(list);
        }

        /** Marks this source as final. */
        void onComplete() {
            this.completed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/mizosoft/methanol/decoder/AsyncBodyDecoder$StackByteSink.class */
    /**
     * {@code ByteSink} that hands the decoder a growing list of fixed-size buffers and lets
     * the filled ones be detached as read-only snapshots.
     */
    public static final class StackByteSink implements AsyncDecoder.ByteSink {
        private final List<ByteBuffer> sinkBuffers = new ArrayList<>();

        StackByteSink() {
        }

        /**
         * Returns the most recently added buffer if it still has room, otherwise allocates a
         * new buffer of {@code BUFFER_SIZE} bytes, appends it and returns it.
         */
        @Override
        public ByteBuffer currentSink() {
            if (!sinkBuffers.isEmpty()) {
                ByteBuffer last = sinkBuffers.get(sinkBuffers.size() - 1);
                if (last.hasRemaining()) {
                    return last;
                }
            }
            ByteBuffer fresh = ByteBuffer.allocate(AsyncBodyDecoder.BUFFER_SIZE);
            sinkBuffers.add(fresh);
            return fresh;
        }

        /**
         * Moves the buffers that are ready into the given queue as a single batch.
         *
         * @param concurrentLinkedQueue queue receiving the batch
         * @param z whether a partially filled last buffer should be included as well
         * @return {@code true} if a non-empty batch was offered, {@code false} otherwise
         */
        boolean flush(ConcurrentLinkedQueue<List<ByteBuffer>> concurrentLinkedQueue, boolean z) {
            List<ByteBuffer> batch = slice(z);
            boolean hasData = !batch.isEmpty();
            if (hasData) {
                concurrentLinkedQueue.offer(batch);
            }
            return hasData;
        }

        /**
         * Detaches the ready buffers from this sink and returns flipped read-only views of them.
         *
         * <p>A last buffer that still has room is held back unless {@code finished} is true
         * and the buffer contains at least one byte.
         */
        private List<ByteBuffer> slice(boolean finished) {
            int total = sinkBuffers.size();
            if (total == 0) {
                return List.of();
            }
            ByteBuffer last = sinkBuffers.get(total - 1);
            boolean holdBackLast = last.hasRemaining() && (!finished || last.position() == 0);
            int readyCount = holdBackLast ? total - 1 : total;

            List<ByteBuffer> ready = sinkBuffers.subList(0, readyCount);
            List<ByteBuffer> views = new ArrayList<>(readyCount);
            for (ByteBuffer written : ready) {
                ByteBuffer view = written.asReadOnlyBuffer();
                view.flip(); // switch the view from writing to reading
                views.add(view);
            }
            ready.clear(); // drops the detached buffers from sinkBuffers
            return List.copyOf(views);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/mizosoft/methanol/decoder/AsyncBodyDecoder$SubscriptionImpl.class */
    /**
     * Subscription given to the downstream subscriber; it drains {@code decodedBuffers} and
     * releases the decoder and upstream when aborted.
     */
    public final class SubscriptionImpl extends AbstractSubscription<List<ByteBuffer>> {
        /** Batch polled from the queue but not yet delivered; {@code null} if none is pending. */
        private List<ByteBuffer> currentBatch;

        SubscriptionImpl() {
            super(AsyncBodyDecoder.this.downstream, AsyncBodyDecoder.this.executor);
        }

        /**
         * Delivers queued batches to the subscriber until demand or batches run out, or the
         * {@code COMPLETE} sentinel is reached.
         *
         * @param subscriber the subscriber receiving the batches
         * @param j the number of items that may be emitted
         * @return the number of batches submitted, or {@code 0} after completion or when
         *     {@code submitOnNext} returns {@code false}
         */
        @Override
        protected long emit(Flow.Subscriber<? super List<ByteBuffer>> subscriber, long j) {
            // A batch is polled even when demand is zero so that the COMPLETE sentinel is
            // detected regardless of demand; an undelivered batch is parked in currentBatch.
            List<ByteBuffer> batch = this.currentBatch;
            this.currentBatch = null;
            if (batch == null) {
                batch = AsyncBodyDecoder.this.decodedBuffers.poll();
            }
            long emitted = 0;
            while (batch != AsyncBodyDecoder.COMPLETE) {
                if (emitted >= j || batch == null) {
                    // Demand or queue exhausted: keep the polled batch (possibly null) for next time.
                    this.currentBatch = batch;
                    return emitted;
                }
                if (!submitOnNext(subscriber, batch)) {
                    // NOTE(review): presumably the subscriber failed or the subscription was
                    // cancelled — confirm against AbstractSubscription.submitOnNext.
                    return 0L;
                }
                emitted++;
                batch = AsyncBodyDecoder.this.decodedBuffers.poll();
            }
            cancelOnComplete(subscriber);
            return 0L;
        }

        /**
         * Releases resources held on behalf of this subscription.
         *
         * @param z if {@code true} the upstream subscription is cancelled, otherwise it is
         *     only cleared
         */
        @Override
        protected void abort(boolean z) {
            if (z) {
                AsyncBodyDecoder.this.upstream.cancel();
            } else {
                AsyncBodyDecoder.this.upstream.clear();
            }
            AsyncBodyDecoder.this.decoder.close();
            AsyncBodyDecoder.this.decodedBuffers.clear();
        }
    }

    /**
     * Creates a decoder that uses {@code FlowSupport.SYNC_EXECUTOR} for its downstream
     * subscription. Because no executor was supplied by the caller, {@link #executor()}
     * returns an empty {@code Optional}.
     *
     * @param asyncDecoder the decoder applied to the upstream bytes; must not be null
     * @param bodySubscriber the subscriber receiving the decoded bytes; must not be null
     */
    public AsyncBodyDecoder(AsyncDecoder asyncDecoder, HttpResponse.BodySubscriber<T> bodySubscriber) {
        this(asyncDecoder, bodySubscriber, FlowSupport.SYNC_EXECUTOR, false);
    }

    /**
     * Creates a decoder whose downstream subscription uses the given executor. The executor
     * is reported back by {@link #executor()}.
     *
     * @param asyncDecoder the decoder applied to the upstream bytes; must not be null
     * @param bodySubscriber the subscriber receiving the decoded bytes; must not be null
     * @param executor the executor handed to the downstream subscription; must not be null
     */
    public AsyncBodyDecoder(AsyncDecoder asyncDecoder, HttpResponse.BodySubscriber<T> bodySubscriber, Executor executor) {
        this(asyncDecoder, bodySubscriber, executor, true);
    }

    /**
     * Shared constructor: validates the collaborators and sets up the internal source, sink
     * and queue.
     *
     * @param asyncDecoder the decoder; must not be null
     * @param bodySubscriber the downstream subscriber; must not be null
     * @param executor the executor for the downstream subscription; must not be null
     * @param z whether {@code executor} was supplied by the caller
     * @throws NullPointerException if any reference argument is null
     */
    private AsyncBodyDecoder(AsyncDecoder asyncDecoder, HttpResponse.BodySubscriber<T> bodySubscriber, Executor executor, boolean z) {
        // Validate in declaration order so the first null argument determines the message.
        Objects.requireNonNull(asyncDecoder, "decoder");
        Objects.requireNonNull(bodySubscriber, "downstream");
        Objects.requireNonNull(executor, "executor");

        decoder = asyncDecoder;
        downstream = bodySubscriber;
        this.executor = executor;
        userExecutor = z;

        upstream = new Upstream();
        prefetcher = new Prefetcher();
        source = new QueueByteSource();
        sink = new StackByteSink();
        decodedBuffers = new ConcurrentLinkedQueue<>();
    }

    /** Returns the {@code AsyncDecoder} this body decoder was created with. */
    public AsyncDecoder asyncDecoder() {
        return decoder;
    }

    /** Returns the encoding reported by the underlying {@code AsyncDecoder}. */
    @Override
    public String encoding() {
        return decoder.encoding();
    }

    /**
     * Returns the executor supplied at construction, or an empty {@code Optional} when this
     * decoder was created without one.
     */
    @Override
    public Optional<Executor> executor() {
        if (!userExecutor) {
            return Optional.empty();
        }
        return Optional.of(executor);
    }

    /** Returns the subscriber that receives the decoded body. */
    @Override
    public HttpResponse.BodySubscriber<T> downstream() {
        return downstream;
    }

    /**
     * Accepts the upstream subscription. If {@code upstream.setOrCancel} accepts it, a
     * downstream subscription is created and signalled, and the prefetcher is initialized.
     *
     * @param subscription the upstream subscription; must not be null
     */
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (!upstream.setOrCancel(subscription)) {
            return;
        }
        SubscriptionImpl created = new SubscriptionImpl();
        downstreamSubscription = created;
        // NOTE(review): signal(true) presumably delivers onSubscribe downstream — confirm
        // against AbstractSubscription.
        created.signal(true);
        prefetcher.initialize(upstream);
    }

    /**
     * Feeds the received buffers to the decoder and forwards whatever output is ready.
     *
     * <p>Ignored after completion. Any failure cancels the upstream subscription and is
     * routed to {@link #onError(Throwable)}.
     *
     * @param list the buffers received from upstream; must not be null
     */
    public void onNext(List<ByteBuffer> list) {
        Objects.requireNonNull(list);
        if (completed) {
            return;
        }
        source.push(list);
        try {
            decoder.decode(source, sink);
            prefetcher.update(upstream);
            SubscriptionImpl subscription = downstreamSubscription;
            // Flush unconditionally; only signal when something was actually queued.
            boolean flushed = sink.flush(decodedBuffers, false);
            if (flushed && subscription != null) {
                subscription.signal(false);
            }
        } catch (Throwable failure) {
            upstream.cancel();
            onError(failure);
        }
    }

    /**
     * Marks this decoder as completed and forwards the error to the downstream subscription,
     * if one exists. An error arriving after completion is only logged.
     *
     * @param th the error; must not be null
     */
    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        if (!completed) {
            completed = true;
            upstream.clear();
            SubscriptionImpl subscription = downstreamSubscription;
            if (subscription != null) {
                subscription.signalError(th);
            }
        } else {
            logger.log(System.Logger.Level.WARNING, "upstream error received after completion", th);
        }
    }

    /**
     * Handles upstream completion: marks the source as final, runs a last decode round,
     * flushes all remaining output (including a partially filled last buffer), enqueues the
     * {@code COMPLETE} sentinel and signals the downstream subscription.
     *
     * <p>The decoder is closed whether or not the final round succeeds. Any failure,
     * including one thrown by {@code close()}, is forwarded through {@code signalError} when a
     * downstream subscription exists. Calls after completion are ignored.
     */
    public void onComplete() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.upstream.clear();
        AsyncBodyDecoder<T>.SubscriptionImpl subscriptionImpl = this.downstreamSubscription;
        Throwable failure = null;
        try {
            this.source.onComplete();
            this.decoder.decode(this.source, this.sink);
            if (this.source.hasRemaining()) {
                throw new IOException("unexhausted bytes after final source: " + this.source.remaining());
            }
            this.sink.flush(this.decodedBuffers, true);
            this.decodedBuffers.offer(COMPLETE);
            if (subscriptionImpl != null) {
                subscriptionImpl.signal(true);
            }
        } catch (Throwable th) {
            failure = th;
        } finally {
            // Close on every path (the equivalent of try-with-resources); previously the
            // decoder was only closed when the final round succeeded.
            try {
                this.decoder.close();
            } catch (Throwable closeFailure) {
                if (failure == null) {
                    failure = closeFailure;
                } else if (failure != closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
        }
        if (failure != null && subscriptionImpl != null) {
            subscriptionImpl.signalError(failure);
        }
    }

    /**
     * Resolves the sink buffer size from the {@code BUFFER_SIZE_PROP} system property.
     *
     * @return the configured size if it is positive, otherwise {@code DEFAULT_BUFFER_SIZE}
     */
    private static int getBufferSize() {
        int configured = Integer.getInteger(BUFFER_SIZE_PROP, DEFAULT_BUFFER_SIZE);
        return configured > 0 ? configured : DEFAULT_BUFFER_SIZE;
    }
}
