package discord4j.gateway;

import discord4j.common.LogUtil;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.sinks.EmissionStrategy;
import discord4j.gateway.retry.GatewayException;
import discord4j.gateway.retry.PartialDisconnectException;
import discord4j.gateway.retry.ReconnectException;
import java.time.Duration;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.ContextView;
import reactor.util.function.Tuple2;
import shadow.io.netty.buffer.ByteBuf;
import shadow.io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import shadow.io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import shadow.io.netty.util.IllegalReferenceCountException;

/* loaded from: input_file:discord4j/gateway/GatewayWebsocketHandler.class */
public class GatewayWebsocketHandler {
    private static final Logger log = Loggers.getLogger((Class<?>) GatewayWebsocketHandler.class);
    private final Sinks.Many<ByteBuf> inbound;
    private final Flux<ByteBuf> outbound;
    private final Sinks.One<DisconnectBehavior> sessionClose;
    private final ContextView context;
    private final boolean unpooled;
    private final EmissionStrategy emissionStrategy;

    public GatewayWebsocketHandler(Sinks.Many<ByteBuf> many, Flux<ByteBuf> flux, ContextView contextView) {
        this(many, flux, contextView, false);
    }

    public GatewayWebsocketHandler(Sinks.Many<ByteBuf> many, Flux<ByteBuf> flux, ContextView contextView, boolean z) {
        this.inbound = many;
        this.outbound = flux;
        this.sessionClose = Sinks.one();
        this.context = contextView;
        this.unpooled = z;
        this.emissionStrategy = EmissionStrategy.park(Duration.ofNanos(10L));
    }

    public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        ZlibDecompressor zlibDecompressor = new ZlibDecompressor(websocketOutbound.alloc());
        Mono map = this.sessionClose.asMono().doOnNext(disconnectBehavior -> {
            log.debug(LogUtil.format(this.context, "Closing session with behavior: {}"), disconnectBehavior);
        }).flatMap(disconnectBehavior2 -> {
            switch (disconnectBehavior2.getAction()) {
                case RETRY_ABRUPTLY:
                case STOP_ABRUPTLY:
                    return Mono.error(disconnectBehavior2.getCause() != null ? disconnectBehavior2.getCause() : new PartialDisconnectException(this.context));
                case RETRY:
                case STOP:
                default:
                    return Mono.just(CloseStatus.NORMAL_CLOSE);
            }
        }).map(closeStatus -> {
            return new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason().orElse(null));
        });
        Mono doOnNext = websocketInbound.receiveCloseStatus().map(webSocketCloseStatus -> {
            return new CloseStatus(webSocketCloseStatus.code(), webSocketCloseStatus.reasonText());
        }).doOnNext(closeStatus2 -> {
            log.debug(LogUtil.format(this.context, "Received close status: {}"), closeStatus2);
        }).doOnNext(closeStatus3 -> {
            close(DisconnectBehavior.retryAbruptly(new GatewayException(this.context, "Inbound close status")));
        });
        Mono<Void> then = websocketOutbound.sendObject((Publisher<?>) Flux.merge(map, this.outbound.map(TextWebSocketFrame::new))).then();
        websocketInbound.withConnection(connection -> {
            connection.onDispose(() -> {
                log.debug(LogUtil.format(this.context, "Connection disposed"));
            });
        });
        Flux<V> map2 = websocketInbound.aggregateFrames().receiveFrames().map((v0) -> {
            return v0.content();
        });
        Objects.requireNonNull(zlibDecompressor);
        return Mono.zip(then, map2.transformDeferred(zlibDecompressor::completeMessages).doOnNext(this::emitInbound).doOnNext(this::safeRelease).then()).doOnError(this::error).onErrorResume(th -> {
            return th.getCause() instanceof GatewayException;
        }, th2 -> {
            return Mono.empty();
        }).then(Mono.zip(this.sessionClose.asMono(), doOnNext.defaultIfEmpty(CloseStatus.ABNORMAL_CLOSE)));
    }

    private void emitInbound(ByteBuf byteBuf) {
        if (this.emissionStrategy.emitNext(this.inbound, byteBuf)) {
            return;
        }
        safeRelease(byteBuf);
    }

    public void close() {
        close(DisconnectBehavior.retry(null));
    }

    public void close(DisconnectBehavior disconnectBehavior) {
        this.sessionClose.emitValue(disconnectBehavior, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    public void error(Throwable th) {
        if (!(th instanceof ReconnectException)) {
            log.info(LogUtil.format(this.context, "Triggering error sequence: {}"), th.toString());
        }
        close(DisconnectBehavior.retryAbruptly(th));
    }

    private void safeRelease(ByteBuf byteBuf) {
        if (this.unpooled || byteBuf.refCnt() <= 0) {
            return;
        }
        try {
            byteBuf.release();
        } catch (IllegalReferenceCountException e) {
            if (log.isDebugEnabled()) {
                log.debug("", e);
            }
        }
    }
}
