package fr.denisd3d.mc2discord.shadow.discord4j.core.event;

import fr.denisd3d.mc2discord.shadow.discord4j.common.LogUtil;
import fr.denisd3d.mc2discord.shadow.discord4j.core.event.DefaultEventDispatcher;
import fr.denisd3d.mc2discord.shadow.discord4j.core.event.domain.Event;
import fr.denisd3d.mc2discord.shadow.discord4j.core.event.domain.guild.GuildCreateEvent;
import fr.denisd3d.mc2discord.shadow.discord4j.core.event.domain.lifecycle.GatewayLifecycleEvent;
import fr.denisd3d.mc2discord.shadow.org.reactivestreams.Publisher;
import fr.denisd3d.mc2discord.shadow.org.reactivestreams.Subscription;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.EmitterProcessor;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.Flux;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.FluxProcessor;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.FluxSink;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.Mono;
import fr.denisd3d.mc2discord.shadow.reactor.core.publisher.ReplayProcessor;
import fr.denisd3d.mc2discord.shadow.reactor.core.scheduler.Scheduler;
import fr.denisd3d.mc2discord.shadow.reactor.core.scheduler.Schedulers;
import fr.denisd3d.mc2discord.shadow.reactor.util.Logger;
import fr.denisd3d.mc2discord.shadow.reactor.util.Loggers;
import fr.denisd3d.mc2discord.shadow.reactor.util.concurrent.Queues;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

/* loaded from: input_file:fr/denisd3d/mc2discord/shadow/discord4j/core/event/ReplayingEventDispatcher.class */
/**
 * An {@link EventDispatcher} that forwards published events to a live event processor and, while it is
 * in a replaying state, additionally stores events accepted by a filter in a {@link ReplayProcessor}.
 * <p>
 * Every {@link Flux} obtained from {@link #on(Class)} starts with the currently stored replay events and
 * then continues with live events.
 * <p>
 * The dispatcher moves between three internal states:
 * <ul>
 *     <li>{@code REPLAY}: initial state; matching events are stored for replay.</li>
 *     <li>{@code REPLAY_EMIT}: entered the first time an event is published while the live processor has
 *     downstream subscribers; matching events are still stored and the "stop replaying" trigger is
 *     subscribed.</li>
 *     <li>{@code EMIT}: entered when the "stop replaying" trigger terminates; events are no longer stored.
 *     If an event is later published while there are no downstream subscribers, the dispatcher goes back
 *     to {@code REPLAY}.</li>
 * </ul>
 */
public class ReplayingEventDispatcher implements EventDispatcher {
    private static final Logger log = Loggers.getLogger(ReplayingEventDispatcher.class);

    private final FluxProcessor<Event, Event> eventProcessor;
    private final FluxSink<Event> sink;
    private final Scheduler eventScheduler;
    private final ReplayProcessor<Event> replayEventProcessor;
    private final FluxSink<Event> replaySink;
    private final Predicate<Event> replayEventFilter;
    private final Scheduler timedTaskScheduler;
    private final Publisher<?> stopReplayingTrigger;
    private final AtomicReference<State> state = new AtomicReference<>(State.REPLAY);

    /**
     * Builder for {@link ReplayingEventDispatcher}. Options left unset are given defaults in
     * {@link #build()}.
     */
    public static class Builder extends DefaultEventDispatcher.Builder {
        protected ReplayProcessor<Event> replayEventProcessor;
        protected Publisher<?> stopReplayingTrigger;
        // By default only gateway lifecycle events and guild creations are kept for replay.
        protected Predicate<Event> replayEventFilter =
                event -> event instanceof GatewayLifecycleEvent || event instanceof GuildCreateEvent;
        protected Scheduler timedTaskScheduler = Schedulers.parallel();
        protected FluxSink.OverflowStrategy replayEventOverflowStrategy = FluxSink.OverflowStrategy.DROP;

        protected Builder() {
        }

        /**
         * Sets the processor used to distribute live events.
         *
         * @param fluxProcessor the live event processor, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code fluxProcessor} is {@code null}
         */
        @Override
        public Builder eventProcessor(FluxProcessor<Event, Event> fluxProcessor) {
            this.eventProcessor = Objects.requireNonNull(fluxProcessor);
            return this;
        }

        /**
         * Sets the overflow strategy applied to the sink of the live event processor.
         *
         * @param overflowStrategy the strategy, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code overflowStrategy} is {@code null}
         */
        @Override
        public Builder overflowStrategy(FluxSink.OverflowStrategy overflowStrategy) {
            this.overflowStrategy = Objects.requireNonNull(overflowStrategy);
            return this;
        }

