package discord4j.gateway;

import discord4j.common.GitProperties;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.operator.RateLimitOperator;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.common.sinks.EmissionStrategy;
import discord4j.discordjson.json.gateway.Dispatch;
import discord4j.discordjson.json.gateway.Heartbeat;
import discord4j.discordjson.json.gateway.Hello;
import discord4j.discordjson.json.gateway.Identify;
import discord4j.discordjson.json.gateway.ImmutableHeartbeat;
import discord4j.discordjson.json.gateway.ImmutableIdentify;
import discord4j.discordjson.json.gateway.ImmutableIdentifyProperties;
import discord4j.discordjson.json.gateway.ImmutableResume;
import discord4j.discordjson.json.gateway.InvalidSession;
import discord4j.discordjson.json.gateway.Opcode;
import discord4j.discordjson.json.gateway.PayloadData;
import discord4j.discordjson.json.gateway.Ready;
import discord4j.discordjson.json.gateway.Resumed;
import discord4j.discordjson.json.gateway.StatusUpdate;
import discord4j.discordjson.possible.Possible;
import discord4j.gateway.GatewayConnection;
import discord4j.gateway.json.GatewayPayload;
import discord4j.gateway.limiter.PayloadTransformer;
import discord4j.gateway.payload.PayloadReader;
import discord4j.gateway.payload.PayloadWriter;
import discord4j.gateway.retry.GatewayException;
import discord4j.gateway.retry.GatewayRetrySpec;
import discord4j.gateway.retry.GatewayStateChange;
import discord4j.gateway.retry.InvalidSessionException;
import discord4j.gateway.retry.ReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.util.IllegalReferenceCountException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.function.TupleUtils;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

/* loaded from: input_file:META-INF/jars/discord4j-gateway-3.2.2.jar:discord4j/gateway/DefaultGatewayClient.class */
public class DefaultGatewayClient implements GatewayClient {
    // General client log, plus dedicated categories used by logPayload for raw outbound/inbound payloads.
    private static final Logger log = Loggers.getLogger(DefaultGatewayClient.class);
    private static final Logger senderLog = Loggers.getLogger("discord4j.gateway.protocol.sender");
    private static final Logger receiverLog = Loggers.getLogger("discord4j.gateway.protocol.receiver");

    /** System property consulted by outboundLimiterCapacity() to override the outbound limiter capacity. */
    private static final String OUTBOUND_CAPACITY_PROPERTY = "discord4j.gateway.outbound.capacity";

    // Collaborators and settings taken from GatewayOptions in the constructor.
    private final GatewayReactorResources reactorResources;
    private final PayloadReader payloadReader;
    private final PayloadWriter payloadWriter;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;
    private final IdentifyOptions identifyOptions;
    private final String token;
    private final GatewayObserver observer;
    private final PayloadTransformer identifyLimiter;
    private final ResettableInterval heartbeatEmitter;
    private final int maxMissedHeartbeatAck;
    private final boolean unpooled;
    private final EmissionStrategy emissionStrategy;
    private final HttpClient httpClient;

    // Sinks connecting the websocket handler, the payload handlers and the public API.
    private final Sinks.Many<ByteBuf> receiver;                      // raw inbound buffers
    private final Sinks.Many<ByteBuf> sender;                        // raw buffers pushed through sendBuffer
    private final Sinks.Many<Dispatch> dispatch;                     // events exposed by dispatch()
    private final Sinks.Many<GatewayPayload<?>> outbound;            // payloads exposed by sender()
    private final Sinks.Many<GatewayPayload<Heartbeat>> heartbeats;  // heartbeats produced on each tick
    private final Sinks.Many<GatewayConnection.State> state;         // replays the latest connection state

    // Replaced on every connection attempt inside execute(); volatile because they are read from other callbacks.
    private volatile Sinks.One<CloseStatus> disconnectNotifier;
    private volatile GatewayWebsocketHandler sessionHandler;
    private volatile ContextView currentContext;

    /** Opcode to handler lookup, populated only through addHandler so key and value payload types match. */
    private final Map<Opcode<?>, PayloadHandler<?>> handlerMap = new HashMap<>();

    // Session bookkeeping.
    private final AtomicInteger sequence = new AtomicInteger(0);
    private final AtomicReference<String> sessionId = new AtomicReference<>("");

    // Heartbeat bookkeeping; timestamps are System.nanoTime() values, 0 meaning "not yet recorded".
    private final AtomicLong lastSent = new AtomicLong(0);
    private final AtomicLong lastAck = new AtomicLong(0);
    private final AtomicInteger missedAck = new AtomicInteger(0);
    /** Nanoseconds between the last heartbeat sent and its acknowledgement. */
    private volatile long responseTime = 0;

