package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
import com.mongodb.reactivestreams.client.internal.TimeoutHelper;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.bson.Document;
import org.bson.types.Binary;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/mongodb/reactivestreams/client/internal/gridfs/GridFSDownloadPublisherImpl.class */
public class GridFSDownloadPublisherImpl implements GridFSDownloadPublisher {
    private static final String TIMEOUT_ERROR_MESSAGE = "Finding chunks exceeded the timeout limit.";
    private final ClientSession clientSession;
    private final Function<Timeout, GridFSFindPublisher> gridFSFileMono;
    private final MongoCollection<Document> chunksCollection;
    private Integer bufferSizeBytes;
    private volatile GridFSFile fileInfo;

    @Nullable
    private final Long timeoutMs;

    public GridFSDownloadPublisherImpl(@Nullable ClientSession clientSession, Function<Timeout, GridFSFindPublisher> function, MongoCollection<Document> mongoCollection) {
        this.clientSession = clientSession;
        this.gridFSFileMono = (Function) Assertions.notNull("gridFSFilePublisherCreator", function);
        this.chunksCollection = (MongoCollection) Assertions.notNull("chunksCollection", mongoCollection);
        this.timeoutMs = mongoCollection.getTimeout(TimeUnit.MILLISECONDS);
    }

    @Override // com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher
    public Publisher<GridFSFile> getGridFSFile() {
        return this.fileInfo != null ? Mono.fromCallable(() -> {
            return this.fileInfo;
        }) : Mono.from(this.gridFSFileMono.apply(TimeoutContext.startTimeout(this.timeoutMs))).doOnNext(gridFSFile -> {
            this.fileInfo = gridFSFile;
        });
    }

    @Override // com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher
    public GridFSDownloadPublisher bufferSizeBytes(int i) {
        this.bufferSizeBytes = Integer.valueOf(i);
        return this;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        Flux.defer(() -> {
            Timeout startTimeout = TimeoutContext.startTimeout(this.timeoutMs);
            return Mono.from(this.gridFSFileMono.apply(startTimeout)).doOnSuccess(gridFSFile -> {
                if (gridFSFile == null) {
                    throw new MongoGridFSException("File not found");
                }
                this.fileInfo = gridFSFile;
            }).flatMapMany(gridFSFile2 -> {
                return getChunkPublisher(gridFSFile2, startTimeout);
            });
        }).subscribe(subscriber);
    }

    private Flux<ByteBuffer> getChunkPublisher(GridFSFile gridFSFile, @Nullable Timeout timeout) {
        Document document = new Document("files_id", gridFSFile.getId());
        FindPublisher find = this.clientSession != null ? TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).find(this.clientSession, document) : TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).find(document);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int ceil = (int) Math.ceil(gridFSFile.getLength() / gridFSFile.getChunkSize());
        Flux<ByteBuffer> doOnComplete = Flux.from(find.sort(new Document("n", 1))).map(document2 -> {
            int andAdd = atomicInteger.getAndAdd(1);
            if (document2 == null || document2.getInteger("n").intValue() != andAdd) {
                throw new MongoGridFSException(String.format("Could not find file chunk for files_id: %s at chunk index %s.", gridFSFile.getId(), Integer.valueOf(andAdd)));
            }
            if (!(document2.get("data") instanceof Binary)) {
                throw new MongoGridFSException("Unexpected data format for the chunk");
            }
            byte[] data = ((Binary) document2.get("data", Binary.class)).getData();
            long j = 0;
            if (ceil > 0) {
                j = andAdd + 1 == ceil ? gridFSFile.getLength() - (andAdd * gridFSFile.getChunkSize()) : gridFSFile.getChunkSize();
            }
            if (data.length != j) {
                throw new MongoGridFSException(String.format("Chunk size data length is not the expected size. The size was %s for file_id: %s chunk index %s it should be %s bytes.", Integer.valueOf(data.length), gridFSFile.getId(), Integer.valueOf(andAdd), Long.valueOf(j)));
            }
            return ByteBuffer.wrap(data);
        }).doOnComplete(() -> {
            if (atomicInteger.get() < ceil) {
                throw new MongoGridFSException(String.format("Could not find file chunk for files_id: %s at chunk index %s.", gridFSFile.getId(), Integer.valueOf(atomicInteger.get())));
            }
        });
        return this.bufferSizeBytes == null ? doOnComplete : new ResizingByteBufferFlux(doOnComplete, this.bufferSizeBytes.intValue());
    }
}