        /**
         * Sets the scheduler on which live events are published to subscribers.
         *
         * @param scheduler the scheduler, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code scheduler} is {@code null}
         */
        @Override
        public Builder eventScheduler(Scheduler scheduler) {
            this.eventScheduler = Objects.requireNonNull(scheduler);
            return this;
        }

        /**
         * Sets the processor that stores events for replay.
         *
         * @param replayProcessor the replay processor, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code replayProcessor} is {@code null}
         */
        public Builder replayEventProcessor(ReplayProcessor<Event> replayProcessor) {
            this.replayEventProcessor = Objects.requireNonNull(replayProcessor);
            return this;
        }

        /**
         * Sets the overflow strategy applied to the sink of the replay processor.
         *
         * @param overflowStrategy the strategy, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code overflowStrategy} is {@code null}
         */
        public Builder replayEventOverflowStrategy(FluxSink.OverflowStrategy overflowStrategy) {
            this.replayEventOverflowStrategy = Objects.requireNonNull(overflowStrategy);
            return this;
        }

        /**
         * Sets the filter deciding which events are stored for replay.
         *
         * @param predicate the filter, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code predicate} is {@code null}
         */
        public Builder replayEventFilter(Predicate<Event> predicate) {
            this.replayEventFilter = Objects.requireNonNull(predicate);
            return this;
        }

        /**
         * Sets the scheduler used for time-based operations (replay timeouts and the default triggers).
         *
         * @param scheduler the scheduler, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code scheduler} is {@code null}
         */
        public Builder timedTaskScheduler(Scheduler scheduler) {
            this.timedTaskScheduler = Objects.requireNonNull(scheduler);
            return this;
        }

        /**
         * Sets the publisher whose termination switches the dispatcher from replaying to emit-only mode.
         *
         * @param publisher the trigger, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code publisher} is {@code null}
         */
        public Builder stopReplayingTrigger(Publisher<?> publisher) {
            this.stopReplayingTrigger = Objects.requireNonNull(publisher);
            return this;
        }

        /**
         * Creates the dispatcher, filling in defaults for every option that is still {@code null}:
         * an {@link EmitterProcessor} of {@link Queues#SMALL_BUFFER_SIZE} without auto-cancel, the
         * scheduler supplied by {@link EventDispatcher#DEFAULT_EVENT_SCHEDULER}, the parallel scheduler
         * for timed tasks, a replay processor that retains events for 2 minutes and a stop trigger that
         * fires after 5 seconds.
         *
         * @return a new {@link ReplayingEventDispatcher}
         */
        @Override
        public EventDispatcher build() {
            if (this.eventProcessor == null) {
                this.eventProcessor = EmitterProcessor.create(Queues.SMALL_BUFFER_SIZE, false);
            }
            if (this.eventScheduler == null) {
                this.eventScheduler = EventDispatcher.DEFAULT_EVENT_SCHEDULER.get();
            }
            // Resolved before the two defaults below because both of them depend on it.
            if (this.timedTaskScheduler == null) {
                this.timedTaskScheduler = Schedulers.parallel();
            }
            if (this.replayEventProcessor == null) {
                this.replayEventProcessor =
                        ReplayProcessor.createTimeout(Duration.ofMinutes(2L), this.timedTaskScheduler);
            }
            if (this.stopReplayingTrigger == null) {
                this.stopReplayingTrigger = Mono.delay(Duration.ofSeconds(5L), this.timedTaskScheduler);
            }
            return new ReplayingEventDispatcher(
                    this.eventProcessor,
                    this.overflowStrategy,
                    this.eventScheduler,
                    this.replayEventProcessor,
                    this.replayEventOverflowStrategy,
                    this.replayEventFilter,
                    this.timedTaskScheduler,
                    this.stopReplayingTrigger);
        }
    }

    /** Internal replay state; see the class documentation for the transitions. */
    private enum State {
        /** Matching events are stored for replay; no "stop replaying" trigger is running. */
        REPLAY,
        /** Matching events are stored for replay while the "stop replaying" trigger is running. */
        REPLAY_EMIT,
        /** Events are only emitted to live subscribers and are no longer stored. */
        EMIT
    }