    /**
     * Creates a gateway client configured from the given options.
     *
     * <p>Registers the opcode handlers, prepares the HTTP client and the internal sinks, and picks the
     * initial connection state depending on whether the identify options carry a session to resume.
     *
     * @param gatewayOptions the options to read collaborators and settings from
     * @throws NullPointerException if the token, reactor resources, payload reader/writer, identify
     * options or identify limiter are missing
     */
    public DefaultGatewayClient(GatewayOptions gatewayOptions) {
        token = Objects.requireNonNull(gatewayOptions.getToken());
        reactorResources = Objects.requireNonNull(gatewayOptions.getReactorResources());
        payloadReader = Objects.requireNonNull(gatewayOptions.getPayloadReader());
        payloadWriter = Objects.requireNonNull(gatewayOptions.getPayloadWriter());
        reconnectOptions = gatewayOptions.getReconnectOptions();
        reconnectContext = new ReconnectContext(reconnectOptions.getFirstBackoff(),
                reconnectOptions.getMaxBackoffInterval());
        identifyOptions = Objects.requireNonNull(gatewayOptions.getIdentifyOptions());
        observer = gatewayOptions.getInitialObserver();
        identifyLimiter = Objects.requireNonNull(gatewayOptions.getIdentifyLimiter());
        // A negative setting is treated as zero tolerated misses.
        maxMissedHeartbeatAck = Math.max(0, gatewayOptions.getMaxMissedHeartbeatAck());
        unpooled = gatewayOptions.isUnpooled();
        emissionStrategy = gatewayOptions.getEmissionStrategy();

        // One handler per inbound opcode this client reacts to.
        addHandler(Opcode.DISPATCH, this::handleDispatch);
        addHandler(Opcode.HEARTBEAT, this::handleHeartbeat);
        addHandler(Opcode.RECONNECT, this::handleReconnect);
        addHandler(Opcode.INVALID_SESSION, this::handleInvalidSession);
        addHandler(Opcode.HELLO, this::handleHello);
        addHandler(Opcode.HEARTBEAT_ACK, this::handleHeartbeatAck);

        httpClient = initHttpClient();
        receiver = newEmitterSink();
        sender = newEmitterSink();
        dispatch = newEmitterSink();
        outbound = newEmitterSink();
        heartbeats = newEmitterSink();
        heartbeatEmitter = new ResettableInterval(reactorResources.getTimerTaskScheduler());

        // Seed sequence/session id from a previous session when one was supplied, and start in the matching state.
        SessionInfo resumeSession = identifyOptions.getResumeSession().orElse(null);
        GatewayConnection.State initialState;
        if (resumeSession != null) {
            sequence.set(resumeSession.getSequence());
            sessionId.set(resumeSession.getId());
            initialState = GatewayConnection.State.START_RESUMING;
        } else {
            initialState = GatewayConnection.State.START_IDENTIFYING;
        }
        state = Sinks.many().replay().latestOrDefault(initialState);
    }

    /**
     * Registers the handler to invoke for payloads carrying the given opcode. The shared type parameter
     * keeps the handler's payload type aligned with the opcode's.
     *
     * @param opcode the opcode to react to
     * @param payloadHandler the handler invoked for that opcode
     */
    private <T extends PayloadData> void addHandler(Opcode<T> opcode, PayloadHandler<T> payloadHandler) {
        handlerMap.put(opcode, payloadHandler);
    }

