/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.internal.connection.netty;

import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.assertions.Assertions;
import com.mongodb.connection.AsyncCompletionHandler;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.internal.Locks;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ServerAddressHelper;
import com.mongodb.internal.connection.SslHelper;
import com.mongodb.internal.connection.Stream;
import com.mongodb.internal.connection.netty.NettyByteBuf;
import com.mongodb.internal.thread.InterruptionUtil;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import org.bson.ByteBuf;

final class NettyStream
implements Stream {
    private static final byte NO_SCHEDULE_TIME = 0;
    private final ServerAddress address;
    private final InetAddressResolver inetAddressResolver;
    private final SocketSettings settings;
    private final SslSettings sslSettings;
    private final EventLoopGroup workerGroup;
    private final Class<? extends SocketChannel> socketChannelClass;
    private final ByteBufAllocator allocator;
    @Nullable
    private final SslContext sslContext;
    private boolean isClosed;
    private volatile Channel channel;
    private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList();
    private final Lock lock = new ReentrantLock();
    private PendingReader pendingReader;
    private Throwable pendingException;
    @Nullable
    private ReadTimeoutTask readTimeoutTask;

    NettyStream(ServerAddress address, InetAddressResolver inetAddressResolver, SocketSettings settings, SslSettings sslSettings, EventLoopGroup workerGroup, Class<? extends SocketChannel> socketChannelClass, ByteBufAllocator allocator, @Nullable SslContext sslContext) {
        this.address = address;
        this.inetAddressResolver = inetAddressResolver;
        this.settings = settings;
        this.sslSettings = sslSettings;
        this.workerGroup = workerGroup;
        this.socketChannelClass = socketChannelClass;
        this.allocator = allocator;
        this.sslContext = sslContext;
    }

    @Override
    public ByteBuf getBuffer(int size) {
        return new NettyByteBuf(this.allocator.buffer(size, size));
    }

    @Override
    public void open(OperationContext operationContext) throws IOException {
        FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<Void>();
        this.openAsync(operationContext, handler);
        handler.get();
    }

    @Override
    public void openAsync(OperationContext operationContext, AsyncCompletionHandler<Void> handler) {
        LinkedList<SocketAddress> socketAddressQueue;
        try {
            socketAddressQueue = new LinkedList<SocketAddress>(ServerAddressHelper.getSocketAddresses(this.address, this.inetAddressResolver));
        }
        catch (Throwable t) {
            handler.failed(t);
            return;
        }
        this.initializeChannel(operationContext, handler, socketAddressQueue);
    }

    private void initializeChannel(OperationContext operationContext, AsyncCompletionHandler<Void> handler, Queue<SocketAddress> socketAddressQueue) {
        if (socketAddressQueue.isEmpty()) {
            handler.failed(new MongoSocketException("Exception opening socket", this.getAddress()));
        } else {
            SocketAddress nextAddress = socketAddressQueue.poll();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(this.workerGroup);
            bootstrap.channel(this.socketChannelClass);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationContext.getTimeoutContext().getConnectTimeoutMs());
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            if (this.settings.getReceiveBufferSize() > 0) {
                bootstrap.option(ChannelOption.SO_RCVBUF, this.settings.getReceiveBufferSize());
            }
            if (this.settings.getSendBufferSize() > 0) {
                bootstrap.option(ChannelOption.SO_SNDBUF, this.settings.getSendBufferSize());
            }
            bootstrap.option(ChannelOption.ALLOCATOR, this.allocator);
            bootstrap.handler(new ChannelInitializer<SocketChannel>(){

                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (NettyStream.this.sslSettings.isEnabled()) {
                        NettyStream.this.addSslHandler(ch);
                    }
                    pipeline.addLast("ChannelInboundHandlerAdapter", (ChannelHandler)new ChannelInboundHandlerAdapter());
                    NettyStream.this.readTimeoutTask = new ReadTimeoutTask(pipeline.lastContext());
                    pipeline.addLast("InboundBufferHandler", (ChannelHandler)new InboundBufferHandler());
                }
            });
            ChannelFuture channelFuture = bootstrap.connect(nextAddress);
            channelFuture.addListener(new OpenChannelFutureListener(operationContext, socketAddressQueue, channelFuture, handler));
        }
    }

    @Override
    public void write(List<ByteBuf> buffers, OperationContext operationContext) throws IOException {
        FutureAsyncCompletionHandler<Void> future = new FutureAsyncCompletionHandler<Void>();
        this.writeAsync(buffers, operationContext, future);
        future.get();
    }

    @Override
    public ByteBuf read(int numBytes, OperationContext operationContext) throws IOException {
        FutureAsyncCompletionHandler<ByteBuf> future = new FutureAsyncCompletionHandler<ByteBuf>();
        this.readAsync(numBytes, future, operationContext.getTimeoutContext().getReadTimeoutMS());
        return future.get();
    }

    @Override
    public void writeAsync(List<ByteBuf> buffers, OperationContext operationContext, AsyncCompletionHandler<Void> handler) {
        CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
        for (ByteBuf cur : buffers) {
            composite.addComponent(true, ((NettyByteBuf)cur).asByteBuf().retain());
        }
        long writeTimeoutMS = operationContext.getTimeoutContext().getWriteTimeoutMS();
        Optional<WriteTimeoutHandler> writeTimeoutHandler = this.addWriteTimeoutHandler(writeTimeoutMS);
        this.channel.writeAndFlush(composite).addListener(future -> {
            writeTimeoutHandler.map(w -> this.channel.pipeline().remove((ChannelHandler)w));
            if (!future.isSuccess()) {
                handler.failed(future.cause());
            } else {
                handler.completed(null);
            }
        });
    }

    private Optional<WriteTimeoutHandler> addWriteTimeoutHandler(long writeTimeoutMS) {
        if (writeTimeoutMS != 0L) {
            WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutMS, TimeUnit.MILLISECONDS);
            this.channel.pipeline().addBefore("ChannelInboundHandlerAdapter", "WriteTimeoutHandler", writeTimeoutHandler);
            return Optional.of(writeTimeoutHandler);
        }
        return Optional.empty();
    }

    @Override
    public void readAsync(int numBytes, OperationContext operationContext, AsyncCompletionHandler<ByteBuf> handler) {
        this.readAsync(numBytes, handler, operationContext.getTimeoutContext().getReadTimeoutMS());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readAsync(int numBytes, AsyncCompletionHandler<ByteBuf> handler, long readTimeoutMillis) {
        ByteBuf buffer = null;
        Throwable exceptionResult = null;
        this.lock.lock();
        try {
            exceptionResult = this.pendingException;
            if (exceptionResult == null) {
                if (!this.hasBytesAvailable(numBytes)) {
                    if (this.pendingReader == null) {
                        this.pendingReader = new PendingReader(numBytes, handler, NettyStream.scheduleReadTimeout(this.readTimeoutTask, readTimeoutMillis));
                    }
                } else {
                    CompositeByteBuf composite = this.allocator.compositeBuffer(this.pendingInboundBuffers.size());
                    int bytesNeeded = numBytes;
                    Iterator iter = this.pendingInboundBuffers.iterator();
                    while (iter.hasNext()) {
                        io.netty.buffer.ByteBuf next = (io.netty.buffer.ByteBuf)iter.next();
                        int bytesNeededFromCurrentBuffer = Math.min(next.readableBytes(), bytesNeeded);
                        if (bytesNeededFromCurrentBuffer == next.readableBytes()) {
                            composite.addComponent(next);
                            iter.remove();
                        } else {
                            next.retain();
                            composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer));
                        }
                        composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
                        if ((bytesNeeded -= bytesNeededFromCurrentBuffer) != 0) continue;
                        break;
                    }
                    buffer = new NettyByteBuf(composite).flip();
                }
            }
            if ((exceptionResult != null || buffer != null) && this.pendingReader != null) {
                NettyStream.cancel(this.pendingReader.timeout);
                this.pendingReader = null;
            }
        }
        finally {
            this.lock.unlock();
        }
        if (exceptionResult != null) {
            handler.failed(exceptionResult);
        }
        if (buffer != null) {
            handler.completed(buffer);
        }
    }

    private boolean hasBytesAvailable(int numBytes) {
        int bytesAvailable = 0;
        for (io.netty.buffer.ByteBuf cur : this.pendingInboundBuffers) {
            if ((bytesAvailable += cur.readableBytes()) < numBytes) continue;
            return true;
        }
        return false;
    }

    private void handleReadResponse(@Nullable io.netty.buffer.ByteBuf buffer, @Nullable Throwable t) {
        PendingReader localPendingReader = Locks.withLock(this.lock, () -> {
            if (buffer != null) {
                this.pendingInboundBuffers.add(buffer.retain());
            } else {
                this.pendingException = t;
            }
            return this.pendingReader;
        });
        if (localPendingReader != null) {
            this.readAsync(localPendingReader.numBytes, localPendingReader.handler, 0L);
        }
    }

    @Override
    public ServerAddress getAddress() {
        return this.address;
    }

    @Override
    public void close() {
        Locks.withLock(this.lock, () -> {
            this.isClosed = true;
            if (this.channel != null) {
                this.channel.close();
                this.channel = null;
            }
            Iterator iterator2 = this.pendingInboundBuffers.iterator();
            while (iterator2.hasNext()) {
                io.netty.buffer.ByteBuf nextByteBuf = (io.netty.buffer.ByteBuf)iterator2.next();
                iterator2.remove();
                nextByteBuf.release();
            }
        });
    }

    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    public SocketSettings getSettings() {
        return this.settings;
    }

    public SslSettings getSslSettings() {
        return this.sslSettings;
    }

    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    public Class<? extends SocketChannel> getSocketChannelClass() {
        return this.socketChannelClass;
    }

    public ByteBufAllocator getAllocator() {
        return this.allocator;
    }

    private void addSslHandler(SocketChannel channel) {
        SSLEngine engine;
        if (this.sslContext == null) {
            SSLContext sslContext;
            try {
                sslContext = Optional.ofNullable(this.sslSettings.getContext()).orElse(SSLContext.getDefault());
            }
            catch (NoSuchAlgorithmException e) {
                throw new MongoClientException("Unable to create standard SSLContext", e);
            }
            engine = sslContext.createSSLEngine(this.address.getHost(), this.address.getPort());
        } else {
            engine = this.sslContext.newEngine(channel.alloc(), this.address.getHost(), this.address.getPort());
        }
        engine.setUseClientMode(true);
        SSLParameters sslParameters = engine.getSSLParameters();
        SslHelper.enableSni(this.address.getHost(), sslParameters);
        if (!this.sslSettings.isInvalidHostNameAllowed()) {
            SslHelper.enableHostNameVerification(sslParameters);
        }
        engine.setSSLParameters(sslParameters);
        channel.pipeline().addFirst("ssl", (ChannelHandler)new SslHandler(engine, false));
    }

    private static void cancel(@Nullable Future<?> f) {
        if (f != null) {
            f.cancel(false);
        }
    }

    @Nullable
    private static ScheduledFuture<?> scheduleReadTimeout(@Nullable ReadTimeoutTask readTimeoutTask, long timeoutMillis) {
        if (timeoutMillis == 0L) {
            return null;
        }
        return Assertions.assertNotNull(readTimeoutTask).schedule(timeoutMillis);
    }

    @ThreadSafe
    private static final class ReadTimeoutTask
    implements Runnable {
        private final ChannelHandlerContext ctx;

        private ReadTimeoutTask(ChannelHandlerContext timeoutChannelHandlerContext) {
            this.ctx = timeoutChannelHandlerContext;
        }

        @Override
        public void run() {
            try {
                if (this.ctx.channel().isOpen()) {
                    this.ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
                    this.ctx.close();
                }
            }
            catch (Throwable t) {
                this.ctx.fireExceptionCaught(t);
            }
        }

        @Nullable
        private ScheduledFuture<?> schedule(long timeoutMillis) {
            return timeoutMillis > 0L ? this.ctx.executor().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS) : null;
        }
    }

    private static final class FutureAsyncCompletionHandler<T>
    implements AsyncCompletionHandler<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile T t;
        private volatile Throwable throwable;

        FutureAsyncCompletionHandler() {
        }

        @Override
        public void completed(@Nullable T t) {
            this.t = t;
            this.latch.countDown();
        }

        @Override
        public void failed(Throwable t) {
            this.throwable = t;
            this.latch.countDown();
        }

        public T get() throws IOException {
            try {
                this.latch.await();
                if (this.throwable != null) {
                    if (this.throwable instanceof IOException) {
                        throw (IOException)this.throwable;
                    }
                    if (this.throwable instanceof MongoException) {
                        throw (MongoException)this.throwable;
                    }
                    throw new MongoInternalException("Exception thrown from Netty Stream", this.throwable);
                }
                return this.t;
            }
            catch (InterruptedException e) {
                throw InterruptionUtil.interruptAndCreateMongoInterruptedException("Interrupted", e);
            }
        }
    }

    private class OpenChannelFutureListener
    implements ChannelFutureListener {
        private final Queue<SocketAddress> socketAddressQueue;
        private final ChannelFuture channelFuture;
        private final AsyncCompletionHandler<Void> handler;
        private final OperationContext operationContext;

        OpenChannelFutureListener(OperationContext operationContext, Queue<SocketAddress> socketAddressQueue, ChannelFuture channelFuture, AsyncCompletionHandler<Void> handler) {
            this.operationContext = operationContext;
            this.socketAddressQueue = socketAddressQueue;
            this.channelFuture = channelFuture;
            this.handler = handler;
        }

        @Override
        public void operationComplete(ChannelFuture future) {
            Locks.withLock(NettyStream.this.lock, () -> {
                if (future.isSuccess()) {
                    if (NettyStream.this.isClosed) {
                        this.channelFuture.channel().close();
                    } else {
                        NettyStream.this.channel = this.channelFuture.channel();
                        NettyStream.this.channel.closeFuture().addListener(future1 -> NettyStream.this.handleReadResponse(null, new IOException("The connection to the server was closed")));
                    }
                    this.handler.completed(null);
                } else if (NettyStream.this.isClosed) {
                    this.handler.completed(null);
                } else if (this.socketAddressQueue.isEmpty()) {
                    this.handler.failed(new MongoSocketOpenException("Exception opening socket", NettyStream.this.getAddress(), future.cause()));
                } else {
                    NettyStream.this.initializeChannel(this.operationContext, this.handler, this.socketAddressQueue);
                }
            });
        }
    }

    private static final class PendingReader {
        private final int numBytes;
        private final AsyncCompletionHandler<ByteBuf> handler;
        @Nullable
        private final ScheduledFuture<?> timeout;

        private PendingReader(int numBytes, AsyncCompletionHandler<ByteBuf> handler, @Nullable ScheduledFuture<?> timeout) {
            this.numBytes = numBytes;
            this.handler = handler;
            this.timeout = timeout;
        }
    }

    private class InboundBufferHandler
    extends SimpleChannelInboundHandler<io.netty.buffer.ByteBuf> {
        private InboundBufferHandler() {
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, io.netty.buffer.ByteBuf buffer) {
            NettyStream.this.handleReadResponse(buffer, null);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
            if (t instanceof ReadTimeoutException) {
                NettyStream.this.handleReadResponse(null, new MongoSocketReadTimeoutException("Timeout while receiving message", NettyStream.this.address, t));
            } else {
                NettyStream.this.handleReadResponse(null, t);
            }
            ctx.close();
        }
    }
}

