/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.OrderingReadFromAccessor;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.cluster.AbstractClusterNodeConnectionFactory;
import io.lettuce.core.cluster.AsyncClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterEventListener;
import io.lettuce.core.cluster.ClusterNodeConnectionFactory;
import io.lettuce.core.cluster.ClusterPushHandler;
import io.lettuce.core.cluster.PartitionSelectorException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.UnknownPartitionException;
import io.lettuce.core.cluster.api.push.RedisClusterPushListener;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.AsyncConnectionProvider;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.HostAndPort;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.models.role.RedisNodeDescription;
import io.lettuce.core.protocol.ConnectionIntent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

class PooledClusterConnectionProvider<K, V>
implements ClusterConnectionProvider,
AsyncClusterConnectionProvider,
ClusterPushHandler {
    // Logger shared by all provider instances.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class);
    // Held while mutating the partitions view, the per-slot caches below, readFrom and autoFlushCommands.
    private final Lock stateLock = new ReentrantLock();
    // Sampled once when the instance is created; later changes to the logger level are not picked up.
    private final boolean debugEnabled = logger.isDebugEnabled();
    // Listeners notified by onPushMessage(); copy-on-write so iteration is safe while listeners are added/removed.
    private final List<RedisClusterPushListener> pushListeners = new CopyOnWriteArrayList<RedisClusterPushListener>();
    // Per-slot cache of write connections, indexed by slot number (16384 entries - presumably the Redis Cluster slot count).
    private final CompletableFuture<StatefulRedisConnection<K, V>>[] writers = new CompletableFuture[16384];
    // Per-slot cache of read connection candidates, indexed by slot number; null means "not resolved yet".
    private final CompletableFuture<StatefulRedisConnection<K, V>>[][] readers = new CompletableFuture[16384][];
    // Client used to open node connections and to look up cluster client options.
    private final RedisClusterClient redisClusterClient;
    // Options snapshot taken from the client in the constructor; expireStaleConnections() tolerates null.
    private final ClusterClientOptions options;
    // Connection factory wrapped in a NodeConnectionPostProcessor (see constructor).
    private final ClusterNodeConnectionFactory<K, V> connectionFactory;
    // Writer handed to the default node connection factory for host/port based connections.
    private final RedisChannelWriter clusterWriter;
    // Receives onUncoveredSlot / onUnknownNode notifications.
    private final ClusterEventListener clusterEventListener;
    // Codec used for every node connection created by the default factory.
    private final RedisCodec<K, V> redisCodec;
    // Keyed registry of node connections; creates connections through connectionFactory.
    private final AsyncConnectionProvider<ClusterNodeConnectionFactory.ConnectionKey, StatefulRedisConnection<K, V>, ConnectionFuture<StatefulRedisConnection<K, V>>> connectionProvider;
    // Current cluster view; null until setPartitions() is called for the first time.
    private Partitions partitions;
    // Auto-flush setting applied to newly created connections and propagated by setAutoFlushCommands().
    private boolean autoFlushCommands = true;
    // Read routing preference; null means reads are routed like writes (see getConnectionAsync(ConnectionIntent, int)).
    private ReadFrom readFrom;

    public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, RedisCodec<K, V> redisCodec, ClusterEventListener clusterEventListener) {
        this.redisCodec = redisCodec;
        this.redisClusterClient = redisClusterClient;
        this.options = redisClusterClient.getClusterClientOptions();
        this.clusterWriter = clusterWriter;
        this.clusterEventListener = clusterEventListener;
        this.connectionFactory = new NodeConnectionPostProcessor(this.getConnectionFactory(redisClusterClient));
        this.connectionProvider = new AsyncConnectionProvider(this.connectionFactory);
    }

    /**
     * Registers a listener that will receive push messages from node connections.
     *
     * @param listener the listener to register
     */
    @Override
    public void addListener(RedisClusterPushListener listener) {
        pushListeners.add(listener);
    }

    /**
     * Unregisters a previously registered push listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(RedisClusterPushListener listener) {
        pushListeners.remove(listener);
    }

    /**
     * Returns the registered push listeners.
     *
     * @return the live listener collection backing this provider (not a copy)
     */
    @Override
    public Collection<RedisClusterPushListener> getPushListeners() {
        return pushListeners;
    }

    /**
     * Blocking variant of {@link #getConnectionAsync(ConnectionIntent, int)}: resolves the connection for a slot and
     * waits for it to become available.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param slot the hash slot to route to
     * @return the connection serving the slot
     * @throws RuntimeException whatever {@code Exceptions.bubble} translates the underlying failure into; if the
     *         calling thread was interrupted while waiting, its interrupt flag is set again before throwing
     */
    @Override
    public StatefulRedisConnection<K, V> getConnection(ConnectionIntent connectionIntent, int slot) {
        try {
            return getConnectionAsync(connectionIntent, slot).get();
        } catch (InterruptedException e) {
            // Future.get() clears the interrupt flag when it throws; restore it so callers up the stack
            // can still observe the interruption instead of silently losing it.
            Thread.currentThread().interrupt();
            throw Exceptions.bubble(e);
        } catch (Exception e) {
            throw Exceptions.bubble(e);
        }
    }

    /**
     * Resolves the connection for a slot without blocking.
     *
     * <p>Read intents are routed through the read-candidate selection only when a {@link ReadFrom} setting other than
     * {@code ReadFrom.UPSTREAM} is configured; everything else uses the slot's write connection.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param slot the hash slot to route to
     * @return a future completing with the connection serving the slot
     */
    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionIntent connectionIntent, int slot) {
        if (debugEnabled) {
            logger.debug("getConnection(" + connectionIntent + ", " + slot + ")");
        }

        boolean routeViaReadFrom = connectionIntent == ConnectionIntent.READ
                && readFrom != null
                && readFrom != ReadFrom.UPSTREAM;
        if (!routeViaReadFrom) {
            return getWriteConnection(slot).toCompletableFuture();
        }
        return getReadConnection(slot);
    }

    /**
     * Returns the write connection for a slot, using the per-slot cache when it is already populated.
     *
     * <p>On a cache miss the slot's master is looked up in the current partitions. If no master covers the slot the
     * event listener is notified and a failed future carrying a {@link PartitionSelectorException} is returned.
     * Otherwise a connection to the master is requested and, once established, stored in the cache unless another
     * caller populated the entry in the meantime.
     *
     * @param slot the hash slot to route to
     * @return a future completing with the write connection for the slot
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int slot) {
        CompletableFuture<StatefulRedisConnection<K, V>> cachedWriter = writers[slot];
        if (cachedWriter != null) {
            return cachedWriter;
        }

        RedisClusterNode slotOwner = partitions.getMasterBySlot(slot);
        if (slotOwner == null) {
            clusterEventListener.onUncoveredSlot(slot);
            return Futures.failed(new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".", partitions.clone()));
        }

        RedisURI ownerUri = slotOwner.getUri();
        ClusterNodeConnectionFactory.ConnectionKey writeKey = new ClusterNodeConnectionFactory.ConnectionKey(ConnectionIntent.WRITE, ownerUri.getHost(), ownerUri.getPort());
        ConnectionFuture<StatefulRedisConnection<K, V>> pending = getConnectionAsync(writeKey);

        return pending.thenApply(established -> {
            // Populate the fast-path cache under the lock; keep an entry another thread may have stored first.
            stateLock.lock();
            try {
                if (writers[slot] == null) {
                    writers[slot] = CompletableFuture.completedFuture(established);
                }
            } finally {
                stateLock.unlock();
            }
            return established;
        }).toCompletableFuture();
    }

    /**
     * Resolves a connection to read from for the given slot, honoring the configured {@link ReadFrom} setting.
     *
     * <p>Two paths exist:
     * <ul>
     * <li><b>Cached</b>: {@code readers[slot]} already holds candidate futures. Once all of them are complete, a
     * candidate is picked (random open one unless ordering matters, otherwise the first open one, otherwise the
     * first entry).</li>
     * <li><b>Uncached</b>: the slot's master and its read candidates are determined from the current partitions,
     * {@code readFrom.select(..)} narrows them down, connections are requested, and the connections that could be
     * obtained are stored in {@code readers[slot]} before one is picked with the same rules.</li>
     * </ul>
     *
     * <p>If no master covers the slot, or the selection is empty, the event listener is notified through
     * {@code onUncoveredSlot} and a future failed with {@link PartitionSelectorException} is returned.
     *
     * <p>NOTE(review): this method is decompiler output (CFR). The decompiler emitted the warning
     * "Removed try catching itself - possible behaviour change." for it, and generic type arguments on the locals
     * were erased to raw types; verify against the upstream source before modifying the logic.
     *
     * @param slot the hash slot to route to
     * @return a future completing with a connection suitable for reading the slot
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getReadConnection(int slot) {
        CompletableFuture[] readerCandidates;
        boolean cached = true;
        // Read the cache entry under the lock so a concurrent cache reset/update is observed consistently.
        this.stateLock.lock();
        try {
            readerCandidates = this.readers[slot];
        }
        finally {
            this.stateLock.unlock();
        }
        if (readerCandidates == null) {
            // Cache miss: resolve candidates from the current cluster view.
            RedisClusterNode master2 = this.partitions.getMasterBySlot(slot);
            if (master2 == null) {
                this.clusterEventListener.onUncoveredSlot(slot);
                return Futures.failed(new PartitionSelectorException(String.format("Cannot determine a partition to read for slot %d.", slot), this.partitions.clone()));
            }
            final List<RedisNodeDescription> candidates2 = this.getReadCandidates(master2);
            // Let the ReadFrom setting choose (and order) the nodes to read from.
            List<RedisNodeDescription> selection = this.readFrom.select(new ReadFrom.Nodes(){

                @Override
                public List<RedisNodeDescription> getNodes() {
                    return candidates2;
                }

                @Override
                public Iterator<RedisNodeDescription> iterator() {
                    return candidates2.iterator();
                }
            });
            if (selection.isEmpty()) {
                this.clusterEventListener.onUncoveredSlot(slot);
                return Futures.failed(new PartitionSelectorException(String.format("Cannot determine a partition to read for slot %d with setting %s.", slot, this.readFrom), this.partitions.clone()));
            }
            readerCandidates = this.getReadFromConnections(selection);
            cached = false;
        }
        // Effectively-final alias so the lambdas below can capture the candidates.
        CompletableFuture[] selectedReaderCandidates = readerCandidates;
        if (cached) {
            // Cached path: wait for all candidates, then pick one of the existing futures.
            return CompletableFuture.allOf(readerCandidates).thenCompose(v -> {
                CompletableFuture candidate;
                boolean orderSensitive = this.isOrderSensitive(selectedReaderCandidates);
                // When ordering does not matter, try a randomly chosen open connection first.
                if (!orderSensitive && (candidate = (CompletableFuture)PooledClusterConnectionProvider.findRandomActiveConnection(selectedReaderCandidates, Function.identity())) != null) {
                    return candidate;
                }
                // Otherwise take the first candidate whose connection is open.
                for (CompletableFuture candidate2 : selectedReaderCandidates) {
                    if (!((StatefulRedisConnection)candidate2.join()).isOpen()) continue;
                    return candidate2;
                }
                // No open connection found: fall back to the first candidate.
                return selectedReaderCandidates[0];
            });
        }
        // Uncached path: collect whichever connections could be established.
        CompletableFuture filteredReaderCandidates = new CompletableFuture();
        ((CompletableFuture)CompletableFuture.allOf(readerCandidates).thenApply(v -> selectedReaderCandidates)).whenComplete((candidates, throwable) -> {
            if (throwable == null) {
                filteredReaderCandidates.complete(this.getConnections((CompletableFuture<StatefulRedisConnection<K, V>>[])candidates));
                return;
            }
            // At least one candidate failed: continue with the ones that succeeded, if any.
            StatefulRedisConnection<K, V>[] connections = this.getConnections(selectedReaderCandidates);
            if (connections.length == 0) {
                // Nothing usable at all: propagate the original failure.
                filteredReaderCandidates.completeExceptionally((Throwable)throwable);
                return;
            }
            filteredReaderCandidates.complete(connections);
        });
        return filteredReaderCandidates.thenApply(statefulRedisConnections -> {
            StatefulRedisConnection candidate;
            boolean orderSensitive = this.isOrderSensitive((Object[])statefulRedisConnections);
            // Wrap the established connections in completed futures for the per-slot cache.
            CompletableFuture[] toCache = new CompletableFuture[((StatefulRedisConnection[])statefulRedisConnections).length];
            for (int i2 = 0; i2 < toCache.length; ++i2) {
                toCache[i2] = CompletableFuture.completedFuture(statefulRedisConnections[i2]);
            }
            this.stateLock.lock();
            try {
                this.readers[slot] = toCache;
            }
            finally {
                this.stateLock.unlock();
            }
            // Random pick works on the original candidate futures; failed ones are skipped by the helper.
            if (!orderSensitive && (candidate = PooledClusterConnectionProvider.findRandomActiveConnection(selectedReaderCandidates, CompletableFuture::join)) != null) {
                return candidate;
            }
            // Otherwise take the first open connection in selection order.
            for (StatefulRedisConnection candidate2 : statefulRedisConnections) {
                if (!candidate2.isOpen()) continue;
                return candidate2;
            }
            // No open connection found: fall back to the first one.
            return statefulRedisConnections[0];
        });
    }

    /**
     * Tells whether the candidate order must be respected when picking a read connection.
     *
     * @param connections the candidates under consideration
     * @return {@code true} if the configured {@link ReadFrom} is order-sensitive or there is exactly one candidate
     */
    private boolean isOrderSensitive(Object[] connections) {
        if (OrderingReadFromAccessor.isOrderSensitive(readFrom)) {
            return true;
        }
        return connections.length == 1;
    }

    /**
     * Makes up to two random draws from the candidates and returns the first draw that completed normally and whose
     * connection is open, converted through {@code mappingFunction}.
     *
     * @param selectedReaderCandidates the candidate futures to draw from
     * @param mappingFunction converts the chosen future into the caller's result type
     * @param <T> result type
     * @param <E> connection type
     * @return the mapped candidate, or {@code null} if no draw yielded an open connection
     */
    private static <T, E extends StatefulConnection<?, ?>> T findRandomActiveConnection(CompletableFuture<E>[] selectedReaderCandidates, Function<CompletableFuture<E>, T> mappingFunction) {
        int draws = Math.min(2, selectedReaderCandidates.length);
        for (int draw = 0; draw < draws; draw++) {
            int pickedIndex = ThreadLocalRandom.current().nextInt(selectedReaderCandidates.length);
            CompletableFuture<E> picked = selectedReaderCandidates[pickedIndex];

            boolean completedNormally = picked.isDone() && !picked.isCompletedExceptionally();
            if (completedNormally && picked.join().isOpen()) {
                return mappingFunction.apply(picked);
            }
        }
        return null;
    }

    /**
     * Collects the connections of all candidate futures that can be joined successfully.
     *
     * @param selectedReaderCandidates the candidate futures
     * @return the connections that were obtained, in candidate order; candidates whose {@code join()} throws are
     *         left out
     */
    private StatefulRedisConnection<K, V>[] getConnections(CompletableFuture<StatefulRedisConnection<K, V>>[] selectedReaderCandidates) {
        List<StatefulRedisConnection<K, V>> established = new ArrayList<>(selectedReaderCandidates.length);
        for (int i = 0; i < selectedReaderCandidates.length; i++) {
            try {
                established.add(selectedReaderCandidates[i].join());
            } catch (Exception ignored) {
                // Deliberate best effort: a candidate that failed is simply not part of the result.
            }
        }
        @SuppressWarnings("unchecked")
        StatefulRedisConnection<K, V>[] result = established.toArray(new StatefulRedisConnection[0]);
        return result;
    }

    /**
     * Requests a connection for every selected node.
     *
     * <p>Nodes whose role is upstream are requested with {@code ConnectionIntent.WRITE}, all others with
     * {@code ConnectionIntent.READ}.
     *
     * @param selection the nodes chosen by the {@link ReadFrom} setting
     * @return one connection future per selected node, in selection order
     */
    private CompletableFuture<StatefulRedisConnection<K, V>>[] getReadFromConnections(List<RedisNodeDescription> selection) {
        @SuppressWarnings("unchecked")
        CompletableFuture<StatefulRedisConnection<K, V>>[] pending = new CompletableFuture[selection.size()];
        int position = 0;
        for (RedisNodeDescription node : selection) {
            RedisURI nodeUri = node.getUri();
            ConnectionIntent intent;
            if (node.getRole().isUpstream()) {
                intent = ConnectionIntent.WRITE;
            } else {
                intent = ConnectionIntent.READ;
            }
            ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(intent, nodeUri.getHost(), nodeUri.getPort());
            pending[position++] = getConnectionAsync(key).toCompletableFuture();
        }
        return pending;
    }

    /**
     * Lists the nodes of the current partitions that may serve reads for the given upstream node.
     *
     * @param upstream the node owning the slot
     * @return the nodes accepted by {@link #isReadCandidate(RedisClusterNode, RedisClusterNode)}, in partition order
     */
    private List<RedisNodeDescription> getReadCandidates(RedisClusterNode upstream) {
        List<RedisNodeDescription> eligible = new ArrayList<>();
        for (RedisClusterNode node : partitions) {
            if (isReadCandidate(upstream, node)) {
                eligible.add(node);
            }
        }
        return eligible;
    }

    /**
     * Decides whether {@code partition} may serve reads for {@code upstream}.
     *
     * @param upstream the node owning the slot
     * @param partition the node to test
     * @return {@code true} if {@code partition} is the upstream itself, or if it reports the upstream's node id via
     *         {@code getSlaveOf()} and has a non-zero replication offset
     */
    private static boolean isReadCandidate(RedisClusterNode upstream, RedisClusterNode partition) {
        String upstreamId = upstream.getNodeId();
        if (upstreamId.equals(partition.getNodeId())) {
            return true;
        }
        if (!upstreamId.equals(partition.getSlaveOf())) {
            return false;
        }
        return partition.getReplOffset() != 0L;
    }

    /**
     * Returns a connection to the node with the given node id, blocking until it is available.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param nodeId the cluster node id
     * @return the connection to the node
     */
    @Override
    public StatefulRedisConnection<K, V> getConnection(ConnectionIntent connectionIntent, String nodeId) {
        if (debugEnabled) {
            logger.debug("getConnection(" + connectionIntent + ", " + nodeId + ")");
        }
        ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(connectionIntent, nodeId);
        return getConnection(key);
    }

    /**
     * Requests a connection to the node with the given node id without blocking.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param nodeId the cluster node id
     * @return a future completing with the connection to the node
     */
    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionIntent connectionIntent, String nodeId) {
        if (debugEnabled) {
            logger.debug("getConnection(" + connectionIntent + ", " + nodeId + ")");
        }
        ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(connectionIntent, nodeId);
        return getConnectionAsync(key).toCompletableFuture();
    }

    /**
     * Requests the connection for a key from the connection provider and normalizes failures.
     *
     * <p>A failure of the underlying connection future is reported as the exception produced by
     * {@code RedisConnectionException.create(remoteAddress, cause)}, where {@code cause} is the failure passed
     * through {@code Exceptions.bubble}.
     *
     * @param key identifies the node and intent the connection is for
     * @return a connection future carrying the same remote address as the provider's future
     */
    protected ConnectionFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ClusterNodeConnectionFactory.ConnectionKey key) {
        // Fully typed locals: with raw types the lambda parameters degrade to Object and the
        // failure could not be handed to Exceptions.bubble(Throwable) without unchecked casts.
        ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = this.connectionProvider.getConnection(key);
        CompletableFuture<StatefulRedisConnection<K, V>> result = new CompletableFuture<>();

        connectionFuture.handle((connection, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.bubble(throwable)));
            } else {
                result.complete(connection);
            }
            // The stage returned by handle(..) is not used; only 'result' is exposed to callers.
            return null;
        });

        return ConnectionFuture.from(connectionFuture.getRemoteAddress(), result);
    }

    /**
     * Returns a connection to the node at {@code host:port}, blocking until it is available.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param host the node host
     * @param port the node port
     * @return the connection to the node
     * @throws RedisException on failure; runtime exceptions that are not already a {@link RedisException} are
     *         wrapped in one
     */
    @Override
    public StatefulRedisConnection<K, V> getConnection(ConnectionIntent connectionIntent, String host, int port) {
        try {
            beforeGetConnection(connectionIntent, host, port);
            ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(connectionIntent, host, port);
            return getConnection(key);
        } catch (RuntimeException e) {
            if (e instanceof RedisException) {
                throw e;
            }
            throw new RedisException(e);
        }
    }

    /**
     * Blocks until the connection for the given key is available.
     *
     * @param key identifies the node and intent the connection is for
     * @return the established connection
     * @throws RedisConnectionException created from the remote address and the cause of the
     *         {@link CompletionException} when the connection future fails
     */
    private StatefulRedisConnection<K, V> getConnection(ClusterNodeConnectionFactory.ConnectionKey key) {
        ConnectionFuture<StatefulRedisConnection<K, V>> pending = getConnectionAsync(key);
        try {
            return pending.join();
        } catch (CompletionException failure) {
            // Unwrap the CompletionException so callers see the actual connect failure as the cause.
            throw RedisConnectionException.create(pending.getRemoteAddress(), failure.getCause());
        }
    }

    /**
     * Requests a connection to the node at {@code host:port} without blocking.
     *
     * <p>Unlike the node-id variant, this method returns the connection provider's future directly.
     *
     * @param connectionIntent whether the connection is needed for reading or writing
     * @param host the node host
     * @param port the node port
     * @return a future completing with the connection to the node
     * @throws RedisException if the request is rejected synchronously; runtime exceptions that are not already a
     *         {@link RedisException} are wrapped in one
     */
    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionIntent connectionIntent, String host, int port) {
        try {
            beforeGetConnection(connectionIntent, host, port);
            ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(connectionIntent, host, port);
            return connectionProvider.getConnection(key).toCompletableFuture();
        } catch (RuntimeException e) {
            if (e instanceof RedisException) {
                throw e;
            }
            throw new RedisException(e);
        }
    }

    /**
     * Pre-flight check for host/port based connection requests.
     *
     * <p>If the address is not part of the current partitions, the event listener is notified and, when cluster
     * node membership validation is enabled, the request is rejected.
     *
     * @param connectionIntent the requested intent (only used for debug logging)
     * @param host the node host
     * @param port the node port
     * @throws RuntimeException the exception built by {@code connectionAttemptRejected} when the node is unknown
     *         and membership validation is enabled
     */
    private void beforeGetConnection(ConnectionIntent connectionIntent, String host, int port) {
        if (debugEnabled) {
            logger.debug("getConnection(" + connectionIntent + ", " + host + ", " + port + ")");
        }

        if (partitions.getPartition(host, port) != null) {
            return;
        }

        clusterEventListener.onUnknownNode();
        if (validateClusterNodeMembership()) {
            throw connectionAttemptRejected(HostAndPort.of(host, port).toString());
        }
    }

    /**
     * Closes this provider and blocks until {@link #closeAsync()} has completed.
     */
    @Override
    public void close() {
        closeAsync().join();
    }

    /**
     * Clears the per-slot caches and closes the underlying connection provider.
     *
     * @return the future returned by the connection provider's {@code close()}
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        // Drop cached slot lookups first so nothing hands out connections that are about to be closed.
        resetFastConnectionCache();
        return connectionProvider.close();
    }

    /**
     * Calls {@code reset()} on every connection held by the connection provider.
     */
    @Override
    public void reset() {
        connectionProvider.forEach(connection -> connection.reset());
    }

    /**
     * Installs a new cluster view.
     *
     * <p>The view is stored and forwarded to the connection factory while holding the state lock. If a view was
     * already present before this call, the provider reconfigures itself afterwards (outside the lock).
     *
     * @param partitions the new cluster view
     */
    @Override
    public void setPartitions(Partitions partitions) {
        boolean hadPreviousView;

        stateLock.lock();
        try {
            hadPreviousView = this.partitions != null;
            this.partitions = partitions;
            connectionFactory.setPartitions(partitions);
        } finally {
            stateLock.unlock();
        }

        if (hadPreviousView) {
            reconfigurePartitions();
        }
    }

    /**
     * Returns the current cluster view.
     *
     * @return the partitions last passed to {@link #setPartitions(Partitions)}, or {@code null} if none was set
     */
    protected Partitions getPartitions() {
        return partitions;
    }

    /**
     * Reacts to a changed cluster view: clears the per-slot caches and, if enabled, closes stale connections.
     */
    private void reconfigurePartitions() {
        resetFastConnectionCache();

        if (!expireStaleConnections()) {
            return;
        }
        closeStaleConnections();
    }

    /**
     * Tells whether stale connections should be closed after a topology change.
     *
     * @return {@code true} when no options are available or the options enable closing stale connections
     */
    private boolean expireStaleConnections() {
        if (options == null) {
            return true;
        }
        return options.isCloseStaleConnections();
    }

    /**
     * Closes every pooled connection whose key is considered stale by {@link #isStale}, logging the connection
     * count before and after at debug level.
     */
    @Override
    public void closeStaleConnections() {
        logger.debug("closeStaleConnections() count before expiring: {}", (Object) getConnectionCount());

        connectionProvider.forEach((key, connection) -> {
            if (!isStale(key)) {
                return;
            }
            connectionProvider.close(key);
        });

        logger.debug("closeStaleConnections() count after expiring: {}", (Object) getConnectionCount());
    }

    /**
     * Decides whether a connection key no longer matches any node of the current partitions.
     *
     * @param connectionKey the key of a pooled connection
     * @return {@code false} if the key's node id is known to the partitions; otherwise {@code true} when the key has
     *         no host or its host/port is unknown to the partitions
     */
    private boolean isStale(ClusterNodeConnectionFactory.ConnectionKey connectionKey) {
        boolean knownByNodeId = connectionKey.nodeId != null
                && partitions.getPartitionByNodeId(connectionKey.nodeId) != null;
        if (knownByNodeId) {
            return false;
        }
        if (connectionKey.host == null) {
            return true;
        }
        return partitions.getPartition(connectionKey.host, connectionKey.port) == null;
    }

    /**
     * Stores the auto-flush setting for future connections and applies it to all pooled connections.
     *
     * @param autoFlush the auto-flush state to apply
     */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        // Record the setting under the lock; the connection post-processor reads it under the same lock.
        stateLock.lock();
        try {
            autoFlushCommands = autoFlush;
        } finally {
            stateLock.unlock();
        }

        connectionProvider.forEach(pooled -> pooled.setAutoFlushCommands(autoFlush));
    }

    /**
     * Calls {@code flushCommands()} on every connection held by the connection provider.
     */
    @Override
    public void flushCommands() {
        connectionProvider.forEach(connection -> connection.flushCommands());
    }

    /**
     * Sets the read routing preference and discards all cached read candidates, since they were selected under
     * the previous setting.
     *
     * @param readFrom the new read routing preference
     */
    @Override
    public void setReadFrom(ReadFrom readFrom) {
        stateLock.lock();
        try {
            this.readFrom = readFrom;
            Arrays.fill(readers, null);
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Returns the configured read routing preference.
     *
     * @return the current {@link ReadFrom} setting, or {@code null} if none was set
     */
    @Override
    public ReadFrom getReadFrom() {
        return readFrom;
    }

    /**
     * Returns the connection count reported by the connection provider.
     *
     * @return the connection provider's connection count
     */
    long getConnectionCount() {
        return connectionProvider.getConnectionCount();
    }

    /**
     * Clears both per-slot caches (write connections and read candidates) while holding the state lock.
     */
    private void resetFastConnectionCache() {
        stateLock.lock();
        try {
            Arrays.fill(writers, null);
            Arrays.fill(readers, null);
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Builds the exception used to reject a connection attempt to a node outside the known cluster view.
     *
     * @param message description of the rejected target (for example a host:port or node id)
     * @return an {@link UnknownPartitionException} naming the target
     */
    private static RuntimeException connectionAttemptRejected(String message) {
        String text = String.format("Connection to %s not allowed. This partition is not known in the cluster view.", message);
        return new UnknownPartitionException(text);
    }

    /**
     * Tells whether connections to nodes outside the known cluster view must be rejected.
     *
     * @return {@code true} when the client has no cluster options or the options enable membership validation
     */
    private boolean validateClusterNodeMembership() {
        if (redisClusterClient.getClusterClientOptions() == null) {
            return true;
        }
        return redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership();
    }

    /**
     * Creates the factory that opens node connections. Called from the constructor; relies on the codec and the
     * cluster writer fields being assigned already.
     *
     * @param redisClusterClient the client used to connect to nodes
     * @return a {@link DefaultClusterNodeConnectionFactory} using this provider's codec and cluster writer
     */
    protected ClusterNodeConnectionFactory<K, V> getConnectionFactory(RedisClusterClient redisClusterClient) {
        return new DefaultClusterNodeConnectionFactory<>(redisClusterClient, redisCodec, clusterWriter);
    }

    /**
     * Dispatches a push message received from a node to every registered push listener.
     *
     * @param node the node the message is attributed to
     * @param message the push message
     */
    protected void onPushMessage(RedisClusterNode node, PushMessage message) {
        for (RedisClusterPushListener listener : pushListeners) {
            listener.onPushMessage(node, message);
        }
    }

    /**
     * Default node connection factory: opens connections through the cluster client, addressing the node either by
     * node id or by {@code host:port}.
     */
    static class DefaultClusterNodeConnectionFactory<K, V>
    extends AbstractClusterNodeConnectionFactory<K, V> {
        private final RedisClusterClient redisClusterClient;
        private final RedisCodec<K, V> redisCodec;
        private final RedisChannelWriter clusterWriter;

        /**
         * @param redisClusterClient client used to open connections; its resources are passed to the superclass
         * @param redisCodec codec for the created connections
         * @param clusterWriter writer passed along for host/port based connections
         */
        DefaultClusterNodeConnectionFactory(RedisClusterClient redisClusterClient, RedisCodec<K, V> redisCodec, RedisChannelWriter clusterWriter) {
            super(redisClusterClient.getResources());
            this.redisCodec = redisCodec;
            this.clusterWriter = clusterWriter;
            this.redisClusterClient = redisClusterClient;
        }

        /**
         * Opens a connection for the given key.
         *
         * @param key connection key; when {@code nodeId} is set the node is addressed by id and no cluster writer is
         *        passed, otherwise it is addressed as {@code host:port} with this factory's cluster writer
         * @return the connection future returned by the cluster client
         */
        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            if (key.nodeId == null) {
                String address = key.host + ":" + key.port;
                return redisClusterClient.connectToNodeAsync(redisCodec, address, clusterWriter, getSocketAddressSupplier(key));
            }
            return redisClusterClient.connectToNodeAsync(redisCodec, key.nodeId, null, getSocketAddressSupplier(key));
        }
    }

    /**
     * Connection factory decorator that validates the requested node against the current partitions before
     * delegating, and configures every connection after it has been established.
     */
    class NodeConnectionPostProcessor
    implements ClusterNodeConnectionFactory<K, V> {
        private final ClusterNodeConnectionFactory<K, V> delegate;

        /**
         * @param delegate the factory that actually opens connections
         */
        NodeConnectionPostProcessor(ClusterNodeConnectionFactory<K, V> delegate) {
            this.delegate = delegate;
        }

        /**
         * Forwards the new cluster view to the delegate factory.
         *
         * @param partitions the new cluster view
         */
        @Override
        public void setPartitions(Partitions partitions) {
            this.delegate.setPartitions(partitions);
        }

        /**
         * Validates the key, opens the connection through the delegate and post-processes it.
         *
         * <p>Post-processing: connections requested with {@code ConnectionIntent.READ} issue {@code readOnly()}
         * first (and are closed if that fails); every connection then receives the provider's current auto-flush
         * setting and a push listener that forwards messages to {@code onPushMessage}.
         *
         * @param key identifies the node (by node id or host/port) and the intent
         * @return a future completing with the configured connection
         * @throws RuntimeException the exception built by {@code connectionAttemptRejected} if the node id is
         *         unknown, or if the host/port is unknown and membership validation is enabled
         */
        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            RedisClusterNode targetNode = null;

            // Node-id based keys are always validated against the cluster view.
            if (key.nodeId != null && (targetNode = PooledClusterConnectionProvider.this.getPartitions().getPartitionByNodeId(key.nodeId)) == null) {
                PooledClusterConnectionProvider.this.clusterEventListener.onUnknownNode();
                throw PooledClusterConnectionProvider.connectionAttemptRejected("node id " + key.nodeId);
            }

            // Host/port based keys are only rejected when membership validation is enabled.
            if (key.host != null && (targetNode = PooledClusterConnectionProvider.this.partitions.getPartition(key.host, key.port)) == null) {
                PooledClusterConnectionProvider.this.clusterEventListener.onUnknownNode();
                if (PooledClusterConnectionProvider.this.validateClusterNodeMembership()) {
                    throw PooledClusterConnectionProvider.connectionAttemptRejected(key.host + ":" + key.port);
                }
            }

            // Keep the fully typed ConnectionFuture: a raw CompletionStage local would erase the lambda
            // parameter types below and could not be returned as the declared return type.
            ConnectionFuture<StatefulRedisConnection<K, V>> connection = this.delegate.apply(key);
            LettuceAssert.notNull(connection, "Connection is null. Check ConnectionKey because host and nodeId are null.");

            if (key.connectionIntent == ConnectionIntent.READ) {
                connection = connection.thenCompose(c -> {
                    RedisFuture<String> stringRedisFuture = c.async().readOnly();
                    // Close the connection if readOnly() could not be applied.
                    return stringRedisFuture.thenApply(s -> c).whenCompleteAsync((s, throwable) -> {
                        if (throwable != null) {
                            c.close();
                        }
                    });
                });
            }

            // May be null when the host/port is unknown but membership validation is disabled.
            RedisClusterNode actualNode = targetNode;
            connection = connection.thenApply(c -> {
                // Read autoFlushCommands under the same lock setAutoFlushCommands() writes it with.
                PooledClusterConnectionProvider.this.stateLock.lock();
                try {
                    c.setAutoFlushCommands(PooledClusterConnectionProvider.this.autoFlushCommands);
                    c.addListener(message -> PooledClusterConnectionProvider.this.onPushMessage(actualNode, message));
                } finally {
                    PooledClusterConnectionProvider.this.stateLock.unlock();
                }
                return c;
            });

            return connection;
        }
    }
}

