package discord4j.common.operator;

import discord4j.common.sinks.EmissionStrategy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/common/operator/RateLimitOperator.class */
public class RateLimitOperator<T> implements Function<Publisher<T>, Publisher<T>> {
    private static final Logger log = Loggers.getLogger("discord4j.limiter");
    private static final Supplier<Scheduler> DEFAULT_PUBLISH_SCHEDULER = () -> {
        return Schedulers.newSingle("d4j-limiter", true);
    };
    private final AtomicInteger tokens;
    private final Duration refillPeriod;
    private final Scheduler delayScheduler;
    private final Sinks.Many<Integer> tokenSink;
    private final Scheduler tokenPublishScheduler;
    private final EmissionStrategy emissionStrategy;

    public RateLimitOperator(int i, Duration duration, Scheduler scheduler) {
        this(i, duration, scheduler, DEFAULT_PUBLISH_SCHEDULER.get());
    }

    public RateLimitOperator(int i, Duration duration, Scheduler scheduler, Scheduler scheduler2) {
        this.tokens = new AtomicInteger(i);
        this.refillPeriod = duration;
        this.delayScheduler = scheduler;
        this.tokenSink = Sinks.many().replay().latestOrDefault(Integer.valueOf(i));
        this.tokenPublishScheduler = scheduler2;
        this.emissionStrategy = EmissionStrategy.park(Duration.ofNanos(10L));
    }

    private String id() {
        return Integer.toHexString(hashCode());
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return Flux.from(publisher).flatMap(obj -> {
            return availableTokens().next().doOnSubscribe(subscription -> {
                if (log.isTraceEnabled()) {
                    log.trace("[{}] Subscribed to limiter", id());
                }
            }).map(num -> {
                acquire();
                Mono.delay(this.refillPeriod, this.delayScheduler).subscribe(l -> {
                    release();
                });
                return obj;
            });
        });
    }

    private void acquire() {
        int decrementAndGet = this.tokens.decrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Acquired a token, {} tokens remaining", id(), Integer.valueOf(decrementAndGet));
        }
        this.emissionStrategy.emitNext(this.tokenSink, Integer.valueOf(decrementAndGet));
    }

    private void release() {
        int incrementAndGet = this.tokens.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Released a token, {} tokens remaining", id(), Integer.valueOf(incrementAndGet));
        }
        this.emissionStrategy.emitNext(this.tokenSink, Integer.valueOf(incrementAndGet));
    }

    private Flux<Integer> availableTokens() {
        return this.tokenSink.asFlux().publishOn(this.tokenPublishScheduler).filter(num -> {
            return this.tokens.get() > 0;
        });
    }
}
