package net.william278.papiproxybridge.libraries.lettuce.core.pubsub;

import java.util.Map;
import net.william278.papiproxybridge.libraries.lettuce.core.RedisReactiveCommandsImpl;
import net.william278.papiproxybridge.libraries.lettuce.core.codec.RedisCodec;
import net.william278.papiproxybridge.libraries.lettuce.core.pubsub.api.reactive.ChannelMessage;
import net.william278.papiproxybridge.libraries.lettuce.core.pubsub.api.reactive.PatternMessage;
import net.william278.papiproxybridge.libraries.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import net.william278.papiproxybridge.libraries.reactor.core.publisher.Flux;
import net.william278.papiproxybridge.libraries.reactor.core.publisher.FluxSink;
import net.william278.papiproxybridge.libraries.reactor.core.publisher.Mono;

/* loaded from: input_file:net/william278/papiproxybridge/libraries/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.class */
/**
 * Reactive command facade for a Pub/Sub connection.
 *
 * <p>Subscription-management and publish commands are built by a {@link PubSubCommandBuilder} and
 * dispatched through the {@code createMono}/{@code createFlux}/{@code createDissolvingFlux} helpers
 * inherited from the superclass. Incoming messages are exposed as {@link Flux} streams by attaching
 * a listener to the underlying {@link StatefulRedisPubSubConnection}.
 *
 * @param <K> key (channel / pattern) type
 * @param <V> value (message) type
 */
public class RedisPubSubReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V> implements RedisPubSubReactiveCommands<K, V> {

    /** Builds every Pub/Sub command issued by this class. */
    private final PubSubCommandBuilder<K, V> commandBuilder;

