/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.extensions;

import com.github.mizosoft.methanol.internal.Validate;
import com.github.mizosoft.methanol.internal.extensions.ForwardingBodySubscriber;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class HttpResponsePublisher<T>
implements Flow.Publisher<HttpResponse<T>> {
    private final HttpClient client;
    private final HttpRequest request;
    private final HttpResponse.BodyHandler<T> bodyHandler;
    private final @Nullable Function<HttpRequest, @Nullable HttpResponse.BodyHandler<T>> pushPromiseAcceptor;
    private final Executor executor;

    public HttpResponsePublisher(HttpClient client, HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler, @Nullable Function<HttpRequest, @Nullable HttpResponse.BodyHandler<T>> pushPromiseAcceptor, Executor executor) {
        this.client = client;
        this.request = request;
        this.bodyHandler = bodyHandler;
        this.pushPromiseAcceptor = pushPromiseAcceptor;
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super HttpResponse<T>> subscriber) {
        Objects.requireNonNull(subscriber);
        new SubscriptionImpl(subscriber, this).signal(true);
    }

    private static final class SubscriptionImpl<V>
    extends AbstractSubscription<HttpResponse<V>>
    implements HttpResponse.PushPromiseHandler<V> {
        private static final VarHandle ONGOING;
        private final HttpClient client;
        private final HttpRequest initialRequest;
        private final HttpResponse.BodyHandler<V> handler;
        private final @Nullable Function<HttpRequest, @Nullable HttpResponse.BodyHandler<V>> pushPromiseAcceptor;
        private final ConcurrentLinkedQueue<HttpResponse<V>> receivedResponses;
        private volatile boolean receivedInitialResponseBody;
        private volatile int ongoing;
        private boolean initialRequestSent;

        SubscriptionImpl(Flow.Subscriber<? super HttpResponse<V>> downstream, HttpResponsePublisher<V> parent) {
            super(downstream, parent.executor);
            this.client = parent.client;
            this.initialRequest = parent.request;
            this.handler = parent.bodyHandler;
            this.pushPromiseAcceptor = parent.pushPromiseAcceptor;
            this.receivedResponses = new ConcurrentLinkedQueue();
        }

        @Override
        public void applyPushPromise(HttpRequest initiatingRequest, HttpRequest pushPromiseRequest, Function<HttpResponse.BodyHandler<V>, CompletableFuture<HttpResponse<V>>> completer) {
            if (this.receivedInitialResponseBody) {
                this.signalError(new IllegalStateException("receiving push promise after initial response body has been received: " + pushPromiseRequest));
            } else if (!this.isCancelled() && !this.hasPendingErrors()) {
                this.handlePushPromise(pushPromiseRequest, completer);
            }
        }

        private void handlePushPromise(HttpRequest incomingPushPromise, Function<HttpResponse.BodyHandler<V>, CompletableFuture<HttpResponse<V>>> completer) {
            HttpResponse.BodyHandler<V> pushedResponseHandler;
            Validate.requireState(this.pushPromiseAcceptor != null, "unexpected push promise");
            Function<HttpRequest, @Nullable HttpResponse.BodyHandler<V>> acceptor = Validate.castNonNull(this.pushPromiseAcceptor);
            try {
                pushedResponseHandler = acceptor.apply(incomingPushPromise);
            }
            catch (Throwable t) {
                this.signalError(t);
                return;
            }
            if (pushedResponseHandler != null) {
                ONGOING.getAndAdd(this, 1);
                completer.apply(pushedResponseHandler).whenComplete(this::onCompletion);
            }
        }

        @Override
        protected long emit(Flow.Subscriber<? super HttpResponse<V>> downstream, long emit) {
            if (emit > 0L && !this.initialRequestSent) {
                this.initialRequestSent = true;
                ONGOING.getAndAdd(this, 1);
                try {
                    CompletableFuture<HttpResponse<Object>> responseFuture = this.pushPromiseAcceptor != null ? this.client.sendAsync(this.initialRequest, this::notifyOnBodyCompletion, this) : this.client.sendAsync(this.initialRequest, this.handler);
                    responseFuture.whenComplete(this::onCompletion);
                }
                catch (Throwable t) {
                    this.cancelOnError(downstream, t, true);
                }
                return 0L;
            }
            long submitted = 0L;
            while (true) {
                HttpResponse<V> response;
                if (this.receivedAllResponses()) {
                    this.cancelOnComplete(downstream);
                    return 0L;
                }
                if (submitted >= emit || (response = this.receivedResponses.poll()) == null) {
                    return submitted;
                }
                if (!this.submitOnNext(downstream, response)) break;
                ONGOING.getAndAdd(this, -1);
                ++submitted;
            }
            return 0L;
        }

        private void onCompletion(HttpResponse<V> response, Throwable error) {
            if (this.isCancelled()) {
                return;
            }
            if (error != null) {
                this.signalError(error);
            } else {
                this.receivedResponses.offer(response);
                this.signal(false);
            }
        }

        private void initialResponseBodyReceived() {
            this.receivedInitialResponseBody = true;
            this.signal(true);
        }

        private boolean receivedAllResponses() {
            return this.ongoing == 0 && this.initialRequestSent && (this.pushPromiseAcceptor == null || this.receivedInitialResponseBody);
        }

        private HttpResponse.BodySubscriber<V> notifyOnBodyCompletion(HttpResponse.ResponseInfo info) {
            return new NotifyingBodySubscriber<V>(this.handler.apply(info), this::initialResponseBodyReceived);
        }

        static {
            try {
                MethodHandles.Lookup lookup = MethodHandles.lookup();
                ONGOING = lookup.findVarHandle(SubscriptionImpl.class, "ongoing", Integer.TYPE);
            }
            catch (IllegalAccessException | NoSuchFieldException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    private static final class NotifyingBodySubscriber<R>
    extends ForwardingBodySubscriber<R> {
        private final Runnable onCompletion;

        NotifyingBodySubscriber(HttpResponse.BodySubscriber<R> downstream, Runnable onCompletion) {
            super(downstream);
            this.onCompletion = onCompletion;
        }

        @Override
        public void onComplete() {
            try {
                super.onComplete();
            }
            finally {
                this.onCompletion.run();
            }
        }
    }
}

