/*
 * Decompiled with CFR 0.152.
 */
package io.github.insideranh.stellarprotect.libs.mongodb.internal.connection;

import io.github.insideranh.stellarprotect.libs.bson.BsonBinaryReader;
import io.github.insideranh.stellarprotect.libs.bson.BsonDocument;
import io.github.insideranh.stellarprotect.libs.bson.ByteBuf;
import io.github.insideranh.stellarprotect.libs.bson.codecs.BsonDocumentCodec;
import io.github.insideranh.stellarprotect.libs.bson.codecs.Decoder;
import io.github.insideranh.stellarprotect.libs.bson.io.ByteBufferBsonInput;
import io.github.insideranh.stellarprotect.libs.mongodb.LoggerSettings;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoClientException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoCommandException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoCompressor;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoInternalException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoInterruptedException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoOperationTimeoutException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoSocketClosedException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoSocketReadException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoSocketReadTimeoutException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoSocketWriteException;
import io.github.insideranh.stellarprotect.libs.mongodb.MongoSocketWriteTimeoutException;
import io.github.insideranh.stellarprotect.libs.mongodb.ServerAddress;
import io.github.insideranh.stellarprotect.libs.mongodb.annotations.NotThreadSafe;
import io.github.insideranh.stellarprotect.libs.mongodb.assertions.Assertions;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.AsyncCompletionHandler;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ClusterConnectionMode;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ClusterId;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ConnectionDescription;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ConnectionId;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ServerConnectionState;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ServerDescription;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ServerId;
import io.github.insideranh.stellarprotect.libs.mongodb.connection.ServerType;
import io.github.insideranh.stellarprotect.libs.mongodb.event.CommandListener;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.ResourceUtil;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.TimeoutContext;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.async.AsyncRunnable;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.async.AsyncSupplier;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.async.ErrorHandlingResultCallback;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.async.SingleResultCallback;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.Authenticator;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ByteBufferBsonOutput;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.CommandEventSender;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.CommandHelper;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.CommandMessage;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.CompressedHeader;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.CompressedMessage;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.Compressor;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ConnectionGenerationSupplier;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.InternalConnection;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.InternalConnectionInitializationDescription;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.InternalConnectionInitializer;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.LoggingCommandEventSender;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.MessageHeader;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.MongoWriteConcernWithResponseException;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.NoOpCommandEventSender;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.OpCode;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.OperationContext;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ProtocolHelper;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ReplyHeader;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ReplyMessage;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ResponseBuffers;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.SnappyCompressor;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.Stream;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.StreamFactory;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ZlibCompressor;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.connection.ZstdCompressor;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.diagnostics.logging.Logger;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.diagnostics.logging.Loggers;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.logging.LogMessage;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.logging.StructuredLogger;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.session.SessionContext;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.thread.InterruptionUtil;
import io.github.insideranh.stellarprotect.libs.mongodb.internal.time.Timeout;
import io.github.insideranh.stellarprotect.libs.mongodb.lang.Nullable;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