    /**
     * Creates the reactive API on top of the given Pub/Sub connection.
     *
     * @param statefulRedisPubSubConnection connection the commands are dispatched on
     * @param redisCodec codec handed to both the superclass and the command builder
     */
    public RedisPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, RedisCodec<K, V> redisCodec) {
        // NOTE(review): the third superclass argument is deliberately passed as null here; its
        // meaning is defined by RedisReactiveCommandsImpl and is not visible in this file.
        super(statefulRedisPubSubConnection, redisCodec, null);
        this.commandBuilder = new PubSubCommandBuilder<>(redisCodec);
    }

    /**
     * Observes pattern messages using {@link FluxSink.OverflowStrategy#BUFFER}.
     *
     * @return stream of pattern messages
     */
    @Override
    public Flux<PatternMessage<K, V>> observePatterns() {
        return observePatterns(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Observes messages delivered through the three-argument listener callback.
     *
     * <p>Each subscriber gets its own listener on the connection; the listener is removed again
     * when the sink is disposed.
     *
     * @param overflowStrategy strategy passed to {@link Flux#create}
     * @return stream of pattern messages
     */
    @Override
    public Flux<PatternMessage<K, V>> observePatterns(FluxSink.OverflowStrategy overflowStrategy) {
        return Flux.create(sink -> {
            RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {
                @Override
                public void message(K pattern, K channel, V message) {
                    // Arguments are forwarded to PatternMessage in callback order.
                    sink.next(new PatternMessage<>(pattern, channel, message));
                }
            };
            StatefulRedisPubSubConnection<K, V> connection = getStatefulConnection();
            connection.addListener(listener);
            // Detach the listener once the sink is disposed so it does not stay registered.
            sink.onDispose(() -> connection.removeListener(listener));
        }, overflowStrategy);
    }

    /**
     * Observes channel messages using {@link FluxSink.OverflowStrategy#BUFFER}.
     *
     * @return stream of channel messages
     */
    @Override
    public Flux<ChannelMessage<K, V>> observeChannels() {
        return observeChannels(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Observes messages delivered through the two-argument listener callback.
     *
     * <p>Each subscriber gets its own listener on the connection; the listener is removed again
     * when the sink is disposed.
     *
     * @param overflowStrategy strategy passed to {@link Flux#create}
     * @return stream of channel messages
     */
    @Override
    public Flux<ChannelMessage<K, V>> observeChannels(FluxSink.OverflowStrategy overflowStrategy) {
        return Flux.create(sink -> {
            RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {
                @Override
                public void message(K channel, V message) {
                    sink.next(new ChannelMessage<>(channel, message));
                }
            };
            StatefulRedisPubSubConnection<K, V> connection = getStatefulConnection();
            connection.addListener(listener);
            // Detach the listener once the sink is disposed so it does not stay registered.
            sink.onDispose(() -> connection.removeListener(listener));
        }, overflowStrategy);
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#psubscribe}.
     *
     * @param kArr patterns handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> psubscribe(K... kArr) {
        // NOTE(review): this is the only (un)subscribe variant dispatched through createMono;
        // the siblings below use createFlux. Both are reduced with then(), so the dispatch helper
        // is left unchanged here — confirm whether the difference is intentional.
        return createMono(() -> this.commandBuilder.psubscribe(kArr)).then();
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#punsubscribe}.
     *
     * @param kArr patterns handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> punsubscribe(K... kArr) {
        return createFlux(() -> this.commandBuilder.punsubscribe(kArr)).then();
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#subscribe}.
     *
     * @param kArr channels handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> subscribe(K... kArr) {
        return createFlux(() -> this.commandBuilder.subscribe(kArr)).then();
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#unsubscribe}.
     *
     * @param kArr channels handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> unsubscribe(K... kArr) {
        return createFlux(() -> this.commandBuilder.unsubscribe(kArr)).then();
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#publish}.
     *
     * @param k channel handed to the command builder
     * @param v message handed to the command builder
     * @return the command's {@code Long} result
     */
    @Override
    public Mono<Long> publish(K k, V v) {
        return createMono(() -> this.commandBuilder.publish(k, v));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#pubsubChannels}.
     *
     * @param k argument handed to the command builder
     * @return the command result, emitted through {@code createDissolvingFlux}
     */
    @Override
    public Flux<K> pubsubChannels(K k) {
        // The element type is inferred from the return type, so no unchecked cast is needed.
        return createDissolvingFlux(() -> this.commandBuilder.pubsubChannels(k));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#pubsubNumsub}.
     *
     * @param kArr channels handed to the command builder
     * @return the command's map result
     */
    @Override
    public Mono<Map<K, Long>> pubsubNumsub(K... kArr) {
        return createMono(() -> this.commandBuilder.pubsubNumsub(kArr));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#pubsubShardChannels}.
     *
     * @param k argument handed to the command builder
     * @return the command result, emitted through {@code createDissolvingFlux}
     */
    @Override
    public Flux<K> pubsubShardChannels(K k) {
        // The element type is inferred from the return type, so no unchecked cast is needed.
        return createDissolvingFlux(() -> this.commandBuilder.pubsubShardChannels(k));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#pubsubShardNumsub}.
     *
     * @param kArr shard channels handed to the command builder
     * @return the command's map result
     */
    @Override
    public Mono<Map<K, Long>> pubsubShardNumsub(K... kArr) {
        return createMono(() -> this.commandBuilder.pubsubShardNumsub(kArr));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#spublish}.
     *
     * @param k shard channel handed to the command builder
     * @param v message handed to the command builder
     * @return the command's {@code Long} result
     */
    @Override
    public Mono<Long> spublish(K k, V v) {
        return createMono(() -> this.commandBuilder.spublish(k, v));
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#ssubscribe}.
     *
     * @param kArr shard channels handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> ssubscribe(K... kArr) {
        return createFlux(() -> this.commandBuilder.ssubscribe(kArr)).then();
    }

    /**
     * Issues the command built by {@link PubSubCommandBuilder#sunsubscribe}.
     *
     * @param kArr shard channels handed to the command builder
     * @return a {@link Mono} that only signals termination of the command
     */
    @Override
    public Mono<Void> sunsubscribe(K... kArr) {
        return createFlux(() -> this.commandBuilder.sunsubscribe(kArr)).then();
    }

    /**
     * Returns the superclass connection narrowed to its Pub/Sub type.
     *
     * @return the connection this API operates on
     */
    @Override
    public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
        // Parameterized downcast instead of a raw one; the constructor only accepts a
        // StatefulRedisPubSubConnection and passes it to the superclass.
        return (StatefulRedisPubSubConnection<K, V>) super.getStatefulConnection();
    }
}
