package fr.denisd3d.mc2discord.shadow.discord4j.core.event;

import fr.denisd3d.mc2discord.shadow.discord4j.common.LogUtil;
import fr.denisd3d.mc2discord.shadow.discord4j.common.annotations.Experimental;
import fr.denisd3d.mc2discord.shadow.discord4j.common.sinks.EmissionStrategy;
import fr.denisd3d.mc2discord.shadow.discord4j.core.event.domain.Event;
import fr.denisd3d.mc2discord.shadow.org.reactivestreams.Subscription;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.Flux;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.Sinks;
import fr.denisd3d.mc2discord.shadow.reactor.core.scheduler.Scheduler;
import fr.denisd3d.mc2discord.shadow.reactor.util.Logger;
import fr.denisd3d.mc2discord.shadow.reactor.util.Loggers;
import fr.denisd3d.mc2discord.shadow.reactor.util.concurrent.Queues;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * An {@link EventDispatcher} that distributes {@link Event} instances through a Reactor
 * {@link Sinks.Many}.
 *
 * <p>Events handed to {@link #publish(Event)} are emitted into the sink using the configured
 * {@link EmissionStrategy}; subscribers obtained from {@link #on(Class)} observe the sink's flux
 * on the configured {@link Scheduler}.
 */
@Experimental
/* loaded from: input_file:fr/denisd3d/mc2discord/shadow/discord4j/core/event/SinksEventDispatcher.class */
public class SinksEventDispatcher implements EventDispatcher {
    private static final Logger log = Loggers.getLogger(SinksEventDispatcher.class);

    /** Sink every published event is emitted into and every subscriber reads from. */
    private final Sinks.Many<Event> events;

    /** Strategy used to emit values and the completion signal into {@link #events}. */
    private final EmissionStrategy emissionStrategy;

    /** Scheduler that subscribers returned by {@link #on(Class)} receive events on. */
    private final Scheduler eventScheduler;

    /**
     * Builder for {@link SinksEventDispatcher}. Any option left unset is replaced by a default
     * when {@link #build()} is called.
     */
    /* loaded from: input_file:fr/denisd3d/mc2discord/shadow/discord4j/core/event/SinksEventDispatcher$Builder.class */
    public static class Builder {
        protected Function<Sinks.ManySpec, Sinks.Many<Event>> eventSinkFactory;
        protected EmissionStrategy emissionStrategy;
        protected Scheduler eventScheduler;

        /**
         * Sets the factory used to create the event sink.
         *
         * @param function factory receiving a {@link Sinks.ManySpec} and returning the sink to use
         * @return this builder
         * @throws NullPointerException if {@code function} is {@code null}
         */
        public Builder eventSink(Function<Sinks.ManySpec, Sinks.Many<Event>> function) {
            this.eventSinkFactory = Objects.requireNonNull(function);
            return this;
        }

        /**
         * Sets the strategy used when emitting into the event sink.
         *
         * @param emissionStrategy the strategy to use
         * @return this builder
         * @throws NullPointerException if {@code emissionStrategy} is {@code null}
         */
        public Builder emissionStrategy(EmissionStrategy emissionStrategy) {
            this.emissionStrategy = Objects.requireNonNull(emissionStrategy);
            return this;
        }

        /**
         * Sets the scheduler on which subscribers receive events.
         *
         * @param scheduler the scheduler to use
         * @return this builder
         * @throws NullPointerException if {@code scheduler} is {@code null}
         */
        public Builder eventScheduler(Scheduler scheduler) {
            this.eventScheduler = Objects.requireNonNull(scheduler);
            return this;
        }

        /**
         * Creates the dispatcher, filling in defaults for unset options: a multicast
         * backpressure-buffer sink of {@link Queues#SMALL_BUFFER_SIZE} with auto-cancel disabled,
         * {@code EmissionStrategy.timeoutDrop} with a 10 second duration, and the scheduler
         * supplied by {@code EventDispatcher.DEFAULT_EVENT_SCHEDULER}.
         *
         * <p>The chosen defaults are stored back into this builder's fields.
         *
         * @return a new {@link SinksEventDispatcher}
         */
        public EventDispatcher build() {
            if (this.eventSinkFactory == null) {
                this.eventSinkFactory =
                        manySpec -> manySpec.multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
            }
            if (this.emissionStrategy == null) {
                this.emissionStrategy = EmissionStrategy.timeoutDrop(Duration.ofSeconds(10));
            }
            if (this.eventScheduler == null) {
                this.eventScheduler = EventDispatcher.DEFAULT_EVENT_SCHEDULER.get();
            }
            return new SinksEventDispatcher(this.eventSinkFactory, this.emissionStrategy, this.eventScheduler);
        }
    }

    /**
     * Creates a dispatcher backed by the sink produced by {@code function}.
     *
     * @param function factory applied to {@link Sinks#many()} to obtain the event sink
     * @param emissionStrategy strategy used to emit events and completion into the sink
     * @param scheduler scheduler on which subscribers receive events
     * @throws NullPointerException if any argument, or the sink returned by {@code function},
     *     is {@code null}
     */
    public SinksEventDispatcher(Function<Sinks.ManySpec, Sinks.Many<Event>> function, EmissionStrategy emissionStrategy, Scheduler scheduler) {
        // Fail fast here rather than with an anonymous NPE later inside publish()/on().
        Objects.requireNonNull(function, "eventSinkFactory");
        this.events = Objects.requireNonNull(function.apply(Sinks.many()), "eventSinkFactory returned null");
        this.emissionStrategy = Objects.requireNonNull(emissionStrategy, "emissionStrategy");
        this.eventScheduler = Objects.requireNonNull(scheduler, "eventScheduler");
    }

    /**
     * Returns a flux of the events of the given type emitted through this dispatcher, delivered
     * on the configured scheduler. Each event is logged at trace level; subscription creation
     * and termination are logged at debug level.
     *
     * @param cls the event class to filter on
     * @param <E> the event type
     * @return a flux of events that are instances of {@code cls}
     */
    @Override // fr.denisd3d.mc2discord.shadow.discord4j.core.event.EventDispatcher
    public <E extends Event> Flux<E> on(Class<E> cls) {
        // Holds the subscription captured in doOnSubscribe so doFinally can log the same id.
        // NOTE(review): one holder per call to on(); if the returned Flux is subscribed more than
        // once, later subscriptions overwrite it and the "disposed" log may show the latest id.
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        return this.events.asFlux()
                .publishOn(this.eventScheduler)
                .ofType(cls)
                // Explicit type witness keeps the chain typed as Flux<E> instead of Flux<Object>.
                .<E>handle((event, synchronousSink) -> {
                    if (log.isTraceEnabled()) {
                        log.trace(LogUtil.format(synchronousSink.currentContext().put(LogUtil.KEY_SHARD_ID, event.getShardInfo().getIndex()), "{}"), event.toString());
                    }
                    synchronousSink.next(event);
                })
                .doOnSubscribe(subscription -> {
                    subscriptionRef.set(subscription);
                    if (log.isDebugEnabled()) {
                        log.debug("Subscription {} to {} created", Integer.toHexString(subscription.hashCode()), cls.getSimpleName());
                    }
                })
                .doFinally(signalType -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Subscription {} to {} disposed due to {}", Integer.toHexString(subscriptionRef.get().hashCode()), cls.getSimpleName(), signalType);
                    }
                });
    }

    /**
     * Emits {@code event} into the sink using the configured emission strategy.
     *
     * @param event the event to publish
     */
    @Override // fr.denisd3d.mc2discord.shadow.discord4j.core.event.EventDispatcher
    public void publish(Event event) {
        this.emissionStrategy.emitNext(this.events, event);
    }

    /** Emits the completion signal into the sink using the configured emission strategy. */
    @Override // fr.denisd3d.mc2discord.shadow.discord4j.core.event.EventDispatcher
    public void shutdown() {
        this.emissionStrategy.emitComplete(this.events);
    }
}
