package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
import com.mongodb.reactivestreams.client.internal.TimeoutHelper;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.Binary;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.class */
public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher<Void> {
    private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit.";
    private static final Document PROJECTION = new Document("_id", 1);
    private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
    private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
    private final ClientSession clientSession;
    private final MongoCollection<GridFSFile> filesCollection;
    private final MongoCollection<Document> chunksCollection;
    private final BsonValue fileId;
    private final String filename;
    private final int chunkSizeBytes;
    private final Document metadata;
    private final Publisher<ByteBuffer> source;

    @Nullable
    private final Long timeoutMs;

    public GridFSUploadPublisherImpl(@Nullable ClientSession clientSession, MongoCollection<GridFSFile> mongoCollection, MongoCollection<Document> mongoCollection2, BsonValue bsonValue, String str, int i, @Nullable Document document, Publisher<ByteBuffer> publisher) {
        this.clientSession = clientSession;
        this.filesCollection = (MongoCollection) Assertions.notNull("files collection", mongoCollection);
        this.chunksCollection = (MongoCollection) Assertions.notNull("chunks collection", mongoCollection2);
        this.fileId = (BsonValue) Assertions.notNull("File Id", bsonValue);
        this.filename = (String) Assertions.notNull("filename", str);
        this.chunkSizeBytes = i;
        this.metadata = document;
        this.source = publisher;
        this.timeoutMs = mongoCollection.getTimeout(TimeUnit.MILLISECONDS);
    }

    @Override // com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher
    public ObjectId getObjectId() {
        if (this.fileId.isObjectId()) {
            return this.fileId.asObjectId().getValue();
        }
        throw new MongoGridFSException("Custom id type used for this GridFS upload stream");
    }

    @Override // com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher
    public BsonValue getId() {
        return this.fileId;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super Void> subscriber) {
        Mono.deferContextual(contextView -> {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Timeout startTimeout = TimeoutContext.startTimeout(this.timeoutMs);
            return createCheckAndCreateIndexesMono(startTimeout).then(createSaveChunksMono(atomicBoolean, startTimeout)).flatMap(l -> {
                return createSaveFileDataMono(atomicBoolean, l.longValue(), startTimeout);
            }).onErrorResume(th -> {
                return createCancellationMono(atomicBoolean, startTimeout).onErrorMap(th -> {
                    th.addSuppressed(th);
                    return th;
                }).then(Mono.error(th));
            }).doOnCancel(() -> {
                createCancellationMono(atomicBoolean, startTimeout).contextWrite(contextView).subscribe();
            }).then();
        }).subscribe(subscriber);
    }

    public GridFSUploadPublisher<ObjectId> withObjectId() {
        return new GridFSUploadPublisher<ObjectId>() { // from class: com.mongodb.reactivestreams.client.internal.gridfs.GridFSUploadPublisherImpl.1
            @Override // com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher
            public ObjectId getObjectId() {
                return this.getObjectId();
            }

            @Override // com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher
            public BsonValue getId() {
                return this.getId();
            }

            @Override // org.reactivestreams.Publisher
            public void subscribe(Subscriber<? super ObjectId> subscriber) {
                Mono.from(this).thenReturn(getObjectId()).subscribe(subscriber);
            }
        };
    }

    private Mono<Void> createCheckAndCreateIndexesMono(@Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(this.filesCollection.withDocumentClass(Document.class).withReadPreference(ReadPreference.primary()), timeout).map(mongoCollection -> {
            return this.clientSession != null ? mongoCollection.find(this.clientSession) : mongoCollection.find();
        }).flatMap(findPublisher -> {
            return Mono.from(findPublisher.projection(PROJECTION).first());
        }).switchIfEmpty(Mono.defer(() -> {
            return checkAndCreateIndex(this.filesCollection.withReadPreference(ReadPreference.primary()), FILES_INDEX, timeout).then(checkAndCreateIndex(this.chunksCollection.withReadPreference(ReadPreference.primary()), CHUNKS_INDEX, timeout)).then(Mono.empty());
        })).then();
    }

    private <T> Mono<Boolean> hasIndex(MongoCollection<T> mongoCollection, Document document, @Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(mongoCollection, timeout).map(mongoCollection2 -> {
            return this.clientSession != null ? mongoCollection2.listIndexes(this.clientSession) : mongoCollection2.listIndexes();
        }).flatMapMany((v0) -> {
            return Flux.from(v0);
        }).collectList().map(list -> {
            boolean z = false;
            Iterator it = list.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Document document2 = (Document) ((Document) it.next()).get((Object) "key", (String) new Document());
                for (Map.Entry<String, Object> entry : document2.entrySet()) {
                    if (entry.getValue() instanceof Number) {
                        entry.setValue(Integer.valueOf(((Number) entry.getValue()).intValue()));
                    }
                }
                if (document2.equals(document)) {
                    z = true;
                    break;
                }
            }
            return Boolean.valueOf(z);
        });
    }

    private <T> Mono<Void> checkAndCreateIndex(MongoCollection<T> mongoCollection, Document document, @Nullable Timeout timeout) {
        return hasIndex(mongoCollection, document, timeout).flatMap(bool -> {
            return !bool.booleanValue() ? createIndexMono(mongoCollection, document, timeout).flatMap(str -> {
                return Mono.empty();
            }) : Mono.empty();
        });
    }

    private <T> Mono<String> createIndexMono(MongoCollection<T> mongoCollection, Document document, @Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(mongoCollection, timeout).flatMap(mongoCollection2 -> {
            return Mono.from(this.clientSession == null ? mongoCollection2.createIndex(document) : mongoCollection2.createIndex(this.clientSession, document));
        });
    }

    private Mono<Long> createSaveChunksMono(AtomicBoolean atomicBoolean, @Nullable Timeout timeout) {
        return new ResizingByteBufferFlux(this.source, this.chunkSizeBytes).takeUntilOther(createMonoTimer(timeout)).index().flatMap(tuple2 -> {
            if (atomicBoolean.get()) {
                return Mono.empty();
            }
            Long l = (Long) tuple2.getT1();
            ByteBuffer byteBuffer = (ByteBuffer) tuple2.getT2();
            byte[] bArr = new byte[byteBuffer.remaining()];
            if (byteBuffer.hasArray()) {
                System.arraycopy(byteBuffer.array(), byteBuffer.position(), bArr, 0, byteBuffer.remaining());
            } else {
                byteBuffer.mark();
                byteBuffer.get(bArr);
                byteBuffer.reset();
            }
            Binary binary = new Binary(bArr);
            Document append = new Document("files_id", this.fileId).append("n", Integer.valueOf(l.intValue())).append("data", binary);
            return Mono.from(this.clientSession == null ? TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(append) : TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(this.clientSession, (ClientSession) append)).thenReturn(Integer.valueOf(binary.length()));
        }).reduce(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        });
    }

    private static Mono<MongoOperationTimeoutException> createMonoTimer(@Nullable Timeout timeout) {
        return (Mono) Timeout.nullAsInfinite(timeout).call(TimeUnit.MILLISECONDS, () -> {
            return Mono.never();
        }, j -> {
            return Mono.delay(Duration.ofMillis(j)).then(createTimeoutMonoError());
        }, () -> {
            return createTimeoutMonoError();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Mono<MongoOperationTimeoutException> createTimeoutMonoError() {
        return Mono.error(TimeoutContext.createMongoTimeoutException("GridFS waiting for data from the source Publisher exceeded the timeout limit."));
    }

    private Mono<InsertOneResult> createSaveFileDataMono(AtomicBoolean atomicBoolean, long j, @Nullable Timeout timeout) {
        Mono collectionWithTimeoutDeferred = TimeoutHelper.collectionWithTimeoutDeferred(this.filesCollection, timeout);
        if (!atomicBoolean.compareAndSet(false, true)) {
            return Mono.empty();
        }
        GridFSFile gridFSFile = new GridFSFile(this.fileId, this.filename, j, this.chunkSizeBytes, new Date(), this.metadata);
        return this.clientSession != null ? collectionWithTimeoutDeferred.flatMap(mongoCollection -> {
            return Mono.from(mongoCollection.insertOne(this.clientSession, (ClientSession) gridFSFile));
        }) : collectionWithTimeoutDeferred.flatMap(mongoCollection2 -> {
            return Mono.from(mongoCollection2.insertOne(gridFSFile));
        });
    }

    private Mono<DeleteResult> createCancellationMono(AtomicBoolean atomicBoolean, @Nullable Timeout timeout) {
        Mono collectionWithTimeoutDeferred = TimeoutHelper.collectionWithTimeoutDeferred(this.chunksCollection, timeout);
        return atomicBoolean.compareAndSet(false, true) ? this.clientSession != null ? collectionWithTimeoutDeferred.flatMap(mongoCollection -> {
            return Mono.from(mongoCollection.deleteMany(this.clientSession, new Document("files_id", this.fileId)));
        }) : collectionWithTimeoutDeferred.flatMap(mongoCollection2 -> {
            return Mono.from(mongoCollection2.deleteMany(new Document("files_id", this.fileId)));
        }) : Mono.empty();
    }
}