@NotThreadSafe
public class InternalStreamConnection
implements InternalConnection {
    private static volatile boolean recordEverything = false;
    private static final Set<String> SECURITY_SENSITIVE_COMMANDS = new HashSet<String>(Arrays.asList("authenticate", "saslStart", "saslContinue", "getnonce", "createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb"));
    private static final Set<String> SECURITY_SENSITIVE_HELLO_COMMANDS = new HashSet<String>(Arrays.asList("hello", "isMaster", CommandHelper.LEGACY_HELLO_LOWER));
    private static final Logger LOGGER = Loggers.getLogger("connection");
    private final ClusterConnectionMode clusterConnectionMode;
    @Nullable
    private final Authenticator authenticator;
    private final boolean isMonitoringConnection;
    private final ServerId serverId;
    private final ConnectionGenerationSupplier connectionGenerationSupplier;
    private final StreamFactory streamFactory;
    private final InternalConnectionInitializer connectionInitializer;
    private volatile ConnectionDescription description;
    private volatile ServerDescription initialServerDescription;
    private volatile Stream stream;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final AtomicBoolean opened = new AtomicBoolean();
    private final AtomicBoolean authenticated = new AtomicBoolean();
    private final List<MongoCompressor> compressorList;
    private final LoggerSettings loggerSettings;
    private final CommandListener commandListener;
    @Nullable
    private volatile Compressor sendCompressor;
    private final Map<Byte, Compressor> compressorMap;
    private volatile boolean hasMoreToCome;
    private volatile int responseTo;
    private int generation = -1;
    private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger("protocol.command");

    public static void setRecordEverything(boolean recordEverything) {
        InternalStreamConnection.recordEverything = recordEverything;
    }

    static Set<String> getSecuritySensitiveCommands() {
        return Collections.unmodifiableSet(SECURITY_SENSITIVE_COMMANDS);
    }

    static Set<String> getSecuritySensitiveHelloCommands() {
        return Collections.unmodifiableSet(SECURITY_SENSITIVE_HELLO_COMMANDS);
    }

    public InternalStreamConnection(ClusterConnectionMode clusterConnectionMode, ServerId serverId, ConnectionGenerationSupplier connectionGenerationSupplier, StreamFactory streamFactory, List<MongoCompressor> compressorList, CommandListener commandListener, InternalConnectionInitializer connectionInitializer) {
        this(clusterConnectionMode, null, false, serverId, connectionGenerationSupplier, streamFactory, compressorList, LoggerSettings.builder().build(), commandListener, connectionInitializer);
    }

    public InternalStreamConnection(ClusterConnectionMode clusterConnectionMode, @Nullable Authenticator authenticator, boolean isMonitoringConnection, ServerId serverId, ConnectionGenerationSupplier connectionGenerationSupplier, StreamFactory streamFactory, List<MongoCompressor> compressorList, LoggerSettings loggerSettings, CommandListener commandListener, InternalConnectionInitializer connectionInitializer) {
        this.clusterConnectionMode = clusterConnectionMode;
        this.authenticator = authenticator;
        this.isMonitoringConnection = isMonitoringConnection;
        this.serverId = Assertions.notNull("serverId", serverId);
        this.connectionGenerationSupplier = Assertions.notNull("connectionGeneration", connectionGenerationSupplier);
        this.streamFactory = Assertions.notNull("streamFactory", streamFactory);
        this.compressorList = Assertions.notNull("compressorList", compressorList);
        this.compressorMap = this.createCompressorMap(compressorList);
        this.loggerSettings = loggerSettings;
        this.commandListener = commandListener;
        this.connectionInitializer = Assertions.notNull("connectionInitializer", connectionInitializer);
        this.description = new ConnectionDescription(serverId);
        this.initialServerDescription = ServerDescription.builder().address(serverId.getAddress()).type(ServerType.UNKNOWN).state(ServerConnectionState.CONNECTING).build();
        if (clusterConnectionMode != ClusterConnectionMode.LOAD_BALANCED) {
            this.generation = connectionGenerationSupplier.getGeneration();
        }
    }

    @Override
    public ConnectionDescription getDescription() {
        return this.description;
    }

    @Override
    public ServerDescription getInitialServerDescription() {
        return this.initialServerDescription;
    }

    @Override
    public int getGeneration() {
        return this.generation;
    }

    @Override
    public void open(OperationContext originalOperationContext) {
        Assertions.isTrue("Open already called", this.stream == null);
        this.stream = this.streamFactory.create(this.serverId.getAddress());
        try {
            OperationContext operationContext = originalOperationContext.withTimeoutContext(originalOperationContext.getTimeoutContext().withComputedServerSelectionTimeoutContext());
            this.stream.open(operationContext);
            InternalConnectionInitializationDescription initializationDescription = this.connectionInitializer.startHandshake(this, operationContext);
            this.initAfterHandshakeStart(initializationDescription);
            initializationDescription = this.connectionInitializer.finishHandshake(this, initializationDescription, operationContext);
            this.initAfterHandshakeFinish(initializationDescription);
        }
        catch (Throwable t) {
            this.close();
            if (t instanceof MongoException) {
                throw (MongoException)t;
            }
            throw new MongoException(t.toString(), t);
        }
    }

    @Override
    public void openAsync(OperationContext originalOperationContext, final SingleResultCallback<Void> callback) {
        Assertions.assertNull(this.stream);
        try {
            final OperationContext operationContext = originalOperationContext.withTimeoutContext(originalOperationContext.getTimeoutContext().withComputedServerSelectionTimeoutContext());
            this.stream = this.streamFactory.create(this.serverId.getAddress());
            this.stream.openAsync(operationContext, new AsyncCompletionHandler<Void>(){

                @Override
                public void completed(@Nullable Void aVoid) {
                    InternalStreamConnection.this.connectionInitializer.startHandshakeAsync(InternalStreamConnection.this, operationContext, (initialResult, initialException) -> {
                        if (initialException != null) {
                            InternalStreamConnection.this.close();
                            callback.onResult(null, initialException);
                        } else {
                            Assertions.assertNotNull(initialResult);
                            InternalStreamConnection.this.initAfterHandshakeStart(initialResult);
                            InternalStreamConnection.this.connectionInitializer.finishHandshakeAsync(InternalStreamConnection.this, (InternalConnectionInitializationDescription)initialResult, operationContext, (completedResult, completedException) -> {
                                if (completedException != null) {
                                    InternalStreamConnection.this.close();
                                    callback.onResult(null, completedException);
                                } else {
                                    Assertions.assertNotNull(completedResult);
                                    InternalStreamConnection.this.initAfterHandshakeFinish(completedResult);
                                    callback.onResult(null, null);
                                }
                            });
                        }
                    });
                }

                @Override
                public void failed(Throwable t) {
                    InternalStreamConnection.this.close();
                    callback.onResult(null, t);
                }
            });
        }
        catch (Throwable t) {
            this.close();
            callback.onResult(null, t);
        }
    }

    private void initAfterHandshakeStart(InternalConnectionInitializationDescription initializationDescription) {
        this.description = initializationDescription.getConnectionDescription();
        this.initialServerDescription = initializationDescription.getServerDescription();
        if (this.clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) {
            this.generation = this.connectionGenerationSupplier.getGeneration(Assertions.assertNotNull(this.description.getServiceId()));
        }
    }

    private void initAfterHandshakeFinish(InternalConnectionInitializationDescription initializationDescription) {
        this.description = initializationDescription.getConnectionDescription();
        this.initialServerDescription = initializationDescription.getServerDescription();
        this.opened.set(true);
        this.authenticated.set(true);
        this.sendCompressor = this.findSendCompressor(this.description);
    }

    private Map<Byte, Compressor> createCompressorMap(List<MongoCompressor> compressorList) {
        HashMap<Byte, Compressor> compressorMap = new HashMap<Byte, Compressor>(this.compressorList.size());
        for (MongoCompressor mongoCompressor : compressorList) {
            Compressor compressor = this.createCompressor(mongoCompressor);
            compressorMap.put(compressor.getId(), compressor);
        }
        return compressorMap;
    }

    @Nullable
    private Compressor findSendCompressor(ConnectionDescription description) {
        if (description.getCompressors().isEmpty()) {
            return null;
        }
        String firstCompressorName = description.getCompressors().get(0);
        for (Compressor compressor : this.compressorMap.values()) {
            if (!compressor.getName().equals(firstCompressorName)) continue;
            return compressor;
        }
        throw new MongoInternalException("Unexpected compressor negotiated: " + firstCompressorName);
    }

    private Compressor createCompressor(MongoCompressor mongoCompressor) {
        switch (mongoCompressor.getName()) {
            case "zlib": {
                return new ZlibCompressor(mongoCompressor);
            }
            case "snappy": {
                return new SnappyCompressor();
            }
            case "zstd": {
                return new ZstdCompressor();
            }
        }
        throw new MongoClientException("Unsupported compressor " + mongoCompressor.getName());
    }

    @Override
    public void close() {
        if (!this.isClosed.getAndSet(true) && this.stream != null) {
            this.stream.close();
        }
    }

    @Override
    public boolean opened() {
        return this.opened.get();
    }

    @Override
    public boolean isClosed() {
        return this.isClosed.get();
    }

    @Override
    @Nullable
    public <T> T sendAndReceive(CommandMessage message, Decoder<T> decoder, OperationContext operationContext) {
        Supplier<Object> sendAndReceiveInternal = () -> this.sendAndReceiveInternal(message, decoder, operationContext);
        try {
            return (T)sendAndReceiveInternal.get();
        }
        catch (MongoCommandException e) {
            if (this.reauthenticationIsTriggered(e)) {
                return (T)this.reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
            }
            throw e;
        }
    }

    @Override
    public <T> void sendAndReceiveAsync(CommandMessage message, Decoder<T> decoder, OperationContext operationContext, SingleResultCallback<T> callback) {
        AsyncSupplier sendAndReceiveAsyncInternal = c -> this.sendAndReceiveAsyncInternal(message, decoder, operationContext, c);
        AsyncRunnable.beginAsync().thenSupply(c -> sendAndReceiveAsyncInternal.getAsync(c)).onErrorIf(e -> this.reauthenticationIsTriggered((Throwable)e), (t, c) -> this.reauthenticateAndRetryAsync(sendAndReceiveAsyncInternal, operationContext, c)).finish(callback);
    }

    private <T> T reauthenticateAndRetry(Supplier<T> operation, OperationContext operationContext) {
        this.authenticated.set(false);
        Assertions.assertNotNull(this.authenticator).reauthenticate(this, operationContext);
        this.authenticated.set(true);
        return operation.get();
    }

    private <T> void reauthenticateAndRetryAsync(AsyncSupplier<T> operation, OperationContext operationContext, SingleResultCallback<T> callback) {
        AsyncRunnable.beginAsync().thenRun(c -> {
            this.authenticated.set(false);
            Assertions.assertNotNull(this.authenticator).reauthenticateAsync(this, operationContext, c);
        }).thenSupply(c -> {
            this.authenticated.set(true);
            operation.getAsync(c);
        }).finish(callback);
    }

    public boolean reauthenticationIsTriggered(@Nullable Throwable t) {
        if (!Authenticator.shouldAuthenticate(this.authenticator, this.description)) {
            return false;
        }
        if (t instanceof MongoCommandException) {
            MongoCommandException e = (MongoCommandException)t;
            return e.getErrorCode() == 391;
        }
        return false;
    }

    @Nullable
    private <T> T sendAndReceiveInternal(CommandMessage message, Decoder<T> decoder, OperationContext operationContext) {
        CommandEventSender commandEventSender;
        try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this);){
            message.encode(bsonOutput, operationContext);
            commandEventSender = this.createCommandEventSender(message, bsonOutput, operationContext);
            commandEventSender.sendStartedEvent();
            try {
                this.sendCommandMessage(message, bsonOutput, operationContext);
            }
            catch (Exception e) {
                commandEventSender.sendFailedEvent(e);
                throw e;
            }
        }
        if (message.isResponseExpected()) {
            return this.receiveCommandMessageResponse(decoder, commandEventSender, operationContext);
        }
        commandEventSender.sendSucceededEventForOneWayCommand();
        return null;
    }

    @Override
    public <T> void send(CommandMessage message, Decoder<T> decoder, OperationContext operationContext) {
        try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this);){
            message.encode(bsonOutput, operationContext);
            this.sendCommandMessage(message, bsonOutput, operationContext);
            if (message.isResponseExpected()) {
                this.hasMoreToCome = true;
            }
        }
    }

    @Override
    public <T> T receive(Decoder<T> decoder, OperationContext operationContext) {
        Assertions.isTrue("Response is expected", this.hasMoreToCome);
        return this.receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), operationContext);
    }

    @Override
    public boolean hasMoreToCome() {
        return this.hasMoreToCome;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendCommandMessage(CommandMessage message, ByteBufferBsonOutput bsonOutput, OperationContext operationContext) {
        Compressor localSendCompressor = this.sendCompressor;
        if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
            this.trySendMessage(message, bsonOutput, operationContext);
        } else {
            ByteBufferBsonOutput compressedBsonOutput;
            List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
            try {
                CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, localSendCompressor, ProtocolHelper.getMessageSettings(this.description, this.initialServerDescription));
                compressedBsonOutput = new ByteBufferBsonOutput(this);
                compressedMessage.encode(compressedBsonOutput, operationContext);
            }
            finally {
                ResourceUtil.release(byteBuffers);
                bsonOutput.close();
            }
            this.trySendMessage(message, compressedBsonOutput, operationContext);
        }
        this.responseTo = message.getId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void trySendMessage(CommandMessage message, ByteBufferBsonOutput bsonOutput, OperationContext operationContext) {
        Timeout.onExistsAndExpired(operationContext.getTimeoutContext().timeoutIncludingRoundTrip(), () -> {
            throw TimeoutContext.createMongoRoundTripTimeoutException();
        });
        List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
        try {
            this.sendMessage(byteBuffers, message.getId(), operationContext);
        }
        finally {
            ResourceUtil.release(byteBuffers);
            bsonOutput.close();
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private <T> T receiveCommandMessageResponse(Decoder<T> decoder, CommandEventSender commandEventSender, OperationContext operationContext) {
        boolean commandSuccessful = false;
        try (ResponseBuffers responseBuffers = this.receiveResponseBuffers(operationContext);){
            this.updateSessionContext(operationContext.getSessionContext(), responseBuffers);
            if (!ProtocolHelper.isCommandOk(responseBuffers)) {
                throw ProtocolHelper.getCommandFailureException(responseBuffers.getResponseDocument(this.responseTo, new BsonDocumentCodec()), this.description.getServerAddress(), operationContext.getTimeoutContext());
            }
            commandSuccessful = true;
            commandEventSender.sendSucceededEvent(responseBuffers);
            T commandResult = this.getCommandResult(decoder, responseBuffers, this.responseTo, operationContext.getTimeoutContext());
            this.hasMoreToCome = responseBuffers.getReplyHeader().hasMoreToCome();
            this.responseTo = this.hasMoreToCome ? responseBuffers.getReplyHeader().getRequestId() : 0;
            T t = commandResult;
            return t;
        }
        catch (Exception e) {
            if (commandSuccessful) throw e;
            commandEventSender.sendFailedEvent(e);
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void sendAndReceiveAsyncInternal(CommandMessage message, Decoder<T> decoder, OperationContext operationContext, SingleResultCallback<T> callback) {
        block7: {
            if (this.isClosed()) {
                callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", this.getServerAddress()));
                return;
            }
            ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this);
            ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput(this);
            try {
                message.encode(bsonOutput, operationContext);
                CommandEventSender commandEventSender = this.createCommandEventSender(message, bsonOutput, operationContext);
                commandEventSender.sendStartedEvent();
                Compressor localSendCompressor = this.sendCompressor;
                if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
                    this.sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, bsonOutput, commandEventSender, message.isResponseExpected());
                    break block7;
                }
                List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
                try {
                    CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, localSendCompressor, ProtocolHelper.getMessageSettings(this.description, this.initialServerDescription));
                    compressedMessage.encode(compressedBsonOutput, operationContext);
                }
                finally {
                    ResourceUtil.release(byteBuffers);
                    bsonOutput.close();
                }
                this.sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, compressedBsonOutput, commandEventSender, message.isResponseExpected());
            }
            catch (Throwable t) {
                bsonOutput.close();
                compressedBsonOutput.close();
                callback.onResult(null, t);
            }
        }
    }

    private <T> void sendCommandMessageAsync(int messageId, Decoder<T> decoder, OperationContext operationContext, SingleResultCallback<T> callback, ByteBufferBsonOutput bsonOutput, CommandEventSender commandEventSender, boolean responseExpected) {
        boolean[] shouldReturn = new boolean[]{false};
        Timeout.onExistsAndExpired(operationContext.getTimeoutContext().timeoutIncludingRoundTrip(), () -> {
            bsonOutput.close();
            MongoOperationTimeoutException operationTimeoutException = TimeoutContext.createMongoRoundTripTimeoutException();
            commandEventSender.sendFailedEvent(operationTimeoutException);
            callback.onResult(null, operationTimeoutException);
            shouldReturn[0] = true;
        });
        if (shouldReturn[0]) {
            return;
        }
        List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
        this.sendMessageAsync(byteBuffers, messageId, operationContext, (result, t) -> {
            ResourceUtil.release(byteBuffers);
            bsonOutput.close();
            if (t != null) {
                commandEventSender.sendFailedEvent(t);
                callback.onResult(null, t);
            } else if (!responseExpected) {
                commandEventSender.sendSucceededEventForOneWayCommand();
                callback.onResult(null, null);
            } else {
                this.readAsync(16, operationContext, new MessageHeaderCallback(operationContext, (responseBuffers, t1) -> {
                    Object commandResult;
                    if (t1 != null) {
                        commandEventSender.sendFailedEvent(t1);
                        callback.onResult(null, t1);
                        return;
                    }
                    Assertions.assertNotNull(responseBuffers);
                    try {
                        this.updateSessionContext(operationContext.getSessionContext(), (ResponseBuffers)responseBuffers);
                        boolean commandOk = ProtocolHelper.isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())));
                        responseBuffers.reset();
                        if (!commandOk) {
                            MongoException commandFailureException = ProtocolHelper.getCommandFailureException(responseBuffers.getResponseDocument(messageId, new BsonDocumentCodec()), this.description.getServerAddress(), operationContext.getTimeoutContext());
                            commandEventSender.sendFailedEvent(commandFailureException);
                            throw commandFailureException;
                        }
                        commandEventSender.sendSucceededEvent((ResponseBuffers)responseBuffers);
                        commandResult = this.getCommandResult(decoder, (ResponseBuffers)responseBuffers, messageId, operationContext.getTimeoutContext());
                    }
                    catch (Throwable localThrowable) {
                        callback.onResult(null, localThrowable);
                        return;
                    }
                    finally {
                        responseBuffers.close();
                    }
                    callback.onResult(commandResult, null);
                }));
            }
        });
    }

    private <T> T getCommandResult(Decoder<T> decoder, ResponseBuffers responseBuffers, int messageId, TimeoutContext timeoutContext) {
        T result = new ReplyMessage<T>(responseBuffers, decoder, messageId).getDocument();
        MongoException writeConcernBasedError = ProtocolHelper.createSpecialWriteConcernException(responseBuffers, this.description.getServerAddress(), timeoutContext);
        if (writeConcernBasedError instanceof MongoOperationTimeoutException) {
            throw writeConcernBasedError;
        }
        if (writeConcernBasedError != null) {
            throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result);
        }
        return result;
    }

    @Override
    public void sendMessage(List<ByteBuf> byteBuffers, int lastRequestId, OperationContext operationContext) {
        Assertions.notNull("stream is open", this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot write to a closed stream", this.getServerAddress());
        }
        try {
            this.stream.write(byteBuffers, operationContext);
        }
        catch (Exception e) {
            this.close();
            this.throwTranslatedWriteException(e, operationContext);
        }
    }

    @Override
    public void sendMessageAsync(List<ByteBuf> byteBuffers, int lastRequestId, OperationContext operationContext, SingleResultCallback<Void> callback) {
        AsyncRunnable.beginAsync().thenRun(c -> {
            Assertions.notNull("stream is open", this.stream);
            if (this.isClosed()) {
                throw new MongoSocketClosedException("Cannot write to a closed stream", this.getServerAddress());
            }
            c.complete(c);
        }).thenRunTryCatchAsyncBlocks(c -> this.stream.writeAsync(byteBuffers, operationContext, c.asHandler()), Exception.class, (e, c) -> {
            try {
                this.close();
                this.throwTranslatedWriteException((Throwable)e, operationContext);
            }
            catch (Throwable translatedException) {
                c.completeExceptionally(translatedException);
            }
        }).finish(ErrorHandlingResultCallback.errorHandlingCallback(callback, LOGGER));
    }

    @Override
    public ResponseBuffers receiveMessage(int responseTo, OperationContext operationContext) {
        Assertions.assertNotNull(this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress());
        }
        return this.receiveResponseBuffers(operationContext);
    }

    @Override
    public void receiveMessageAsync(int responseTo, OperationContext operationContext, SingleResultCallback<ResponseBuffers> callback) {
        Assertions.assertNotNull(this.stream);
        if (this.isClosed()) {
            callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", this.getServerAddress()));
            return;
        }
        this.readAsync(16, operationContext, new MessageHeaderCallback(operationContext, (result, t) -> {
            if (t != null) {
                this.close();
                callback.onResult(null, t);
            } else {
                callback.onResult((ResponseBuffers)result, null);
            }
        }));
    }

    private void readAsync(int numBytes, final OperationContext operationContext, final SingleResultCallback<ByteBuf> callback) {
        if (this.isClosed()) {
            callback.onResult(null, new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress()));
            return;
        }
        try {
            this.stream.readAsync(numBytes, operationContext, new AsyncCompletionHandler<ByteBuf>(){

                @Override
                public void completed(@Nullable ByteBuf buffer) {
                    callback.onResult(buffer, null);
                }

                @Override
                public void failed(Throwable t) {
                    InternalStreamConnection.this.close();
                    callback.onResult(null, InternalStreamConnection.this.translateReadException(t, operationContext));
                }
            });
        }
        catch (Exception e) {
            this.close();
            callback.onResult(null, this.translateReadException(e, operationContext));
        }
    }

    private ConnectionId getId() {
        return this.description.getConnectionId();
    }

    private ServerAddress getServerAddress() {
        return this.description.getServerAddress();
    }

    private void updateSessionContext(SessionContext sessionContext, ResponseBuffers responseBuffers) {
        BsonDocument recoveryToken;
        sessionContext.advanceOperationTime(ProtocolHelper.getOperationTime(responseBuffers));
        sessionContext.advanceClusterTime(ProtocolHelper.getClusterTime(responseBuffers));
        sessionContext.setSnapshotTimestamp(ProtocolHelper.getSnapshotTimestamp(responseBuffers));
        if (sessionContext.hasActiveTransaction() && (recoveryToken = ProtocolHelper.getRecoveryToken(responseBuffers)) != null) {
            sessionContext.setRecoveryToken(recoveryToken);
        }
    }

    private void throwTranslatedWriteException(Throwable e, OperationContext operationContext) {
        if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasTimeoutMS()) {
            throw TimeoutContext.createMongoTimeoutException(e);
        }
        if (e instanceof MongoException) {
            throw (MongoException)e;
        }
        Optional<MongoInterruptedException> interruptedException = InterruptionUtil.translateInterruptedException(e, "Interrupted while sending message");
        if (interruptedException.isPresent()) {
            throw interruptedException.get();
        }
        if (e instanceof IOException) {
            throw new MongoSocketWriteException("Exception sending message", this.getServerAddress(), e);
        }
        throw new MongoInternalException("Unexpected exception", e);
    }

    private MongoException translateReadException(Throwable e, OperationContext operationContext) {
        if (operationContext.getTimeoutContext().hasTimeoutMS()) {
            if (e instanceof SocketTimeoutException) {
                return TimeoutContext.createMongoTimeoutException(this.createReadTimeoutException((SocketTimeoutException)e));
            }
            if (e instanceof MongoSocketReadTimeoutException) {
                return TimeoutContext.createMongoTimeoutException(e);
            }
        }
        if (e instanceof MongoException) {
            return (MongoException)e;
        }
        Optional<MongoInterruptedException> interruptedException = InterruptionUtil.translateInterruptedException(e, "Interrupted while receiving message");
        if (interruptedException.isPresent()) {
            return interruptedException.get();
        }
        if (e instanceof SocketTimeoutException) {
            return this.createReadTimeoutException((SocketTimeoutException)e);
        }
        if (e instanceof IOException) {
            return new MongoSocketReadException("Exception receiving message", this.getServerAddress(), e);
        }
        if (e instanceof RuntimeException) {
            return new MongoInternalException("Unexpected runtime exception", e);
        }
        return new MongoInternalException("Unexpected exception", e);
    }

    private MongoSocketReadTimeoutException createReadTimeoutException(SocketTimeoutException e) {
        return new MongoSocketReadTimeoutException("Timeout while receiving message", this.getServerAddress(), (Throwable)e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private ResponseBuffers receiveResponseBuffers(OperationContext operationContext) {
        try {
            MessageHeader messageHeader;
            ByteBuf messageHeaderBuffer = this.stream.read(16, operationContext);
            try {
                messageHeader = new MessageHeader(messageHeaderBuffer, this.description.getMaxMessageSize());
            }
            finally {
                messageHeaderBuffer.release();
            }
            ByteBuf messageBuffer = this.stream.read(messageHeader.getMessageLength() - 16, operationContext);
            boolean releaseMessageBuffer = true;
            try {
                if (messageHeader.getOpCode() == OpCode.OP_COMPRESSED.getValue()) {
                    CompressedHeader compressedHeader = new CompressedHeader(messageBuffer, messageHeader);
                    Compressor compressor = this.getCompressor(compressedHeader);
                    ByteBuf buffer = this.getBuffer(compressedHeader.getUncompressedSize());
                    compressor.uncompress(messageBuffer, buffer);
                    buffer.flip();
                    ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer);
                    return responseBuffers;
                }
                ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer);
                releaseMessageBuffer = false;
                ResponseBuffers responseBuffers2 = responseBuffers;
                return responseBuffers2;
            }
            finally {
                if (releaseMessageBuffer) {
                    messageBuffer.release();
                }
            }
        }
        catch (Throwable t) {
            this.close();
            throw this.translateReadException(t, operationContext);
        }
    }

    private Compressor getCompressor(CompressedHeader compressedHeader) {
        Compressor compressor = this.compressorMap.get(compressedHeader.getCompressorId());
        if (compressor == null) {
            throw new MongoClientException("Unsupported compressor with identifier " + compressedHeader.getCompressorId());
        }
        return compressor;
    }

    @Override
    public ByteBuf getBuffer(int size) {
        Assertions.notNull("open", this.stream);
        return this.stream.getBuffer(size);
    }

    private CommandEventSender createCommandEventSender(CommandMessage message, ByteBufferBsonOutput bsonOutput, OperationContext operationContext) {
        boolean listensOrLogs;
        boolean bl = listensOrLogs = this.commandListener != null || COMMAND_PROTOCOL_LOGGER.isRequired(LogMessage.Level.DEBUG, this.getClusterId());
        if (!(recordEverything || !this.isMonitoringConnection && this.opened() && this.authenticated.get() && listensOrLogs)) {
            return new NoOpCommandEventSender();
        }
        return new LoggingCommandEventSender(SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, this.description, this.commandListener, operationContext, message, bsonOutput, COMMAND_PROTOCOL_LOGGER, this.loggerSettings);
    }

    private ClusterId getClusterId() {
        return this.description.getConnectionId().getServerId().getClusterId();
    }

    private class MessageHeaderCallback
    implements SingleResultCallback<ByteBuf> {
        private final OperationContext operationContext;
        private final SingleResultCallback<ResponseBuffers> callback;

        MessageHeaderCallback(OperationContext operationContext, SingleResultCallback<ResponseBuffers> callback) {
            this.operationContext = operationContext;
            this.callback = callback;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onResult(@Nullable ByteBuf result, @Nullable Throwable t) {
            if (t != null) {
                this.callback.onResult(null, t);
                return;
            }
            try {
                Assertions.assertNotNull(result);
                MessageHeader messageHeader = new MessageHeader(result, InternalStreamConnection.this.description.getMaxMessageSize());
                InternalStreamConnection.this.readAsync(messageHeader.getMessageLength() - 16, this.operationContext, new MessageCallback(messageHeader));
            }
            catch (Throwable localThrowable) {
                this.callback.onResult(null, localThrowable);
            }
            finally {
                if (result != null) {
                    result.release();
                }
            }
        }

        private class MessageCallback
        implements SingleResultCallback<ByteBuf> {
            private final MessageHeader messageHeader;

            MessageCallback(MessageHeader messageHeader) {
                this.messageHeader = messageHeader;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onResult(@Nullable ByteBuf result, @Nullable Throwable t) {
                if (t != null) {
                    MessageHeaderCallback.this.callback.onResult(null, t);
                    return;
                }
                boolean releaseResult = true;
                Assertions.assertNotNull(result);
                try {
                    ByteBuf responseBuffer;
                    ReplyHeader replyHeader;
                    if (this.messageHeader.getOpCode() == OpCode.OP_COMPRESSED.getValue()) {
                        try {
                            CompressedHeader compressedHeader = new CompressedHeader(result, this.messageHeader);
                            Compressor compressor = InternalStreamConnection.this.getCompressor(compressedHeader);
                            ByteBuf buffer = InternalStreamConnection.this.getBuffer(compressedHeader.getUncompressedSize());
                            compressor.uncompress(result, buffer);
                            buffer.flip();
                            replyHeader = new ReplyHeader(buffer, compressedHeader);
                            responseBuffer = buffer;
                        }
                        finally {
                            releaseResult = false;
                            result.release();
                        }
                    } else {
                        replyHeader = new ReplyHeader(result, this.messageHeader);
                        responseBuffer = result;
                        releaseResult = false;
                    }
                    MessageHeaderCallback.this.callback.onResult(new ResponseBuffers(replyHeader, responseBuffer), null);
                }
                catch (Throwable localThrowable) {
                    MessageHeaderCallback.this.callback.onResult(null, localThrowable);
                }
                finally {
                    if (releaseResult) {
                        result.release();
                    }
                }
            }
        }
    }
}

