package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.codec.Codecs;
import dev.miku.r2dbc.mysql.message.FieldValue;
import dev.miku.r2dbc.mysql.message.server.DefinitionMetadataMessage;
import dev.miku.r2dbc.mysql.message.server.EofMessage;
import dev.miku.r2dbc.mysql.message.server.OkMessage;
import dev.miku.r2dbc.mysql.message.server.RowMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import dev.miku.r2dbc.mysql.util.AssertUtils;
import dev.miku.r2dbc.mysql.util.OperatorUtils;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SynchronousSink;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:dev/miku/r2dbc/mysql/MySqlResult.class */
public final class MySqlResult implements Result {
    private static final Function<OkMessage, Integer> ROWS_UPDATED = okMessage -> {
        return Integer.valueOf((int) okMessage.getAffectedRows());
    };
    private static final Consumer<ReferenceCounted> RELEASE = (v0) -> {
        v0.release();
    };
    private final boolean isBinary;
    private final Codecs codecs;
    private final ConnectionContext context;

    @Nullable
    private final String generatedKeyName;
    private final AtomicReference<Flux<ServerMessage>> messages;
    private final MonoProcessor<OkMessage> okProcessor = MonoProcessor.create();
    private MySqlRowMetadata rowMetadata;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MySqlResult(boolean z, Codecs codecs, ConnectionContext connectionContext, @Nullable String str, Flux<ServerMessage> flux) {
        this.isBinary = z;
        this.codecs = (Codecs) AssertUtils.requireNonNull(codecs, "codecs must not be null");
        this.context = (ConnectionContext) AssertUtils.requireNonNull(connectionContext, "context must not be null");
        this.generatedKeyName = str;
        this.messages = new AtomicReference<>(AssertUtils.requireNonNull(flux, "messages must not be null"));
    }

    @Override // io.r2dbc.spi.Result
    public Mono<Integer> getRowsUpdated() {
        return affects().map(ROWS_UPDATED);
    }

    @Override // io.r2dbc.spi.Result
    public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
        AssertUtils.requireNonNull(biFunction, "mapping function must not be null");
        return this.generatedKeyName == null ? results().handle((serverMessage, synchronousSink) -> {
            handleResult(serverMessage, synchronousSink, biFunction);
        }) : affects().map(okMessage -> {
            InsertSyntheticRow insertSyntheticRow = new InsertSyntheticRow(this.codecs, this.generatedKeyName, okMessage.getLastInsertId());
            return biFunction.apply(insertSyntheticRow, insertSyntheticRow);
        });
    }

    private Mono<OkMessage> affects() {
        return this.okProcessor.doOnSubscribe(subscription -> {
            Flux<ServerMessage> andSet = this.messages.getAndSet(null);
            if (andSet == null) {
                return;
            }
            Consumer<? super ServerMessage> consumer = serverMessage -> {
                if (serverMessage instanceof OkMessage) {
                    this.okProcessor.onNext((OkMessage) serverMessage);
                } else if (serverMessage instanceof EofMessage) {
                    this.okProcessor.onComplete();
                } else {
                    ReferenceCountUtil.safeRelease(serverMessage);
                }
            };
            MonoProcessor<OkMessage> monoProcessor = this.okProcessor;
            monoProcessor.getClass();
            Consumer<? super Throwable> consumer2 = monoProcessor::onError;
            MonoProcessor<OkMessage> monoProcessor2 = this.okProcessor;
            monoProcessor2.getClass();
            andSet.subscribe(consumer, consumer2, monoProcessor2::onComplete);
        });
    }

    private Flux<ServerMessage> results() {
        return Flux.defer(() -> {
            Flux<ServerMessage> andSet = this.messages.getAndSet(null);
            if (andSet == null) {
                return Flux.error(new IllegalStateException("Source has been released"));
            }
            this.okProcessor.onComplete();
            return OperatorUtils.discardOnCancel(andSet).doOnDiscard(ReferenceCounted.class, RELEASE);
        });
    }

    private <T> void handleResult(ServerMessage serverMessage, SynchronousSink<T> synchronousSink, BiFunction<Row, RowMetadata, ? extends T> biFunction) {
        if (serverMessage instanceof SyntheticMetadataMessage) {
            DefinitionMetadataMessage[] unwrap = ((SyntheticMetadataMessage) serverMessage).unwrap();
            if (unwrap.length == 0) {
                return;
            }
            this.rowMetadata = MySqlRowMetadata.create(unwrap);
            return;
        }
        if (serverMessage instanceof RowMessage) {
            processRow((RowMessage) serverMessage, synchronousSink, biFunction);
        } else {
            ReferenceCountUtil.safeRelease(serverMessage);
        }
    }

    private <T> void processRow(RowMessage rowMessage, SynchronousSink<T> synchronousSink, BiFunction<Row, RowMetadata, ? extends T> biFunction) {
        MySqlRowMetadata mySqlRowMetadata = this.rowMetadata;
        if (mySqlRowMetadata == null) {
            ReferenceCountUtil.safeRelease(rowMessage);
            synchronousSink.error(new IllegalStateException("No MySqlRowMetadata available"));
            return;
        }
        try {
            FieldValue[] decode = rowMessage.decode(this.isBinary, mySqlRowMetadata.unwrap());
            ReferenceCountUtil.safeRelease(rowMessage);
            try {
                T apply = biFunction.apply(new MySqlRow(decode, mySqlRowMetadata, this.codecs, this.isBinary, this.context), mySqlRowMetadata);
                for (FieldValue fieldValue : decode) {
                    ReferenceCountUtil.safeRelease(fieldValue);
                }
                synchronousSink.next(apply);
            } catch (Throwable th) {
                for (FieldValue fieldValue2 : decode) {
                    ReferenceCountUtil.safeRelease(fieldValue2);
                }
                throw th;
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.safeRelease(rowMessage);
            throw th2;
        }
    }
}
