package ml.denisd3d.mc2discord.repack.discord4j.voice;

import java.time.Duration;
import ml.denisd3d.mc2discord.repack.discord4j.common.LogUtil;
import ml.denisd3d.mc2discord.repack.discord4j.common.close.CloseStatus;
import ml.denisd3d.mc2discord.repack.discord4j.common.close.DisconnectBehavior;
import ml.denisd3d.mc2discord.repack.discord4j.common.sinks.EmissionStrategy;
import ml.denisd3d.mc2discord.repack.discord4j.voice.retry.PartialDisconnectException;
import ml.denisd3d.mc2discord.repack.discord4j.voice.retry.VoiceGatewayException;
import ml.denisd3d.mc2discord.repack.io.netty.buffer.ByteBuf;
import ml.denisd3d.mc2discord.repack.io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import ml.denisd3d.mc2discord.repack.io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import ml.denisd3d.mc2discord.repack.org.reactivestreams.Publisher;
import ml.denisd3d.mc2discord.repack.reactor.core.publisher.Flux;
import ml.denisd3d.mc2discord.repack.reactor.core.publisher.Mono;
import ml.denisd3d.mc2discord.repack.reactor.core.publisher.Sinks;
import ml.denisd3d.mc2discord.repack.reactor.netty.http.websocket.WebsocketInbound;
import ml.denisd3d.mc2discord.repack.reactor.netty.http.websocket.WebsocketOutbound;
import ml.denisd3d.mc2discord.repack.reactor.util.Logger;
import ml.denisd3d.mc2discord.repack.reactor.util.Loggers;
import ml.denisd3d.mc2discord.repack.reactor.util.context.ContextView;
import ml.denisd3d.mc2discord.repack.reactor.util.function.Tuple2;

/* loaded from: input_file:ml/denisd3d/mc2discord/repack/discord4j/voice/VoiceWebsocketHandler.class */
/**
 * Bridges a websocket connection with a pair of {@link ByteBuf} streams: frames received from the
 * connection are pushed into the {@code inbound} sink, and payloads published by the
 * {@code outbound} flux are written to the connection as text frames.
 *
 * <p>The handler also owns a one-shot "session close" signal. Emitting a {@link DisconnectBehavior}
 * through {@link #close(DisconnectBehavior)} (or one of its shortcuts) feeds the outbound side of
 * the pipeline built by {@link #handle(WebsocketInbound, WebsocketOutbound)}.
 */
public class VoiceWebsocketHandler {
    private static final Logger log = Loggers.getLogger(VoiceWebsocketHandler.class);

    /** Sink receiving the content of every aggregated frame read from the websocket. */
    private final Sinks.Many<ByteBuf> inbound;

    /** Payloads to be sent through the websocket, each one wrapped in a text frame. */
    private final Flux<ByteBuf> outbound;

    /** Context passed to {@link LogUtil#format} and to the exceptions created by this handler. */
    private final ContextView context;

    /** One-shot signal carrying the behavior requested when the session is closed. */
    private final Sinks.One<DisconnectBehavior> sessionClose = Sinks.one();

    /** Strategy used to push inbound payloads into {@link #inbound}. */
    private final EmissionStrategy emissionStrategy = EmissionStrategy.park(Duration.ofNanos(10));

    /**
     * Creates a handler bound to the given inbound sink and outbound flux.
     *
     * @param many sink that receives the content of each frame read from the websocket
     * @param flux source of payloads to write to the websocket
     * @param contextView context used for log formatting and exception creation
     */
    public VoiceWebsocketHandler(Sinks.Many<ByteBuf> many, Flux<ByteBuf> flux, ContextView contextView) {
        this.inbound = many;
        this.outbound = flux;
        this.context = contextView;
    }