    /**
     * Creates a dispatcher from explicit components.
     *
     * @param fluxProcessor     processor distributing live events
     * @param overflowStrategy  overflow strategy for the sink created on {@code fluxProcessor}
     * @param scheduler         scheduler on which live events are published to subscribers
     * @param replayProcessor   processor storing events for replay
     * @param overflowStrategy2 overflow strategy for the sink created on {@code replayProcessor}
     * @param predicate         filter selecting which events are stored for replay
     * @param scheduler2        scheduler used for the replay timeout in {@link #on(Class)}
     * @param publisher         trigger whose termination stops the storing of replay events
     */
    public ReplayingEventDispatcher(FluxProcessor<Event, Event> fluxProcessor, FluxSink.OverflowStrategy overflowStrategy, Scheduler scheduler, ReplayProcessor<Event> replayProcessor, FluxSink.OverflowStrategy overflowStrategy2, Predicate<Event> predicate, Scheduler scheduler2, Publisher<?> publisher) {
        this.eventProcessor = fluxProcessor;
        this.sink = fluxProcessor.sink(overflowStrategy);
        this.eventScheduler = scheduler;
        this.replayEventProcessor = replayProcessor;
        this.replaySink = replayProcessor.sink(overflowStrategy2);
        this.replayEventFilter = predicate;
        this.timedTaskScheduler = scheduler2;
        this.stopReplayingTrigger = publisher;
    }

    /**
     * Returns a {@link Flux} of events of the given type. The sequence starts with the events currently
     * held by the replay processor and continues with live events published on the event scheduler.
     *
     * @param cls the event class to listen for
     * @param <E> the event type
     * @return a {@link Flux} emitting replayed and then live events that are instances of {@code cls}
     */
    @Override
    public <E extends Event> Flux<E> on(Class<E> cls) {
        // Captured on subscribe so the same identity can be reported when the subscription ends.
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        // The replay processor does not complete by itself; switching to an empty fallback after 1 ms
        // without a replayed element ends the replay phase so the live stream can take over.
        Flux<Event> replayed =
                this.replayEventProcessor.timeout(Duration.ofMillis(1L), Mono.empty(), this.timedTaskScheduler);
        return this.eventProcessor.publishOn(this.eventScheduler)
                .startWith(replayed)
                .ofType(cls)
                .<E>handle((event, downstream) -> {
                    if (log.isTraceEnabled()) {
                        log.trace(LogUtil.format(downstream.currentContext()
                                .put(LogUtil.KEY_SHARD_ID, event.getShardInfo().getIndex()), "{}"), event.toString());
                    }
                    downstream.next(event);
                })
                .doOnSubscribe(subscription -> {
                    subscriptionRef.set(subscription);
                    if (log.isDebugEnabled()) {
                        log.debug("Subscription {} to {} created",
                                Integer.toHexString(subscription.hashCode()), cls.getSimpleName());
                    }
                })
                .doFinally(signalType -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Subscription {} to {} disposed due to {}",
                                Integer.toHexString(subscriptionRef.get().hashCode()), cls.getSimpleName(), signalType);
                    }
                });
    }

    /**
     * Publishes an event. While the dispatcher is not in emit-only mode, events accepted by the replay
     * filter are stored for replay; the event is pushed to the live sink only if the live processor
     * currently has downstream subscribers.
     *
     * @param event the event to publish
     */
    @Override
    public void publish(Event event) {
        if (this.state.get() != State.EMIT && this.replayEventFilter.test(event)) {
            this.replaySink.next(event);
        }
        if (this.eventProcessor.hasDownstreams()) {
            // First event seen with subscribers: start the trigger that ends the replay phase.
            if (this.state.compareAndSet(State.REPLAY, State.REPLAY_EMIT)) {
                Flux.from(this.stopReplayingTrigger)
                        .doFinally(signalType -> this.state.set(State.EMIT))
                        .subscribe();
            }
            this.sink.next(event);
        } else if (this.state.compareAndSet(State.EMIT, State.REPLAY) && this.replayEventFilter.test(event)) {
            // Nobody is listening any more: fall back to storing events, starting with this one.
            log.warn("All subscribers have disconnected from this dispatcher");
            this.replaySink.next(event);
        }
    }

    /** Completes both the replay sink and the live event sink. */
    @Override
    public void shutdown() {
        this.replaySink.complete();
        this.sink.complete();
    }

    /**
     * Creates a dispatcher with all default settings.
     *
     * @return a new dispatcher built from an unmodified {@link Builder}
     */
    public static EventDispatcher create() {
        return builder().build();
    }

    /**
     * Creates a builder for customizing a {@link ReplayingEventDispatcher}.
     *
     * @return a new {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }
}
