package discord4j.rest.request;

import discord4j.common.LogUtil;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.response.ResponseFunction;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import net.dv8tion.jda.api.requests.RestRateLimiter;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:discord4j/rest/request/RequestStream.class */
public class RequestStream {
    private static final Logger log = Loggers.getLogger((Class<?>) RequestStream.class);
    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RequestSubscriber requestSubscriber;
    private final RateLimitRetryOperator rateLimitRetryOperator;
    private final AtomicLong requestsInFlight = new AtomicLong(0);
    private final Sinks.Empty<?> stopCallback = Sinks.empty();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:discord4j/rest/request/RequestStream$RequestSubscriber.class */
    public class RequestSubscriber extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        private volatile Instant resetAt = Instant.EPOCH;
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction;
        private final Runnable processedCallback;

        public Instant getResetAt() {
            return this.resetAt;
        }

        public RequestSubscriber(RateLimitStrategy rateLimitStrategy, Runnable runnable) {
            this.processedCallback = runnable;
            this.responseFunction = clientResponse -> {
                HttpClientResponse httpResponse = clientResponse.getHttpResponse();
                if (RequestStream.log.isDebugEnabled()) {
                    Duration between = Duration.between(Instant.ofEpochMilli(((Long) httpResponse.currentContextView().get(DiscordWebClient.KEY_REQUEST_TIMESTAMP)).longValue()), Instant.now());
                    LogUtil.traceDebug(RequestStream.log, bool -> {
                        return LogUtil.format(httpResponse.currentContextView(), "Read " + httpResponse.status() + " in " + between + (!bool.booleanValue() ? "" : " with headers: " + httpResponse.responseHeaders()));
                    });
                }
                Duration apply = rateLimitStrategy.apply(httpResponse);
                if (!apply.isZero()) {
                    if (RequestStream.log.isDebugEnabled()) {
                        RequestStream.log.debug(LogUtil.format(httpResponse.currentContextView(), "Delaying next request by {}"), apply);
                    }
                    this.resetAt = Instant.now().plus((TemporalAmount) apply);
                }
                boolean parseBoolean = Boolean.parseBoolean(httpResponse.responseHeaders().get(RestRateLimiter.GLOBAL_HEADER));
                Mono<Void> empty = Mono.empty();
                if (parseBoolean) {
                    Duration ofSeconds = Duration.ofSeconds(Long.parseLong(httpResponse.responseHeaders().get("Retry-After")));
                    empty = RequestStream.this.globalRateLimiter.rateLimitFor(ofSeconds).doOnTerminate(() -> {
                        RequestStream.log.debug(LogUtil.format(httpResponse.currentContextView(), "Globally rate limited for {}"), ofSeconds);
                    });
                }
                return httpResponse.status().code() >= 400 ? empty.then(clientResponse.createException().flatMap((v0) -> {
                    return Mono.error(v0);
                })) : empty.thenReturn(clientResponse);
            };
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnSubscribe(Subscription subscription) {
            request(1L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // reactor.core.publisher.BaseSubscriber
        public void hookOnNext(RequestCorrelation<ClientResponse> requestCorrelation) {
            DiscordWebRequest request = requestCorrelation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            Sinks.One<ClientResponse> response = requestCorrelation.getResponse();
            Mono contextWrite = Mono.just(clientRequest).flatMap(clientRequest2 -> {
                return Mono.deferContextual(contextView -> {
                    LogUtil.traceDebug(RequestStream.log, bool -> {
                        return LogUtil.format(contextView, bool.booleanValue() ? clientRequest2.toString() : clientRequest2.getDescription());
                    });
                    return RequestStream.this.globalRateLimiter.withLimiter(RequestStream.this.httpClient.exchange(clientRequest2).flatMap(this.responseFunction)).next();
                });
            }).contextWrite(context -> {
                return context.putAll(requestCorrelation.getContext()).put(LogUtil.KEY_REQUEST_ID, clientRequest.getId()).put(LogUtil.KEY_BUCKET_ID, RequestStream.this.id.toString());
            });
            RateLimitRetryOperator rateLimitRetryOperator = RequestStream.this.rateLimitRetryOperator;
            Objects.requireNonNull(rateLimitRetryOperator);
            contextWrite.retryWhen(Retry.withThrowable(rateLimitRetryOperator::apply)).transform(getResponseTransformers(request)).retryWhen(RequestStream.this.serverErrorRetryFactory()).takeUntilOther(requestCorrelation.onCancel()).doFinally(this::next).checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]").subscribe(clientResponse -> {
                response.emitValue(clientResponse, Sinks.EmitFailureHandler.FAIL_FAST);
            }, th -> {
                RequestStream.log.trace("Error while processing {}: {}", request, th);
                response.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
            }, () -> {
                response.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            });
        }

        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordWebRequest) {
            return (Function) RequestStream.this.responseFunctions.stream().map(responseFunction -> {
                return responseFunction.transform(discordWebRequest).andThen(mono -> {
                    return mono.checkpoint("Apply " + responseFunction + " to " + discordWebRequest.getDescription() + " [RequestStream]");
                });
            }).reduce((v0, v1) -> {
                return v0.andThen(v1);
            }).orElse(mono -> {
                return mono;
            });
        }

        private void next(SignalType signalType) {
            Duration between = Duration.between(Instant.now(), this.resetAt);
            ((between.isNegative() || between.isZero()) ? Mono.just(0L) : Mono.delay(between, RequestStream.this.timedTaskScheduler)).doFinally(signalType2 -> {
                this.processedCallback.run();
            }).subscribe(l -> {
                if (RequestStream.log.isDebugEnabled()) {
                    RequestStream.log.debug("[B:{}] Ready to consume next request after {}", RequestStream.this.id.toString(), signalType);
                }
                request(1L);
            }, th -> {
                RequestStream.log.error("[B:{}] Error while scheduling next request", RequestStream.this.id.toString(), th);
            });
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnComplete() {
            RequestStream.log.debug("[B:{}] RequestStream completed", RequestStream.this.id.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RequestStream(BucketKey bucketKey, RouterOptions routerOptions, DiscordWebClient discordWebClient, RateLimitStrategy rateLimitStrategy) {
        this.id = bucketKey;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = discordWebClient;
        AtomicLong atomicLong = this.requestsInFlight;
        Objects.requireNonNull(atomicLong);
        this.requestSubscriber = new RequestSubscriber(rateLimitStrategy, atomicLong::decrementAndGet);
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Retry serverErrorRetryFactory() {
        return RetryBackoffSpec.backoff(10L, Duration.ofSeconds(2L)).filter(th -> {
            if (!(th instanceof ClientException)) {
                return false;
            }
            int code = ((ClientException) th).getStatus().code();
            return code == 500 || code == 502 || code == 503 || code == 504;
        }).jitter(0.5d).maxBackoff(Duration.ofSeconds(30L)).scheduler(this.timedTaskScheduler).doBeforeRetry(retrySignal -> {
            if (log.isDebugEnabled()) {
                log.debug("Retry {} in bucket {} due to {}", Long.valueOf(retrySignal.totalRetries()), this.id.toString(), retrySignal.failure().toString());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean push(RequestCorrelation<ClientResponse> requestCorrelation) {
        this.requestsInFlight.incrementAndGet();
        boolean push = this.requestQueue.push(requestCorrelation);
        if (!push) {
            this.requestsInFlight.decrementAndGet();
        }
        return push;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.requestQueue.requests().doOnDiscard(RequestCorrelation.class, this::onDiscard).takeUntilOther(this.stopCallback.asMono()).subscribe((CoreSubscriber<? super RequestCorrelation<ClientResponse>>) this.requestSubscriber);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.stopCallback.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Instant getResetAt() {
        return this.requestSubscriber.getResetAt();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long countRequestsInFlight() {
        return this.requestsInFlight.get();
    }

    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        this.requestsInFlight.decrementAndGet();
        requestCorrelation.getResponse().emitError(new DiscardedRequestException(requestCorrelation.getRequest()), Sinks.EmitFailureHandler.FAIL_FAST);
    }
}