    /**
     * Assembles the reactive pipeline that drives a websocket session.
     *
     * <p>The outbound side merges the close frame derived from the session close signal with the
     * text frames built from {@code outbound}. The inbound side forwards the content of each
     * aggregated frame to the {@code inbound} sink. Any error raised by either side is routed
     * through {@link #error(Throwable)}; it is then swallowed only if its cause is a
     * {@link VoiceGatewayException}, otherwise it propagates to the subscriber.
     *
     * @param websocketInbound the reading side of the websocket connection
     * @param websocketOutbound the writing side of the websocket connection
     * @return a {@link Mono} that, once both sides have completed, emits the
     * {@link DisconnectBehavior} published through the session close signal paired with the
     * {@link CloseStatus} received from the connection, or {@link CloseStatus#ABNORMAL_CLOSE} if no
     * close status was received
     */
    public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        // Close frame to send, derived from the behavior requested through close(...).
        Mono<CloseWebSocketFrame> outboundClose = this.sessionClose.asMono()
                .doOnNext(behavior -> log.debug(LogUtil.format(this.context, "Closing session with behavior: {}"), behavior))
                .flatMap(this::resolveCloseStatus)
                .map(status -> new CloseWebSocketFrame(status.getCode(), status.getReason().orElse(null)));

        // Close status reported by the connection; receiving one triggers an abrupt retry.
        Mono<CloseStatus> inboundClose = websocketInbound.receiveCloseStatus()
                .map(status -> new CloseStatus(status.code(), status.reasonText()))
                .doOnNext(status -> {
                    log.debug(LogUtil.format(this.context, "Received close status: {}"), status);
                    close(DisconnectBehavior.retryAbruptly(new VoiceGatewayException(this.context, "Inbound close status")));
                });

        // Typed as Publisher<?> so the sendObject(Publisher<?>) overload is selected explicitly.
        Publisher<?> outboundFrames = Flux.merge(outboundClose, this.outbound.map(TextWebSocketFrame::new));
        Mono<Void> outboundEvents = websocketOutbound.sendObject(outboundFrames).then();

        websocketInbound.withConnection(connection ->
                connection.onDispose(() -> log.debug(LogUtil.format(this.context, "Connection disposed"))));

        Mono<Void> inboundEvents = websocketInbound.aggregateFrames()
                .receiveFrames()
                .map(frame -> frame.content())
                .doOnNext(this::emitInbound)
                .then();

        return Mono.zip(outboundEvents, inboundEvents)
                .doOnError(this::error)
                .onErrorResume(th -> th.getCause() instanceof VoiceGatewayException, th -> Mono.empty())
                .then(Mono.zip(this.sessionClose.asMono(), inboundClose.defaultIfEmpty(CloseStatus.ABNORMAL_CLOSE)));
    }

    /**
     * Maps a requested disconnect behavior to the close status to send, or to an error for the
     * abrupt actions.
     *
     * @param behavior the behavior emitted through the session close signal
     * @return a {@link Mono} emitting {@link CloseStatus#NORMAL_CLOSE}, or failing with the
     * behavior's cause (a {@link PartialDisconnectException} when the cause is {@code null}) for
     * {@code RETRY_ABRUPTLY} and {@code STOP_ABRUPTLY}
     */
    private Mono<CloseStatus> resolveCloseStatus(DisconnectBehavior behavior) {
        switch (behavior.getAction()) {
            case RETRY_ABRUPTLY:
            case STOP_ABRUPTLY:
                return Mono.error(behavior.getCause() != null ? behavior.getCause() : new PartialDisconnectException(this.context));
            case RETRY:
            case STOP:
            default:
                return Mono.just(CloseStatus.NORMAL_CLOSE);
        }
    }

    /**
     * Pushes a payload read from the websocket into the inbound sink using the configured
     * emission strategy.
     *
     * @param byteBuf the frame content to forward
     */
    private void emitInbound(ByteBuf byteBuf) {
        this.emissionStrategy.emitNext(this.inbound, byteBuf);
    }

    /**
     * Closes the session with {@code DisconnectBehavior.retry(null)}.
     */
    public void close() {
        close(DisconnectBehavior.retry(null));
    }

    /**
     * Publishes the given behavior through the session close signal, using
     * {@link Sinks.EmitFailureHandler#FAIL_FAST} as the failure handler.
     *
     * @param disconnectBehavior the behavior to apply when closing the session
     */
    public void close(DisconnectBehavior disconnectBehavior) {
        this.sessionClose.emitValue(disconnectBehavior, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Logs the given error at info level and closes the session with
     * {@code DisconnectBehavior.retryAbruptly(th)}.
     *
     * @param th the error that triggered the close
     */
    public void error(Throwable th) {
        log.info(LogUtil.format(this.context, "Triggering error sequence: {}"), th.toString());
        close(DisconnectBehavior.retryAbruptly(th));
    }
}
