package x.lib.discord4j.voice;

import x.lib.io.netty.buffer.ByteBuf;
import x.lib.reactor.core.Disposable;
import x.lib.reactor.core.publisher.Flux;
import x.lib.reactor.core.publisher.Mono;
import x.lib.reactor.core.scheduler.Scheduler;
import x.lib.reactor.util.Logger;
import x.lib.reactor.util.Loggers;

/* loaded from: input_file:x/lib/discord4j/voice/LocalVoiceReceiveTask.class */
public class LocalVoiceReceiveTask implements Disposable {
    private static final Logger log = Loggers.getLogger((Class<?>) LocalVoiceReceiveTask.class);
    private final Disposable task;

    public LocalVoiceReceiveTask(Scheduler scheduler, Flux<ByteBuf> flux, PacketTransformer packetTransformer, AudioReceiver audioReceiver) {
        this.task = flux.flatMap(byteBuf -> {
            return Mono.fromCallable(() -> {
                return packetTransformer.nextReceive(byteBuf);
            }).map(bArr -> {
                if (audioReceiver != AudioReceiver.NO_OP) {
                    audioReceiver.getBuffer().put(bArr);
                    audioReceiver.getBuffer().flip();
                    audioReceiver.receive();
                }
                return bArr;
            }).onErrorResume(th -> {
                log.error("Error while receiving audio", th);
                return Mono.empty();
            });
        }).subscribeOn(scheduler).subscribe();
    }

    @Override // x.lib.reactor.core.Disposable
    public void dispose() {
        this.task.dispose();
    }

    @Override // x.lib.reactor.core.Disposable
    public boolean isDisposed() {
        return this.task.isDisposed();
    }
}
