package fr.romitou.mongosk.libs.driver.reactivestreams.client.internal.gridfs;

import fr.romitou.mongosk.libs.bson.BsonValue;
import fr.romitou.mongosk.libs.bson.Document;
import fr.romitou.mongosk.libs.bson.types.Binary;
import fr.romitou.mongosk.libs.bson.types.ObjectId;
import fr.romitou.mongosk.libs.driver.MongoGridFSException;
import fr.romitou.mongosk.libs.driver.ReadPreference;
import fr.romitou.mongosk.libs.driver.assertions.Assertions;
import fr.romitou.mongosk.libs.driver.client.gridfs.model.GridFSFile;
import fr.romitou.mongosk.libs.driver.client.result.DeleteResult;
import fr.romitou.mongosk.libs.driver.client.result.InsertOneResult;
import fr.romitou.mongosk.libs.driver.lang.Nullable;
import fr.romitou.mongosk.libs.driver.reactivestreams.client.ClientSession;
import fr.romitou.mongosk.libs.driver.reactivestreams.client.FindPublisher;
import fr.romitou.mongosk.libs.driver.reactivestreams.client.MongoCollection;
import fr.romitou.mongosk.libs.driver.reactivestreams.client.gridfs.GridFSUploadPublisher;
import fr.romitou.mongosk.libs.reactivestreams.Publisher;
import fr.romitou.mongosk.libs.reactivestreams.Subscriber;
import fr.romitou.mongosk.libs.reactor.core.publisher.Flux;
import fr.romitou.mongosk.libs.reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/* loaded from: input_file:fr/romitou/mongosk/libs/driver/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.class */
public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher<Void> {
    /** Projection restricting the files-collection probe query to the {@code _id} field. */
    private static final Document PROJECTION = new Document("_id", 1);
    /** Key specification of the index checked/created on the files collection. */
    private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
    /** Key specification of the index checked/created on the chunks collection. */
    private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
    /** Optional session; when non-null it is passed to every collection operation issued by this class. */
    private final ClientSession clientSession;
    /** Collection receiving the single {@link GridFSFile} document written after all chunks are stored. */
    private final MongoCollection<GridFSFile> filesCollection;
    /** Collection receiving one document per chunk ({@code files_id}, {@code n}, {@code data}). */
    private final MongoCollection<Document> chunksCollection;
    /** Identifier of the uploaded file; stored as {@code files_id} in every chunk document. */
    private final BsonValue fileId;
    /** File name recorded in the files-collection document. */
    private final String filename;
    /** Chunk size handed to {@code ResizingByteBufferFlux} and recorded in the files-collection document. */
    private final int chunkSizeBytes;
    /** Optional user metadata recorded in the files-collection document; may be {@code null}. */
    private final Document metadata;
    /** Upstream publisher supplying the bytes to upload. */
    private final Publisher<ByteBuffer> source;

    /**
     * Creates an upload publisher. Nothing is sent to the server until {@link #subscribe} is called.
     *
     * @param clientSession    session to bind every operation to, or {@code null} for none
     * @param mongoCollection  the files collection; must not be {@code null}
     * @param mongoCollection2 the chunks collection; must not be {@code null}
     * @param bsonValue        the id of the file being uploaded; must not be {@code null}
     * @param str              the file name; must not be {@code null}
     * @param i                the chunk size in bytes
     * @param document         optional user metadata, may be {@code null}
     * @param publisher        the source of the bytes to upload
     */
    public GridFSUploadPublisherImpl(@Nullable ClientSession clientSession, MongoCollection<GridFSFile> mongoCollection, MongoCollection<Document> mongoCollection2, BsonValue bsonValue, String str, int i, @Nullable Document document, Publisher<ByteBuffer> publisher) {
        // Null checks run in the same order as before so the first offending argument is reported.
        this.filesCollection = Assertions.notNull("files collection", mongoCollection);
        this.chunksCollection = Assertions.notNull("chunks collection", mongoCollection2);
        this.fileId = Assertions.notNull("File Id", bsonValue);
        this.filename = Assertions.notNull("filename", str);

        // Remaining arguments are stored without validation.
        this.clientSession = clientSession;
        this.chunkSizeBytes = i;
        this.metadata = document;
        this.source = publisher;
    }

    /**
     * Returns the file id as an {@link ObjectId}.
     *
     * @return the {@code ObjectId} value of the file id
     * @throws MongoGridFSException if the file id is not an ObjectId
     */
    @Override // fr.romitou.mongosk.libs.driver.reactivestreams.client.gridfs.GridFSUploadPublisher
    public ObjectId getObjectId() {
        if (!fileId.isObjectId()) {
            throw new MongoGridFSException("Custom id type used for this GridFS upload stream");
        }
        return fileId.asObjectId().getValue();
    }

    /**
     * Returns the file id exactly as it was supplied to the constructor.
     *
     * @return the file id, never {@code null}
     */
    @Override // fr.romitou.mongosk.libs.driver.reactivestreams.client.gridfs.GridFSUploadPublisher
    public BsonValue getId() {
        return fileId;
    }

    /**
     * Runs the upload for the given subscriber.
     *
     * <p>The stages are chained in order: check/create the indexes, store the chunks, then store the
     * files-collection document. The subscriber is completed once the last stage succeeds.
     *
     * <p>If any stage fails, the chunks written so far are deleted (see
     * {@code createCancellationMono}) and the subscriber is signalled with the failure that
     * triggered the cleanup. If the subscriber cancels, the same cleanup is started.
     *
     * @param subscriber the subscriber to signal on completion or failure
     */
    @Override // fr.romitou.mongosk.libs.reactivestreams.Publisher
    public void subscribe(Subscriber<? super Void> subscriber) {
        Mono.<Void>create(sink -> {
            // Shared flag: set by whichever of "file document saved" or "cleanup started" happens first.
            AtomicBoolean terminated = new AtomicBoolean(false);
            sink.onCancel(() -> createCancellationMono(terminated).subscribe());

            // Whether the cleanup succeeds or fails, report the failure that caused it.
            Consumer<Throwable> onFailure = failure -> createCancellationMono(terminated)
                    .doOnError(cleanupFailure -> sink.error(failure))
                    .doOnSuccess(deleteResult -> sink.error(failure))
                    .subscribe();

            // Stage 3: persist the files-collection document with the total uploaded length.
            Consumer<Long> saveFileData = lengthInBytes -> createSaveFileDataMono(terminated, lengthInBytes.longValue())
                    .doOnError(onFailure)
                    .doOnSuccess(insertOneResult -> sink.success())
                    .subscribe();

            // Stage 2: persist the chunks; created lazily once the indexes are in place.
            Consumer<Void> saveChunks = indexesReady -> createSaveChunksMono(terminated)
                    .doOnError(onFailure)
                    .doOnSuccess(saveFileData)
                    .subscribe();

            // Stage 1: make sure the GridFS indexes exist.
            createCheckAndCreateIndexesMono()
                    .doOnError(onFailure)
                    .doOnSuccess(saveChunks)
                    .subscribe();
        }).subscribe(subscriber);
    }

    /**
     * Returns a view of this upload that emits the file's {@link ObjectId} when the upload
     * completes, instead of completing empty.
     *
     * <p>The view delegates to this instance. Its {@code subscribe} evaluates
     * {@link #getObjectId()} immediately, so subscribing throws {@link MongoGridFSException}
     * when the file id is not an ObjectId.
     *
     * @return a publisher wrapping this upload
     */
    public GridFSUploadPublisher<ObjectId> withObjectId() {
        // Capture the enclosing instance explicitly: inside the anonymous class a bare "this"
        // is the wrapper itself, and delegating to it would recurse forever.
        final GridFSUploadPublisherImpl wrapped = this;
        return new GridFSUploadPublisher<ObjectId>() {
            @Override // fr.romitou.mongosk.libs.driver.reactivestreams.client.gridfs.GridFSUploadPublisher
            public ObjectId getObjectId() {
                return wrapped.getObjectId();
            }

            @Override // fr.romitou.mongosk.libs.driver.reactivestreams.client.gridfs.GridFSUploadPublisher
            public BsonValue getId() {
                return wrapped.getId();
            }

            @Override // fr.romitou.mongosk.libs.reactivestreams.Publisher
            public void subscribe(Subscriber<? super ObjectId> subscriber) {
                Mono.from(wrapped).thenReturn(getObjectId()).subscribe(subscriber);
            }
        };
    }

    /**
     * Builds a Mono that ensures the GridFS indexes exist.
     *
     * <p>The files collection is probed on the primary for any document. If one is found, the
     * Mono completes without touching the indexes. Otherwise the files index and then the chunks
     * index are checked and created if missing, in that order.
     *
     * @return a Mono that completes empty once the check (and any index creation) has finished,
     *         or errors with the first failure encountered
     */
    private Mono<Void> createCheckAndCreateIndexesMono() {
        MongoCollection<Document> primaryFiles = this.filesCollection
                .withDocumentClass(Document.class)
                .withReadPreference(ReadPreference.primary());
        FindPublisher<Document> findPublisher = this.clientSession != null
                ? primaryFiles.find(this.clientSession)
                : primaryFiles.find();
        AtomicBoolean collectionExists = new AtomicBoolean(false);

        return Mono.create(sink -> {
            Mono.from(findPublisher.projection(PROJECTION).first()).subscribe(
                    found -> collectionExists.set(true),
                    sink::error,
                    () -> {
                        if (collectionExists.get()) {
                            // Files collection already has content: skip the index checks.
                            sink.success();
                            return;
                        }
                        checkAndCreateIndex(this.filesCollection.withReadPreference(ReadPreference.primary()), FILES_INDEX)
                                .doOnError(sink::error)
                                .doOnSuccess(filesIndexReady ->
                                        checkAndCreateIndex(this.chunksCollection.withReadPreference(ReadPreference.primary()), CHUNKS_INDEX)
                                                .doOnError(sink::error)
                                                .doOnSuccess(chunksIndexReady -> sink.success())
                                                .subscribe())
                                .subscribe();
                    });
        });
    }

    /**
     * Checks whether the collection already has an index whose key specification equals
     * {@code document}.
     *
     * <p>Numeric values in each listed key specification are converted to {@code Integer} before
     * comparison, so a value such as {@code 1.0} or {@code 1L} matches an {@code Integer} 1 in
     * {@code document}. The conversion is applied in place on the listed key documents.
     *
     * @param mongoCollection the collection whose indexes are listed
     * @param document        the wanted index key specification
     * @param <T>             the collection's document type
     * @return a Mono emitting {@code true} if a matching index exists, {@code false} otherwise
     */
    private <T> Mono<Boolean> hasIndex(MongoCollection<T> mongoCollection, Document document) {
        return Flux.from(this.clientSession != null ? mongoCollection.listIndexes(this.clientSession) : mongoCollection.listIndexes())
                .collectList()
                .map(indexes -> {
                    for (Document indexInfo : indexes) {
                        // A listing without a "key" entry is treated as an empty key specification.
                        Document keySpec = indexInfo.get("key", new Document());
                        for (Map.Entry<String, Object> entry : keySpec.entrySet()) {
                            if (entry.getValue() instanceof Number) {
                                entry.setValue(Integer.valueOf(((Number) entry.getValue()).intValue()));
                            }
                        }
                        if (keySpec.equals(document)) {
                            return Boolean.TRUE;
                        }
                    }
                    return Boolean.FALSE;
                });
    }

    /**
     * Creates the given index on the collection unless {@code hasIndex} reports it already exists.
     *
     * @param mongoCollection the collection to inspect and, if needed, index
     * @param document        the index key specification
     * @param <T>             the collection's document type
     * @return a Mono that completes empty once the index is known to exist or has been created
     */
    private <T> Mono<Void> checkAndCreateIndex(MongoCollection<T> mongoCollection, Document document) {
        return hasIndex(mongoCollection, document).flatMap(indexExists -> {
            if (indexExists.booleanValue()) {
                return Mono.empty();
            }
            // The created index name is not needed; only completion is propagated.
            return createIndexMono(mongoCollection, document).flatMap(indexName -> Mono.empty());
        });
    }

    /**
     * Wraps the collection's {@code createIndex} call in a Mono, using the client session when
     * one was supplied.
     *
     * @param mongoCollection the collection to index
     * @param document        the index key specification
     * @param <T>             the collection's document type
     * @return a Mono emitting the value published by {@code createIndex}
     */
    private <T> Mono<String> createIndexMono(MongoCollection<T> mongoCollection, Document document) {
        if (this.clientSession != null) {
            return Mono.from(mongoCollection.createIndex(this.clientSession, document));
        }
        return Mono.from(mongoCollection.createIndex(document));
    }

    /**
     * Builds a Mono that writes the source bytes to the chunks collection.
     *
     * <p>The source is passed through {@code ResizingByteBufferFlux} with the configured chunk
     * size. Each resulting buffer is copied into a {@link Binary} and inserted as a document with
     * {@code files_id}, {@code n} (a counter starting at 0) and {@code data}. Buffers that arrive
     * after the flag has been set are skipped.
     *
     * @param atomicBoolean the shared termination flag; when {@code true}, no further chunks are inserted
     * @return a Mono emitting the total number of bytes copied into chunk documents, or erroring
     *         with the first failure from the source or an insert
     */
    private Mono<Long> createSaveChunksMono(AtomicBoolean atomicBoolean) {
        return Mono.create(sink -> {
            AtomicLong lengthInBytes = new AtomicLong(0L);
            AtomicInteger chunkIndex = new AtomicInteger(0);
            Flux<InsertOneResult> inserts = new ResizingByteBufferFlux(this.source, this.chunkSizeBytes).flatMap(byteBuffer -> {
                if (atomicBoolean.get()) {
                    return Mono.<InsertOneResult>empty();
                }
                byte[] chunkBytes = new byte[byteBuffer.remaining()];
                if (byteBuffer.hasArray()) {
                    // Buffer position p maps to backing-array index arrayOffset() + p,
                    // which matters for sliced or offset-wrapped buffers.
                    System.arraycopy(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(),
                            chunkBytes, 0, chunkBytes.length);
                } else {
                    // No accessible backing array: read through the buffer, then restore its position.
                    byteBuffer.mark();
                    byteBuffer.get(chunkBytes);
                    byteBuffer.reset();
                }
                Binary data = new Binary(chunkBytes);
                lengthInBytes.addAndGet(data.length());
                Document chunk = new Document("files_id", this.fileId)
                        .append("n", Integer.valueOf(chunkIndex.getAndIncrement()))
                        .append("data", data);
                return this.clientSession == null
                        ? this.chunksCollection.insertOne(chunk)
                        : this.chunksCollection.insertOne(this.clientSession, chunk);
            });
            // Individual insert results are not needed; only failure and completion are observed.
            inserts.subscribe(null, sink::error, () -> sink.success(Long.valueOf(lengthInBytes.get())));
        });
    }

    /**
     * Builds a Mono that inserts the files-collection document for this upload.
     *
     * <p>The flag is flipped from {@code false} to {@code true} when this method is called. If it
     * was already {@code true}, nothing is inserted and an empty Mono is returned. Once this
     * method has set the flag, a later {@code createCancellationMono} call no longer deletes the
     * chunks.
     *
     * @param atomicBoolean the shared termination flag
     * @param j             the total length of the uploaded data in bytes
     * @return a Mono emitting the insert result, or an empty Mono if the flag was already set
     */
    private Mono<InsertOneResult> createSaveFileDataMono(AtomicBoolean atomicBoolean, long j) {
        if (!atomicBoolean.compareAndSet(false, true)) {
            return Mono.empty();
        }
        GridFSFile fileDocument = new GridFSFile(this.fileId, this.filename, j, this.chunkSizeBytes, new Date(), this.metadata);
        Publisher<InsertOneResult> insert = this.clientSession != null
                ? this.filesCollection.insertOne(this.clientSession, fileDocument)
                : this.filesCollection.insertOne(fileDocument);
        return Mono.from(insert);
    }

    /**
     * Builds a Mono that deletes every chunk document whose {@code files_id} is this upload's id.
     *
     * <p>The flag is flipped from {@code false} to {@code true} when this method is called. If it
     * was already {@code true}, nothing is deleted and an empty Mono is returned.
     *
     * @param atomicBoolean the shared termination flag
     * @return a Mono emitting the delete result, or an empty Mono if the flag was already set
     */
    private Mono<DeleteResult> createCancellationMono(AtomicBoolean atomicBoolean) {
        if (!atomicBoolean.compareAndSet(false, true)) {
            return Mono.empty();
        }
        if (this.clientSession != null) {
            return Mono.from(this.chunksCollection.deleteMany(this.clientSession, new Document("files_id", this.fileId)));
        }
        return Mono.from(this.chunksCollection.deleteMany(new Document("files_id", this.fileId)));
    }
}