    /**
     * Creates a multicast sink buffering up to {@link Queues#SMALL_BUFFER_SIZE} elements, with
     * auto-cancel disabled.
     *
     * @param <T> the element type
     * @return a new sink
     */
    private static <T> Sinks.Many<T> newEmitterSink() {
        final boolean autoCancel = false;
        return Sinks.many()
                .multicast()
                .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, autoCancel);
    }

    /**
     * Opens a websocket session against the given URL and keeps it running, re-attempting the connection
     * as dictated by {@link #retryFactory()} when the session terminates with an error.
     *
     * <p>Each attempt wires five pipelines that run together: the websocket exchange itself, a watcher
     * for READY/RESUMED dispatches, the inbound payload reader, an outbound watcher and the heartbeat
     * loop. The returned {@code Mono} completes once the attempts have finished and the disconnect
     * notifier has been signalled by {@code handleClose}.
     *
     * @param str the gateway URL to connect to
     * @return a {@code Mono} signalling completion or error of the whole gateway session; only a single
     * subscription is supported, later ones are rejected with an {@link IllegalStateException}
     */
    @Override // discord4j.gateway.GatewayClient
    public Mono<Void> execute(String str) {
        return Mono.deferContextual(context -> {
            // Reset per-attempt state.
            this.currentContext = context;
            this.disconnectNotifier = Sinks.one();
            this.lastAck.set(0L);
            this.lastSent.set(0L);
            this.missedAck.set(0);

            // Completed by the receiver pipeline on the first HEARTBEAT_ACK; IDENTIFY is held back until then.
            Sinks.Empty<Void> ping = Sinks.empty();
            // Completes once the CONNECTED state is observed; regular payloads are held back until then.
            Mono<Void> connected = this.state.asFlux()
                    .filter(current -> current == GatewayConnection.State.CONNECTED)
                    .next()
                    .then();

            // Outbound side: four sources merged into the single flux handed to the websocket handler.
            Flux<ByteBuf> heartbeatFlux = this.heartbeats.asFlux()
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)));
            Flux<ByteBuf> identifyFlux = this.outbound.asFlux()
                    .filter(payload -> Opcode.IDENTIFY.equals(payload.getOp()))
                    .delayUntil(payload -> ping.asMono())
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)))
                    .transform(this.identifyLimiter);
            Flux<ByteBuf> resumeFlux = this.outbound.asFlux()
                    .filter(payload -> Opcode.RESUME.equals(payload.getOp()))
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)));
            Flux<ByteBuf> payloadFlux = this.outbound.asFlux()
                    .filter(DefaultGatewayClient::isNotStartupPayload)
                    .delayUntil(payload -> connected)
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)))
                    .transform(flux -> Flux.merge(flux, this.sender.asFlux()))
                    .transform(new RateLimitOperator<>(outboundLimiterCapacity(), Duration.ofSeconds(60L),
                            this.reactorResources.getTimerTaskScheduler(),
                            this.reactorResources.getPayloadSenderScheduler()));
            Flux<ByteBuf> outFlux = Flux.merge(heartbeatFlux, identifyFlux, resumeFlux, payloadFlux)
                    .doOnNext(buf -> logPayload(senderLog, context, buf))
                    .doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease);

            this.sessionHandler = new GatewayWebsocketHandler(this.receiver, outFlux, context);

            // READY/RESUMED watcher: pairs each such dispatch with the state current at that moment to
            // tell a first connection from a successful retry, then moves to CONNECTED.
            Mono<Void> readyHandler = this.dispatch.asFlux()
                    .filter(DefaultGatewayClient::isReadyOrResumed)
                    .zipWith(this.state.asFlux().next().repeat())
                    .doOnNext(TupleUtils.consumer((event, currentState) -> {
                        ConnectionObserver.State observerState;
                        if (currentState == GatewayConnection.State.START_IDENTIFYING
                                || currentState == GatewayConnection.State.START_RESUMING) {
                            log.info(LogUtil.format(context, "Connected to Gateway"));
                            this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.connected());
                            observerState = GatewayObserver.CONNECTED;
                        } else {
                            log.info(LogUtil.format(context, "Reconnected to Gateway"));
                            this.emissionStrategy.emitNext(this.dispatch,
                                    GatewayStateChange.retrySucceeded(this.reconnectContext.getAttempts()));
                            observerState = GatewayObserver.RETRY_SUCCEEDED;
                        }
                        this.reconnectContext.reset();
                        this.state.emitNext(GatewayConnection.State.CONNECTED, Sinks.EmitFailureHandler.FAIL_FAST);
                        notifyObserver(observerState);
                    }))
                    .then();

            // Inbound side: decode buffers, track the sequence and dispatch to the opcode handlers.
            Mono<Void> receiverFuture = this.receiver.asFlux()
                    .map(buf -> this.unpooled ? buf : buf.retain())
                    .doOnNext(buf -> logPayload(receiverLog, context, buf))
                    .flatMap(this.payloadReader::read)
                    .doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease)
                    .doOnNext(payload -> {
                        if (Opcode.HEARTBEAT_ACK.equals(payload.getOp())) {
                            ping.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
                        }
                    })
                    .map(this::updateSequence)
                    .flatMap(this::handlePayload)
                    .then();

            // Outbound watcher: closes the session when the outbound sink completes and forces a
            // reconnect when a RECONNECT payload is pushed through it.
            Mono<Void> senderFuture = this.outbound.asFlux()
                    .doOnComplete(this.sessionHandler::close)
                    .doOnNext(payload -> {
                        if (Opcode.RECONNECT.equals(payload.getOp())) {
                            this.sessionHandler.error(
                                    new GatewayException(context, "Reconnecting due to user action"));
                        }
                    })
                    .then();

            // Heartbeat loop: on every tick either send a heartbeat or, when too many ACKs are
            // missing, fail the session so it gets retried.
            Mono<Void> heartbeatHandler = this.heartbeatEmitter.ticks()
                    .flatMap(tick -> {
                        long now = System.nanoTime();
                        this.lastAck.compareAndSet(0L, now);
                        long sinceLastAck = now - this.lastAck.get();
                        // Only count a miss when a heartbeat was sent after the last ACK was received.
                        if (this.lastSent.get() - this.lastAck.get() > 0
                                && this.missedAck.incrementAndGet() > this.maxMissedHeartbeatAck) {
                            log.warn(LogUtil.format(context, "Missing heartbeat ACK for {} (tick: {}, seq: {})"),
                                    Duration.ofNanos(sinceLastAck), tick, this.sequence.get());
                            this.sessionHandler.error(new GatewayException(context,
                                    "Reconnecting due to zombie or failed connection"));
                            return Mono.empty();
                        }
                        log.debug(LogUtil.format(context, "Sending heartbeat {} after last ACK"),
                                Duration.ofNanos(sinceLastAck));
                        this.lastSent.set(now);
                        return Mono.just(GatewayPayload.heartbeat(ImmutableHeartbeat.of(this.sequence.get())));
                    })
                    .doOnNext(payload -> this.emissionStrategy.emitNext(this.heartbeats, payload))
                    .then();

            // The websocket exchange; its close information is turned into completion or error by handleClose.
            Mono<Void> httpFuture = this.httpClient
                    .websocket(WebsocketClientSpec.builder()
                            .maxFramePayloadLength(Integer.MAX_VALUE)
                            .build())
                    .uri(str)
                    .handle(this.sessionHandler::handle)
                    .contextWrite(LogUtil.clearContext())
                    .flatMap(closeInfo -> handleClose(closeInfo.getT1(), closeInfo.getT2()))
                    .then();

            return Mono.zip(httpFuture, readyHandler, receiverFuture, senderFuture, heartbeatHandler)
                    .doOnError(error -> {
                        if (error instanceof ReconnectException) {
                            log.info(LogUtil.format(context, "{}"), error.getMessage());
                        } else if (error instanceof CloseException || error instanceof GatewayException) {
                            log.warn(LogUtil.format(context, "{}"), error.toString());
                        } else {
                            log.error(LogUtil.format(context, "Gateway client error"), error);
                        }
                    })
                    .doOnTerminate(this.heartbeatEmitter::stop)
                    .doOnCancel(() -> this.sessionHandler.close())
                    .then();
        }).contextWrite(ctx -> ctx.put(LogUtil.KEY_SHARD_ID, this.identifyOptions.getShardInfo().getIndex()))
                .retryWhen(retryFactory())
                .then(Mono.defer(() -> this.disconnectNotifier.asMono().then()))
                .doOnSubscribe(subscription -> {
                    // The notifier is only assigned by a previous subscription's connection attempt.
                    if (this.disconnectNotifier != null) {
                        throw new IllegalStateException("execute can only be subscribed once");
                    }
                });
    }

    /**
     * Derives the HTTP client used for the websocket connection: the configured client with a
     * User-Agent header added and, unless the observer is the no-op listener, connection state
     * changes forwarded to {@link #notifyObserver}.
     *
     * @return the customized HTTP client
     */
    private HttpClient initHttpClient() {
        HttpClient withUserAgent = reactorResources.getHttpClient()
                .headers(headers -> headers.add(HttpHeaderNames.USER_AGENT, initUserAgent()));
        if (observer == GatewayObserver.NOOP_LISTENER) {
            return withUserAgent;
        }
        return withUserAgent.observe((connection, newState) -> notifyObserver(newState));
    }

    /**
     * Builds the User-Agent value from the application URL and version found in {@link GitProperties},
     * falling back to fixed defaults when a property is absent.
     *
     * @return a string of the form {@code DiscordBot(url, version)}
     */
    private String initUserAgent() {
        Properties gitProperties = GitProperties.getProperties();
        String url = gitProperties.getProperty(GitProperties.APPLICATION_URL, "https://discord4j.com");
        String version = gitProperties.getProperty(GitProperties.APPLICATION_VERSION, "3");
        return "DiscordBot(" + url + ", " + version + ")";
    }

    /**
     * Forwards a state change to the configured observer, passing this client as the source.
     *
     * @param state the state to report
     */
    private void notifyObserver(ConnectionObserver.State state) {
        observer.onStateChange(state, this);
    }

    /**
     * Matches a {@code "token": "..."} JSON entry; group 2 is the token value that gets masked.
     * Compiled once instead of on every traced payload.
     */
    private static final Pattern TOKEN_PATTERN = Pattern.compile("(\"token\": ?\")([A-Za-z0-9._-]*)(\")");

    /**
     * Writes a payload to the given logger at trace level, decoding it as UTF-8 and masking any token
     * value so the credential never reaches the log. Does nothing when trace logging is disabled.
     *
     * @param logger the logger to write to
     * @param contextView the context used to format the log message
     * @param byteBuf the payload to log; its reader index is not modified
     */
    private void logPayload(Logger logger, ContextView contextView, ByteBuf byteBuf) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        String text = byteBuf.toString(StandardCharsets.UTF_8);
        String redacted = TOKEN_PATTERN.matcher(text).replaceAll("$1hunter2$3");
        logger.trace(LogUtil.format(contextView, redacted));
    }

    /**
     * Tells whether a payload is a regular one, i.e. neither IDENTIFY nor RESUME.
     *
     * @param gatewayPayload the payload to inspect
     * @return {@code true} unless the payload's opcode is IDENTIFY or RESUME
     */
    private static boolean isNotStartupPayload(GatewayPayload<?> gatewayPayload) {
        return !Opcode.IDENTIFY.equals(gatewayPayload.getOp())
                && !Opcode.RESUME.equals(gatewayPayload.getOp());
    }

    /**
     * Tells whether a dispatch is a {@link Ready} or a {@link Resumed} event.
     *
     * @param dispatch the event to inspect
     * @return {@code true} if the event's class is assignable to either type
     */
    private static boolean isReadyOrResumed(Dispatch dispatch) {
        Class<?> eventType = dispatch.getClass();
        if (Ready.class.isAssignableFrom(eventType)) {
            return true;
        }
        return Resumed.class.isAssignableFrom(eventType);
    }

    /**
     * Records the sequence number carried by a payload, if any, and notifies the observer about it.
     *
     * @param gatewayPayload the inbound payload
     * @return the same payload, for use in a mapping step
     */
    private GatewayPayload<?> updateSequence(GatewayPayload<?> gatewayPayload) {
        Integer received = gatewayPayload.getSequence();
        if (received == null) {
            return gatewayPayload;
        }
        sequence.set(received.intValue());
        notifyObserver(GatewayObserver.SEQUENCE);
        return gatewayPayload;
    }

    /**
     * Routes an inbound payload to the handler registered for its opcode.
     *
     * @param gatewayPayload the payload to handle
     * @return the handler's result, tagged with a checkpoint describing the payload, or an empty
     * {@code Mono} (after logging a warning) when no handler is registered for the opcode
     */
    private <T extends PayloadData> Mono<Void> handlePayload(GatewayPayload<T> gatewayPayload) {
        // Safe: handlers are only registered through addHandler(Opcode<T>, PayloadHandler<T>), so the
        // handler stored under an opcode accepts that opcode's payload type.
        @SuppressWarnings("unchecked")
        PayloadHandler<T> payloadHandler = (PayloadHandler<T>) this.handlerMap.get(gatewayPayload.getOp());
        if (payloadHandler == null) {
            log.warn(LogUtil.format(this.currentContext, "Handler not found from: {}"), gatewayPayload);
            return Mono.empty();
        }
        return Mono.defer(() -> payloadHandler.handle(gatewayPayload))
                .checkpoint("Dispatch handled for OP " + gatewayPayload.getOp().getRawOp()
                        + " seq " + gatewayPayload.getSequence()
                        + " type " + gatewayPayload.getType());
    }

    /**
     * Handles a DISPATCH payload: remembers the session id announced by a {@link Ready} event and
     * publishes the event, when present, to the dispatch sink.
     *
     * @param gatewayPayload the dispatch payload
     * @return an empty {@code Mono}
     */
    private Mono<Void> handleDispatch(GatewayPayload<Dispatch> gatewayPayload) {
        Dispatch event = gatewayPayload.getData();
        if (event instanceof Ready) {
            sessionId.set(((Ready) event).sessionId());
        }
        if (event != null) {
            emissionStrategy.emitNext(dispatch, event);
        }
        return Mono.empty();
    }

    /**
     * Handles a HEARTBEAT request by queueing a heartbeat with the current sequence on the outbound sink.
     *
     * @param gatewayPayload the received heartbeat payload (its content is not used)
     * @return an empty {@code Mono}
     */
    private Mono<Void> handleHeartbeat(GatewayPayload<Heartbeat> gatewayPayload) {
        log.debug(LogUtil.format(currentContext, "Received heartbeat"));
        GatewayPayload<Heartbeat> reply = GatewayPayload.heartbeat(ImmutableHeartbeat.of(sequence.get()));
        emissionStrategy.emitNext(outbound, reply);
        return Mono.empty();
    }

    /**
     * Handles a RECONNECT payload by failing the current session with a {@link ReconnectException}.
     *
     * @param gatewayPayload the received payload (its content is not used)
     * @return an empty {@code Mono}
     */
    private Mono<Void> handleReconnect(GatewayPayload<?> gatewayPayload) {
        ReconnectException cause =
                new ReconnectException(currentContext, "Reconnecting due to reconnect packet received");
        sessionHandler.error(cause);
        return Mono.empty();
    }

    /**
     * Handles an INVALID_SESSION payload: a non-resumable one fails the current session with an
     * {@link InvalidSessionException}, a resumable one queues a RESUME for the stored session.
     *
     * @param gatewayPayload the received payload
     * @return an empty {@code Mono}
     */
    private Mono<Void> handleInvalidSession(GatewayPayload<InvalidSession> gatewayPayload) {
        if (!gatewayPayload.getData().resumable()) {
            sessionHandler.error(new InvalidSessionException(currentContext,
                    "Reconnecting due to non-resumable session invalidation"));
            return Mono.empty();
        }
        ImmutableResume resume = ImmutableResume.of(token, sessionId.get(), sequence.get());
        emissionStrategy.emitNext(outbound, GatewayPayload.resume(resume));
        return Mono.empty();
    }

    /**
     * Handles a HELLO payload: starts the heartbeat timer with the announced interval (first tick
     * immediately) and then resumes or identifies depending on the current connection state.
     *
     * @param gatewayPayload the received HELLO payload
     * @return a {@code Mono} completing once the RESUME or IDENTIFY payload has been queued
     */
    private Mono<Void> handleHello(GatewayPayload<Hello> gatewayPayload) {
        Duration interval = Duration.ofMillis(gatewayPayload.getData().heartbeatInterval());
        heartbeatEmitter.start(Duration.ZERO, interval);
        return this.state.asFlux()
                .next()
                .doOnNext(current -> {
                    boolean resuming = current == GatewayConnection.State.START_RESUMING
                            || current == GatewayConnection.State.RESUMING;
                    if (resuming) {
                        doResume(gatewayPayload);
                    } else {
                        doIdentify(gatewayPayload);
                    }
                })
                .then();
    }

    /**
     * Queues a RESUME payload built from the token, stored session id and current sequence.
     *
     * @param gatewayPayload the HELLO payload that triggered the resume (its content is not used)
     */
    private void doResume(GatewayPayload<Hello> gatewayPayload) {
        int currentSequence = sequence.get();
        log.debug(LogUtil.format(currentContext, "Resuming Gateway session from {}"), currentSequence);
        ImmutableResume resume = ImmutableResume.of(token, sessionId.get(), sequence.get());
        emissionStrategy.emitNext(outbound, GatewayPayload.resume(resume));
    }

    /**
     * Queues an IDENTIFY payload built from the token and the identify options: intents and initial
     * presence when configured, large threshold, shard information and client properties. Compression
     * is always requested as disabled.
     *
     * @param gatewayPayload the HELLO payload that triggered the identify (its content is not used)
     */
    private void doIdentify(GatewayPayload<Hello> gatewayPayload) {
        Possible<Long> intents = this.identifyOptions.getIntents()
                .map(intentSet -> Possible.of(intentSet.getRawValue()))
                .orElse(Possible.absent());
        Possible<StatusUpdate> presence = this.identifyOptions.getInitialStatus()
                .map(Possible::of)
                .orElse(Possible.absent());
        ImmutableIdentify identify = Identify.builder()
                .token(this.token)
                .intents(intents)
                .properties(ImmutableIdentifyProperties.of(System.getProperty("os.name"), "Discord4J", "Discord4J"))
                .compress(Boolean.FALSE)
                .largeThreshold(this.identifyOptions.getLargeThreshold())
                .shard(this.identifyOptions.getShardInfo().asArray())
                .presence(presence)
                .build();
        // The message has no placeholder, so no argument is passed to the logger.
        log.debug(LogUtil.format(this.currentContext, "Identifying to Gateway"));
        this.emissionStrategy.emitNext(this.outbound, GatewayPayload.identify(identify));
    }

    /**
     * Handles a HEARTBEAT_ACK: stamps the acknowledgement time, derives the response time from the
     * last heartbeat sent and clears the missed-ACK counter.
     *
     * @param gatewayPayload the received payload (its content is not used)
     * @return an empty {@code Mono}
     */
    private Mono<Void> handleHeartbeatAck(GatewayPayload<?> gatewayPayload) {
        long ackTime = lastAck.updateAndGet(previous -> System.nanoTime());
        responseTime = ackTime - lastSent.get();
        missedAck.set(0);
        log.debug(LogUtil.format(currentContext, "Heartbeat acknowledged after {}"), getResponseTime());
        return Mono.empty();
    }

    /**
     * Builds the retry strategy applied to the connection in {@code execute}. Before each retry it
     * publishes the next connection state, logs the upcoming attempt and emits the matching
     * {@link GatewayStateChange} events and observer notifications.
     *
     * @return the retry strategy
     */
    private Retry retryFactory() {
        return GatewayRetrySpec.create(reconnectOptions, reconnectContext).doBeforeRetry(signal -> {
            state.emitNext(signal.nextState(), Sinks.EmitFailureHandler.FAIL_FAST);
            long attempt = signal.iteration();
            Duration backoff = signal.nextBackoff();
            log.debug(LogUtil.format(getContextFromException(signal.failure()), "{} in {} (attempts: {})"),
                    signal.nextState(), backoff, attempt);

            if (signal.iteration() == 1) {
                // First attempt of a retry cycle: announce that retrying has started.
                if (signal.nextState() == GatewayConnection.State.RESUMING) {
                    emissionStrategy.emitNext(dispatch, GatewayStateChange.retryStarted(backoff));
                    notifyObserver(GatewayObserver.RETRY_STARTED);
                } else {
                    emissionStrategy.emitNext(dispatch, GatewayStateChange.retryStartedResume(backoff));
                    notifyObserver(GatewayObserver.RETRY_RESUME_STARTED);
                }
            } else {
                // Subsequent attempts: the previous one failed.
                emissionStrategy.emitNext(dispatch, GatewayStateChange.retryFailed(attempt - 1, backoff));
                notifyObserver(GatewayObserver.RETRY_FAILED);
            }

            if (signal.nextState() == GatewayConnection.State.RECONNECTING) {
                emissionStrategy.emitNext(dispatch, GatewayStateChange.sessionInvalidated());
            }
        });
    }

    /**
     * Extracts the context carried by a {@link CloseException} or {@link GatewayException}.
     *
     * @param th the error to inspect
     * @return the error's context, or an empty context for any other kind of error
     */
    private ContextView getContextFromException(Throwable th) {
        if (th instanceof CloseException) {
            return ((CloseException) th).getContext();
        }
        if (th instanceof GatewayException) {
            return ((GatewayException) th).getContext();
        }
        return Context.empty();
    }

    /**
     * Reacts to the websocket session having closed.
     *
     * <p>A close status listed in {@code GatewayRetrySpec.NON_RETRYABLE_STATUS_CODES} turns the
     * requested behavior into a stop. Stopping resets the client state and signals the disconnect
     * notifier (with the close status, or with a {@link CloseException} when a cause is present);
     * any other behavior yields a {@link CloseException} error.
     *
     * @param disconnectBehavior the behavior requested when the session was closed
     * @param closeStatus the status the session closed with
     * @return a {@code Mono} emitting the close status on a clean stop, or failing with a
     * {@link CloseException} otherwise
     */
    private Mono<CloseStatus> handleClose(DisconnectBehavior disconnectBehavior, CloseStatus closeStatus) {
        return Mono.deferContextual(ctx -> {
            boolean nonRetryable = GatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(closeStatus.getCode());
            DisconnectBehavior effective = nonRetryable
                    ? DisconnectBehavior.stop(disconnectBehavior.getCause())
                    : disconnectBehavior;
            log.debug(LogUtil.format(ctx, "Closing and {} with status {}"), effective, closeStatus);
            state.emitNext(GatewayConnection.State.DISCONNECTING, Sinks.EmitFailureHandler.FAIL_FAST);
            heartbeatEmitter.stop();

            if (effective.getAction() == DisconnectBehavior.Action.STOP_ABRUPTLY) {
                // Abrupt stop: sequence and session id are kept.
                emissionStrategy.emitNext(dispatch, GatewayStateChange.disconnectedResume());
                notifyObserver(GatewayObserver.DISCONNECTED_RESUME);
            } else if (effective.getAction() == DisconnectBehavior.Action.STOP) {
                // Regular stop: the session information is discarded.
                emissionStrategy.emitNext(dispatch, GatewayStateChange.disconnected(disconnectBehavior, closeStatus));
                sequence.set(0);
                sessionId.set("");
                notifyObserver(GatewayObserver.DISCONNECTED);
            }

            switch (effective.getAction()) {
                case STOP_ABRUPTLY:
                case STOP:
                    reconnectContext.clear();
                    responseTime = 0L;
                    lastSent.set(0L);
                    lastAck.set(0L);
                    state.emitNext(GatewayConnection.State.DISCONNECTED, Sinks.EmitFailureHandler.FAIL_FAST);
                    if (effective.getCause() == null) {
                        return Mono.just(closeStatus)
                                .doOnNext(status ->
                                        disconnectNotifier.emitValue(closeStatus, Sinks.EmitFailureHandler.FAIL_FAST));
                    }
                    return Mono.just(new CloseException(closeStatus, ctx, effective.getCause()))
                            .flatMap(failure -> {
                                disconnectNotifier.emitError(failure, Sinks.EmitFailureHandler.FAIL_FAST);
                                return Mono.error(failure);
                            });
                case RETRY_ABRUPTLY:
                case RETRY:
                default:
                    return Mono.error(new CloseException(closeStatus, ctx, effective.getCause()));
            }
        });
    }

    /**
     * Closes the current session, either abruptly or with a regular stop.
     *
     * @param z {@code true} to request an abrupt stop, {@code false} for a regular stop
     * @return a {@code Mono} backed by the disconnect notifier, or an {@link IllegalStateException}
     * error if no session has been started yet
     */
    @Override // discord4j.gateway.GatewayClient
    public Mono<CloseStatus> close(boolean z) {
        return Mono.defer(() -> {
            boolean active = this.sessionHandler != null && this.disconnectNotifier != null;
            if (!active) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            DisconnectBehavior behavior = z
                    ? DisconnectBehavior.stopAbruptly(null)
                    : DisconnectBehavior.stop(null);
            this.sessionHandler.close(behavior);
            return this.disconnectNotifier.asMono();
        });
    }

    /**
     * Exposes the events published on the dispatch sink.
     *
     * @return a {@code Flux} view of the dispatch sink
     */
    @Override // discord4j.gateway.GatewayClient
    public Flux<Dispatch> dispatch() {
        Flux<Dispatch> events = this.dispatch.asFlux();
        return events;
    }

    /**
     * Exposes inbound payloads decoded with this client's payload reader.
     *
     * @return a {@code Flux} of decoded gateway payloads
     */
    @Override // discord4j.gateway.GatewayClient
    public Flux<GatewayPayload<?>> receiver() {
        return receiver(this.payloadReader::read);
    }

    /**
     * Exposes inbound buffers transformed by the given function. Each subscriber's function receives a
     * retained duplicate of the buffer; duplicates discarded by the pipeline are released.
     *
     * @param function the mapping applied to each inbound buffer
     * @return a {@code Flux} of the mapped elements
     */
    @Override // discord4j.gateway.GatewayClient
    public <T> Flux<T> receiver(Function<ByteBuf, Publisher<? extends T>> function) {
        return this.receiver.asFlux()
                .map(ByteBuf::retainedDuplicate)
                .doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease)
                .flatMap(function);
    }

    /**
     * Releases a buffer if it still holds references, tolerating a concurrent release: an
     * {@link IllegalReferenceCountException} is only logged at debug level.
     *
     * @param byteBuf the buffer to release
     */
    private static void safeRelease(ByteBuf byteBuf) {
        if (byteBuf.refCnt() <= 0) {
            return;
        }
        try {
            byteBuf.release();
        } catch (IllegalReferenceCountException e) {
            // Best effort: the buffer was already released elsewhere.
            if (log.isDebugEnabled()) {
                log.debug("", e);
            }
        }
    }

    /**
     * Exposes the sink used to queue outbound gateway payloads.
     *
     * @return the outbound payload sink
     */
    @Override // discord4j.gateway.GatewayClient
    public Sinks.Many<GatewayPayload<?>> sender() {
        Sinks.Many<GatewayPayload<?>> outboundSink = this.outbound;
        return outboundSink;
    }

    /**
     * Pushes every buffer from the given publisher into the raw sender sink using the configured
     * emission strategy.
     *
     * @param publisher the source of raw buffers to send
     * @return a {@code Mono} completing when the publisher has been fully consumed
     */
    @Override // discord4j.gateway.GatewayClient
    public Mono<Void> sendBuffer(Publisher<ByteBuf> publisher) {
        return Flux.from(publisher)
                .doOnNext(buffer -> this.emissionStrategy.emitNext(this.sender, buffer))
                .then();
    }

    /**
     * Returns the shard count taken from the identify options' shard information.
     *
     * @return the configured shard count
     */
    @Override // discord4j.gateway.GatewayClient
    public int getShardCount() {
        return identifyOptions.getShardInfo().getCount();
    }

    /**
     * Returns the currently stored session id.
     *
     * @return the session id, or an empty string when none is stored
     */
    @Override // discord4j.gateway.GatewayClient
    public String getSessionId() {
        return sessionId.get();
    }

    /**
     * Returns the last sequence number recorded by this client.
     *
     * @return the current sequence
     */
    @Override // discord4j.gateway.GatewayClient
    public int getSequence() {
        return sequence.get();
    }

    /**
     * Exposes connection state changes; the underlying sink replays the latest state to new subscribers.
     *
     * @return a {@code Flux} view of the state sink
     */
    @Override // discord4j.gateway.GatewayClient
    public Flux<GatewayConnection.State> stateEvents() {
        Flux<GatewayConnection.State> states = this.state.asFlux();
        return states;
    }

    /**
     * Tells whether the most recent connection state is {@code CONNECTED}.
     *
     * @return a {@code Mono} emitting {@code true} when connected, {@code false} otherwise
     */
    @Override // discord4j.gateway.GatewayClient
    public Mono<Boolean> isConnected() {
        return this.state.asFlux()
                .next()
                .filter(current -> current == GatewayConnection.State.CONNECTED)
                .hasElement()
                .defaultIfEmpty(false);
    }

    /**
     * Returns the time between the last heartbeat sent and its acknowledgement.
     *
     * @return the last measured response time; zero when none has been measured
     */
    @Override // discord4j.gateway.GatewayClient
    public Duration getResponseTime() {
        long nanos = this.responseTime;
        return Duration.ofNanos(nanos);
    }

    /**
     * Resolves the capacity of the outbound rate limiter: the integer value of the
     * {@code discord4j.gateway.outbound.capacity} system property when set and parseable, otherwise 115.
     *
     * @return the outbound limiter capacity
     */
    private int outboundLimiterCapacity() {
        final int defaultCapacity = 115;
        String configured = System.getProperty(OUTBOUND_CAPACITY_PROPERTY);
        if (configured == null) {
            return defaultCapacity;
        }
        int capacity;
        try {
            capacity = Integer.parseInt(configured);
        } catch (NumberFormatException e) {
            // An unparseable override is reported and ignored.
            log.warn("Invalid custom outbound limiter capacity: {}", configured);
            return defaultCapacity;
        }
        log.info("Overriding default outbound limiter capacity: {}", capacity);
        return capacity;
    }
}
