package net.draycia.carbon.libs.io.nats.client.impl;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import net.draycia.carbon.libs.io.nats.client.Dispatcher;
import net.draycia.carbon.libs.io.nats.client.ErrorListener;
import net.draycia.carbon.libs.io.nats.client.JetStream;
import net.draycia.carbon.libs.io.nats.client.JetStreamApiException;
import net.draycia.carbon.libs.io.nats.client.JetStreamOptions;
import net.draycia.carbon.libs.io.nats.client.JetStreamSubscription;
import net.draycia.carbon.libs.io.nats.client.Message;
import net.draycia.carbon.libs.io.nats.client.MessageHandler;
import net.draycia.carbon.libs.io.nats.client.PublishOptions;
import net.draycia.carbon.libs.io.nats.client.PullSubscribeOptions;
import net.draycia.carbon.libs.io.nats.client.PushSubscribeOptions;
import net.draycia.carbon.libs.io.nats.client.api.ConsumerConfiguration;
import net.draycia.carbon.libs.io.nats.client.api.ConsumerInfo;
import net.draycia.carbon.libs.io.nats.client.api.PublishAck;
import net.draycia.carbon.libs.io.nats.client.support.ApiConstants;
import net.draycia.carbon.libs.io.nats.client.support.JsonUtils;
import net.draycia.carbon.libs.io.nats.client.support.NatsJetStreamConstants;
import net.draycia.carbon.libs.io.nats.client.support.Validator;

/* loaded from: input_file:net/draycia/carbon/libs/io/nats/client/impl/NatsJetStream.class */
public class NatsJetStream extends NatsJetStreamImplBase implements JetStream {

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:net/draycia/carbon/libs/io/nats/client/impl/NatsJetStream$AutoAckMessageHandler.class */
    /**
     * Handler decorator that runs the user's handler and then acknowledges the message.
     *
     * <p>The ack is only sent when {@link Message#isJetStream()} reports {@code true}. Any
     * {@link Exception} raised by the user handler or by the ack is handed to the connection's
     * {@link ErrorListener} when one is configured, and is otherwise dropped.
     */
    public static class AutoAckMessageHandler implements MessageHandler {
        NatsConnection conn;
        MessageHandler userMH;

        /**
         * @param natsConnection connection whose options supply the error listener
         * @param messageHandler user handler invoked for every message
         */
        AutoAckMessageHandler(NatsConnection natsConnection, MessageHandler messageHandler) {
            this.userMH = messageHandler;
            this.conn = natsConnection;
        }

        /**
         * Delegates to the user handler, then acks JetStream messages.
         *
         * @param message the delivered message
         * @throws InterruptedException declared by the interface
         */
        @Override
        public void onMessage(Message message) throws InterruptedException {
            try {
                userMH.onMessage(message);
                if (!message.isJetStream()) {
                    // Nothing to acknowledge for a plain (non-JetStream) message.
                    return;
                }
                message.ack();
            } catch (Exception failure) {
                // NOTE(review): this also catches InterruptedException from the user handler and
                // does not restore the interrupt flag — confirm that is intended.
                reportFailure(failure);
            }
        }

        /** Passes the failure to the configured error listener, if there is one. */
        private void reportFailure(Exception failure) {
            final ErrorListener listener = conn.getOptions().getErrorListener();
            if (listener == null) {
                return;
            }
            listener.exceptionOccurred(conn, failure);
        }
    }

    /**
     * Creates a JetStream context; both arguments are forwarded unchanged to the
     * {@code NatsJetStreamImplBase} constructor.
     *
     * @param natsConnection the connection this context operates on
     * @param jetStreamOptions the JetStream options for this context
     * @throws IOException declared here; presumably raised by the superclass constructor
     */
    public NatsJetStream(NatsConnection natsConnection, JetStreamOptions jetStreamOptions) throws IOException {
        super(natsConnection, jetStreamOptions);
    }

    /**
     * Publishes a payload synchronously with no headers and no publish options.
     *
     * @param str subject to publish to
     * @param bArr message payload
     * @return the publish ack, or {@code null} when the context is configured for publish-no-ack
     * @throws IOException on a communication or publish error
     * @throws JetStreamApiException declared for errors reported by the JetStream API
     */
    @Override
    public PublishAck publish(String str, byte[] bArr) throws IOException, JetStreamApiException {
        final Headers noHeaders = null;
        final PublishOptions noOptions = null;
        return publishSyncInternal(str, noHeaders, bArr, false, noOptions);
    }

    /**
     * Publishes a payload synchronously with no headers, applying the given publish options.
     *
     * @param str subject to publish to
     * @param bArr message payload
     * @param publishOptions publish options; may be {@code null}
     * @return the publish ack, or {@code null} when the context is configured for publish-no-ack
     * @throws IOException on a communication or publish error
     * @throws JetStreamApiException declared for errors reported by the JetStream API
     */
    @Override
    public PublishAck publish(String str, byte[] bArr, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        final Headers noHeaders = null;
        // Same call the four-argument internal helper makes: the UTF-8 flag is off.
        return publishSyncInternal(str, noHeaders, bArr, false, publishOptions);
    }

    /**
     * Publishes the subject, headers, payload and UTF-8 flag of a message synchronously, with no
     * publish options.
     *
     * @param message the message to publish; validated as non-null
     * @return the publish ack, or {@code null} when the context is configured for publish-no-ack
     * @throws IOException on a communication or publish error
     * @throws JetStreamApiException declared for errors reported by the JetStream API
     */
    @Override
    public PublishAck publish(Message message) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        final String subject = message.getSubject();
        final Headers headers = message.getHeaders();
        final byte[] payload = message.getData();
        final boolean utf8Mode = message.isUtf8mode();
        return publishSyncInternal(subject, headers, payload, utf8Mode, null);
    }

    /**
     * Publishes the subject, headers, payload and UTF-8 flag of a message synchronously, applying
     * the given publish options.
     *
     * @param message the message to publish; validated as non-null
     * @param publishOptions publish options; may be {@code null}
     * @return the publish ack, or {@code null} when the context is configured for publish-no-ack
     * @throws IOException on a communication or publish error
     * @throws JetStreamApiException declared for errors reported by the JetStream API
     */
    @Override
    public PublishAck publish(Message message, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        final String subject = message.getSubject();
        final Headers headers = message.getHeaders();
        final byte[] payload = message.getData();
        final boolean utf8Mode = message.isUtf8mode();
        return publishSyncInternal(subject, headers, payload, utf8Mode, publishOptions);
    }

    /**
     * Publishes a payload asynchronously with no headers, no publish options and no explicit
     * timeout.
     *
     * @param str subject to publish to
     * @param bArr message payload
     * @return a future for the publish ack, or {@code null} when the context is configured for
     *     publish-no-ack
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(String str, byte[] bArr) {
        final Headers noHeaders = null;
        final PublishOptions noOptions = null;
        final Duration noTimeout = null;
        // Same call the five-argument internal helper makes: the UTF-8 flag is off.
        return publishAsyncInternal(str, noHeaders, bArr, false, noOptions, noTimeout);
    }

    /**
     * Publishes a payload asynchronously with no headers and no explicit timeout, applying the
     * given publish options.
     *
     * @param str subject to publish to
     * @param bArr message payload
     * @param publishOptions publish options; may be {@code null}
     * @return a future for the publish ack, or {@code null} when the context is configured for
     *     publish-no-ack
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(String str, byte[] bArr, PublishOptions publishOptions) {
        final Headers noHeaders = null;
        final Duration noTimeout = null;
        // Same call the five-argument internal helper makes: the UTF-8 flag is off.
        return publishAsyncInternal(str, noHeaders, bArr, false, publishOptions, noTimeout);
    }

    /**
     * Publishes the subject, headers, payload and UTF-8 flag of a message asynchronously, with no
     * publish options and no explicit timeout.
     *
     * @param message the message to publish; validated as non-null
     * @return a future for the publish ack, or {@code null} when the context is configured for
     *     publish-no-ack
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message) {
        Validator.validateNotNull(message, "Message");
        final String subject = message.getSubject();
        final Headers headers = message.getHeaders();
        final byte[] payload = message.getData();
        final boolean utf8Mode = message.isUtf8mode();
        return publishAsyncInternal(subject, headers, payload, utf8Mode, null, null);
    }

    /**
     * Publishes the subject, headers, payload and UTF-8 flag of a message asynchronously, applying
     * the given publish options and no explicit timeout.
     *
     * @param message the message to publish; validated as non-null
     * @param publishOptions publish options; may be {@code null}
     * @return a future for the publish ack, or {@code null} when the context is configured for
     *     publish-no-ack
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions publishOptions) {
        Validator.validateNotNull(message, "Message");
        final String subject = message.getSubject();
        final Headers headers = message.getHeaders();
        final byte[] payload = message.getData();
        final boolean utf8Mode = message.isUtf8mode();
        return publishAsyncInternal(subject, headers, payload, utf8Mode, publishOptions, null);
    }

    /**
     * Convenience overload of the synchronous publish that always passes {@code false} for the
     * UTF-8 flag.
     *
     * @param str subject to publish to
     * @param headers headers to send; may be {@code null}
     * @param bArr message payload
     * @param publishOptions publish options; may be {@code null}
     * @return the publish ack, or {@code null} when the context is configured for publish-no-ack
     */
    private PublishAck publishSyncInternal(String str, Headers headers, byte[] bArr, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        final boolean utf8Mode = false;
        return publishSyncInternal(str, headers, bArr, utf8Mode, publishOptions);
    }

    /**
     * Synchronous publish: merges the publish options into the headers, then either publishes
     * without waiting (publish-no-ack) or sends a request and converts the reply into an ack.
     *
     * @param str subject to publish to
     * @param headers headers to send; may be {@code null}
     * @param bArr message payload
     * @param z UTF-8 flag forwarded to the connection
     * @param publishOptions publish options; may be {@code null}, in which case the context's
     *     request timeout is used instead of the options' stream timeout
     * @return the publish ack, or {@code null} in publish-no-ack mode
     * @throws IOException on a communication error or a rejected publish
     * @throws JetStreamApiException declared for errors reported by the JetStream API
     */
    @Deprecated
    private PublishAck publishSyncInternal(String str, Headers headers, byte[] bArr, boolean z, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        final Headers outgoing = mergePublishOptions(headers, publishOptions);

        if (this.jso.isPublishNoAck()) {
            // Fire and forget: there is no reply to turn into an ack.
            this.conn.publishInternal(str, null, outgoing, bArr, z);
            return null;
        }

        final Duration timeout;
        if (publishOptions == null) {
            timeout = this.jso.getRequestTimeout();
        } else {
            timeout = publishOptions.getStreamTimeout();
        }
        final Message reply = makeInternalRequestResponseRequired(str, outgoing, bArr, z, timeout, false);
        return processPublishResponse(reply, publishOptions);
    }

    /**
     * Convenience overload of the asynchronous publish that always passes {@code false} for the
     * UTF-8 flag.
     *
     * @param str subject to publish to
     * @param headers headers to send; may be {@code null}
     * @param bArr message payload
     * @param publishOptions publish options; may be {@code null}
     * @param duration timeout forwarded to the request; may be {@code null}
     * @return a future for the publish ack, or {@code null} in publish-no-ack mode
     */
    private CompletableFuture<PublishAck> publishAsyncInternal(String str, Headers headers, byte[] bArr, PublishOptions publishOptions, Duration duration) {
        final boolean utf8Mode = false;
        return publishAsyncInternal(str, headers, bArr, utf8Mode, publishOptions, duration);
    }

    /**
     * Asynchronous publish: merges the publish options into the headers, then either publishes
     * without waiting (publish-no-ack) or issues a request whose reply is converted into an ack.
     *
     * <p>A checked failure while converting the reply is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param str subject to publish to
     * @param headers headers to send; may be {@code null}
     * @param bArr message payload
     * @param z UTF-8 flag forwarded to the connection
     * @param publishOptions publish options; may be {@code null}
     * @param duration timeout forwarded to the request; may be {@code null}
     * @return a future for the publish ack, or {@code null} (not a completed future) in
     *     publish-no-ack mode
     */
    @Deprecated
    private CompletableFuture<PublishAck> publishAsyncInternal(String str, Headers headers, byte[] bArr, boolean z, PublishOptions publishOptions, Duration duration) {
        final Headers outgoing = mergePublishOptions(headers, publishOptions);

        if (this.jso.isPublishNoAck()) {
            // Fire and forget: callers receive null rather than a future in this mode.
            this.conn.publishInternal(str, null, outgoing, bArr, z);
            return null;
        }

        return this.conn
                .requestFutureInternal(str, outgoing, bArr, z, duration, false)
                .thenCompose(reply -> {
                    try {
                        responseRequired(reply);
                        final PublishAck ack = processPublishResponse(reply, publishOptions);
                        return CompletableFuture.completedFuture(ack);
                    } catch (IOException | JetStreamApiException checked) {
                        throw new RuntimeException(checked);
                    }
                });
    }

    /**
     * Converts a publish reply into a {@link PublishAck}.
     *
     * @param message the reply received for the publish request
     * @param publishOptions publish options; may be {@code null}. When they name a stream, the
     *     ack must come from that stream.
     * @return the parsed ack
     * @throws IOException if the reply is a status message, or the ack's stream differs from the
     *     stream named in the options
     * @throws JetStreamApiException declared; presumably raised while parsing the ack
     */
    private PublishAck processPublishResponse(Message message, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        if (message.isStatusMessage()) {
            if (message.getStatus().getCode() == 503) {
                throw new IOException("Error Publishing: No stream available.");
            }
            throw new IOException("Error Publishing: " + message.getStatus().getMessage());
        }

        final PublishAck ack = new PublishAck(message);
        final String actualStream = ack.getStream();
        if (publishOptions == null) {
            return ack;
        }
        final String expectedStream = publishOptions.getStream();
        if (expectedStream == null) {
            return ack;
        }
        if (expectedStream.equals(actualStream)) {
            return ack;
        }
        throw new IOException("Expected ack from stream " + expectedStream + ", received from: " + actualStream);
    }

    /**
     * Builds the headers to send by copying the caller's headers and adding one header for each
     * publish option that is set.
     *
     * @param headers caller-supplied headers; may be {@code null}. They are copied, not modified.
     * @param publishOptions publish options; may be {@code null}, in which case only the copy
     *     (or {@code null}) is returned
     * @return the merged headers, or {@code null} when there were no headers and no option
     *     contributed one
     */
    private Headers mergePublishOptions(Headers headers, PublishOptions publishOptions) {
        Headers merged = null;
        if (headers != null) {
            merged = new Headers(headers);
        }
        if (publishOptions == null) {
            return merged;
        }

        merged = mergeNum(merged, NatsJetStreamConstants.EXPECTED_LAST_SEQ_HDR, publishOptions.getExpectedLastSequence());
        merged = mergeNum(merged, NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR, publishOptions.getExpectedLastSubjectSequence());
        merged = mergeString(merged, NatsJetStreamConstants.EXPECTED_LAST_MSG_ID_HDR, publishOptions.getExpectedLastMsgId());
        merged = mergeString(merged, NatsJetStreamConstants.EXPECTED_STREAM_HDR, publishOptions.getExpectedStream());
        merged = mergeString(merged, NatsJetStreamConstants.MSG_ID_HDR, publishOptions.getMessageId());
        return merged;
    }

    /**
     * Adds a numeric header when the value is positive.
     *
     * @param headers headers to add to; may be {@code null}
     * @param str header name
     * @param j header value; values of zero or less are skipped
     * @return the headers with the entry added, or the input unchanged when the value is skipped
     */
    private Headers mergeNum(Headers headers, String str, long j) {
        if (j <= 0) {
            return headers;
        }
        return _mergeNum(headers, str, Long.toString(j));
    }

    /**
     * Adds a string header unless {@link Validator#nullOrEmpty} reports the value as null or empty.
     *
     * @param headers headers to add to; may be {@code null}
     * @param str header name
     * @param str2 header value
     * @return the headers with the entry added, or the input unchanged when the value is skipped
     */
    private Headers mergeString(Headers headers, String str, String str2) {
        if (Validator.nullOrEmpty(str2)) {
            return headers;
        }
        return _mergeNum(headers, str, str2);
    }

    /**
     * Adds a header entry, creating the header collection first when none was supplied.
     *
     * <p>Despite the name, this is the shared implementation for both numeric and string values.
     *
     * @param headers headers to add to; a new instance is created when {@code null}
     * @param str header name
     * @param str2 header value
     * @return the result of {@code Headers.add}
     */
    private Headers _mergeNum(Headers headers, String str, String str2) {
        final Headers target = headers != null ? headers : new Headers();
        return target.add(str, str2);
    }

    /**
     * Creates a JetStream subscription, validating the request against an existing durable
     * consumer when there is one and creating the consumer otherwise.
     *
     * <p>Pull mode is selected when {@code pullSubscribeOptions} is non-null. Otherwise push mode
     * is used, with default push options when {@code pushSubscribeOptions} is {@code null}.
     *
     * <p>The local subscription is made before the consumer is created. If consumer creation or
     * the JetStream setup then fails for any reason, the subscription is removed again before the
     * failure propagates.
     *
     * @param str subject; used to look up the stream when the options name none, and as the
     *     filter subject when the consumer configuration has none
     * @param str2 queue name; may be {@code null}. In push mode it must match the configured
     *     deliver group if both are supplied.
     * @param natsDispatcher dispatcher to subscribe through, or {@code null} to subscribe directly
     *     on the connection
     * @param messageHandler handler used with the dispatcher; ignored when there is no dispatcher
     * @param z when {@code true}, the handler is wrapped in an {@link AutoAckMessageHandler}
     * @param pushSubscribeOptions push options; may be {@code null}; ignored in pull mode
     * @param pullSubscribeOptions pull options; non-null selects pull mode
     * @return the configured subscription
     * @throws IllegalArgumentException if the request conflicts with an existing consumer, or bind
     *     mode is requested and the durable consumer does not exist
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    NatsJetStreamSubscription createSubscription(String str, String str2, NatsDispatcher natsDispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        final boolean pullMode = pullSubscribeOptions != null;

        // Only assigned in push mode. The options object handed to setupJetStream is chosen with
        // a conditional expression so that no single local has to hold both option types.
        PushSubscribeOptions pushOptions = null;
        String queueName = str2;
        String stream;
        final boolean bindOnly;
        final ConsumerConfiguration.Builder builder;
        if (pullMode) {
            stream = pullSubscribeOptions.getStream();
            builder = ConsumerConfiguration.builder(pullSubscribeOptions.getConsumerConfiguration());
            // A pull consumer carries neither a deliver subject nor a deliver group.
            builder.deliverSubject(null);
            builder.deliverGroup(null);
            bindOnly = pullSubscribeOptions.isBind();
        } else {
            pushOptions = pushSubscribeOptions == null ? PushSubscribeOptions.builder().build() : pushSubscribeOptions;
            stream = pushOptions.getStream();
            builder = ConsumerConfiguration.builder(pushOptions.getConsumerConfiguration());
            builder.maxPullWaiting(0L);
            queueName = Validator.validateMustMatchIfBothSupplied(builder.getDeliverGroup(), queueName, "[SUB-Q01] Consumer Configuration DeliverGroup", "Queue Name");
            builder.deliverGroup(queueName);
            bindOnly = pushOptions.isBind();
        }

        final String durable = builder.getDurable();
        String deliverSubject = builder.getDeliverSubject();
        String filterSubject = builder.getFilterSubject();
        boolean createConsumer = true;

        if (stream == null) {
            stream = lookupStreamBySubject(str);
        }

        if (durable != null) {
            final ConsumerInfo existingInfo = lookupConsumerInfo(stream, durable);
            if (existingInfo != null) {
                createConsumer = false;
                final ConsumerConfiguration existingConfig = existingInfo.getConsumerConfiguration();
                validateExistingConsumer(existingInfo, existingConfig, durable, pullMode, str, deliverSubject, filterSubject, queueName);
                // The existing consumer's settings win once they have been validated.
                filterSubject = existingConfig.getFilterSubject();
                deliverSubject = existingConfig.getDeliverSubject();
            } else if (bindOnly) {
                throw new IllegalArgumentException("[SUB-B01] Consumer not found for durable. Required in bind mode.");
            }
        }

        if (deliverSubject == null) {
            deliverSubject = this.conn.createInbox();
        }

        final NatsJetStreamSubscription subscription;
        if (natsDispatcher == null) {
            subscription = (NatsJetStreamSubscription) this.conn.createSubscription(deliverSubject, queueName, null, true);
        } else {
            final MessageHandler effectiveHandler = z ? new AutoAckMessageHandler(this.conn, messageHandler) : messageHandler;
            subscription = (NatsJetStreamSubscription) natsDispatcher.subscribeImpl(deliverSubject, queueName, effectiveHandler, true);
        }

        if (!createConsumer) {
            subscription.setupJetStream(this, durable, stream, deliverSubject, pullMode ? pullSubscribeOptions : pushOptions);
            return subscription;
        }

        if (!pullMode) {
            builder.deliverSubject(deliverSubject);
        }
        builder.filterSubject(filterSubject == null ? str : filterSubject);

        boolean attached = false;
        try {
            final ConsumerInfo created = addOrUpdateConsumerInternal(stream, builder.build());
            subscription.setupJetStream(this, created.getName(), created.getStreamName(), deliverSubject, pullMode ? pullSubscribeOptions : pushOptions);
            attached = true;
        } finally {
            if (!attached) {
                // Consumer creation or setup failed: remove the subscription made above so it
                // does not linger on the connection or dispatcher.
                if (natsDispatcher == null) {
                    subscription.unsubscribe();
                } else {
                    natsDispatcher.unsubscribe(subscription);
                }
            }
        }
        return subscription;
    }

    /**
     * Checks a subscribe request against a consumer that already exists, throwing
     * {@link IllegalArgumentException} on the first conflict found.
     */
    private static void validateExistingConsumer(ConsumerInfo existingInfo, ConsumerConfiguration existingConfig, String durable, boolean pullMode, String subject, String requestedDeliverSubject, String requestedFilterSubject, String queueName) {
        final String existingDeliverSubject = existingConfig.getDeliverSubject();
        if (pullMode) {
            if (!Validator.nullOrEmpty(existingDeliverSubject)) {
                throw new IllegalArgumentException(String.format("[SUB-DS01] Consumer is already configured as a push consumer with deliver subject '%s'.", existingDeliverSubject));
            }
        } else {
            if (Validator.nullOrEmpty(existingDeliverSubject)) {
                throw new IllegalArgumentException("[SUB-DS02] Consumer is already configured as a pull consumer with no deliver subject.");
            }
            if (requestedDeliverSubject != null && !requestedDeliverSubject.equals(existingDeliverSubject)) {
                throw new IllegalArgumentException(String.format("[SUB-DS03] Existing consumer deliver subject '%s' does not match requested deliver subject '%s'.", existingDeliverSubject, requestedDeliverSubject));
            }
        }

        final String existingFilterSubject = existingConfig.getFilterSubject();
        if (requestedFilterSubject != null && !requestedFilterSubject.equals(existingFilterSubject)) {
            throw new IllegalArgumentException(String.format("[SUB-FS01] Subject '%s' mismatches consumer configuration '%s'.", subject, requestedFilterSubject));
        }

        final String existingDeliverGroup = existingConfig.getDeliverGroup();
        if (existingDeliverGroup == null) {
            if (queueName != null) {
                throw new IllegalArgumentException(String.format("[SUB-Q03] Existing consumer '%s' is not configured as a queue / deliver group.", durable));
            }
            if (existingInfo.isPushBound()) {
                throw new IllegalArgumentException(String.format("[SUB-Q02] Consumer '%s' is already bound to a subscription.", durable));
            }
        } else {
            if (queueName == null) {
                throw new IllegalArgumentException(String.format("[SUB-Q04] Existing consumer '%s' is configured as a queue / deliver group.", durable));
            }
            if (!existingDeliverGroup.equals(queueName)) {
                throw new IllegalArgumentException(String.format("[SUB-Q05] Existing consumer deliver group '%s' does not match requested queue / deliver group '%s'.", existingDeliverGroup, queueName));
            }
        }
    }

    /**
     * Creates a push subscription on a subject with no queue, no dispatcher and default options.
     *
     * @param str subject to subscribe to; validated first
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final PushSubscribeOptions defaultPushOptions = null;
        final PullSubscribeOptions notPull = null;
        return createSubscription(str, null, null, null, false, defaultPushOptions, notPull);
    }

    /**
     * Creates a push subscription on a subject with no queue and no dispatcher.
     *
     * @param str subject to subscribe to; validated first
     * @param pushSubscribeOptions push options; may be {@code null} for defaults
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final PullSubscribeOptions notPull = null;
        return createSubscription(str, null, null, null, false, pushSubscribeOptions, notPull);
    }

    /**
     * Creates a push subscription on a subject with an optional queue and no dispatcher.
     *
     * @param str subject to subscribe to; validated first
     * @param str2 queue name; validated, with an empty value treated as {@code null}
     * @param pushSubscribeOptions push options; may be {@code null} for defaults
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, String str2, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final String validatedQueue = Validator.validateQueueName(str2, false);
        final String queueName = Validator.emptyAsNull(validatedQueue);
        return createSubscription(str, queueName, null, null, false, pushSubscribeOptions, null);
    }

    /**
     * Creates a dispatcher-driven push subscription with no queue and default options.
     *
     * @param str subject to subscribe to; validated first
     * @param dispatcher dispatcher that delivers messages; must be non-null and is cast to
     *     {@code NatsDispatcher}
     * @param messageHandler handler for delivered messages; must be non-null
     * @param z whether messages are acknowledged automatically after the handler runs
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, Dispatcher dispatcher, MessageHandler messageHandler, boolean z) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        final NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        final PushSubscribeOptions defaultPushOptions = null;
        return createSubscription(str, null, natsDispatcher, messageHandler, z, defaultPushOptions, null);
    }

    /**
     * Creates a dispatcher-driven push subscription with no queue.
     *
     * @param str subject to subscribe to; validated first
     * @param dispatcher dispatcher that delivers messages; must be non-null and is cast to
     *     {@code NatsDispatcher}
     * @param messageHandler handler for delivered messages; must be non-null
     * @param z whether messages are acknowledged automatically after the handler runs
     * @param pushSubscribeOptions push options; may be {@code null} for defaults
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, Dispatcher dispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        final NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        return createSubscription(str, null, natsDispatcher, messageHandler, z, pushSubscribeOptions, null);
    }

    /**
     * Creates a dispatcher-driven push subscription with an optional queue.
     *
     * @param str subject to subscribe to; validated first
     * @param str2 queue name; validated, with an empty value treated as {@code null}
     * @param dispatcher dispatcher that delivers messages; must be non-null and is cast to
     *     {@code NatsDispatcher}
     * @param messageHandler handler for delivered messages; must be non-null
     * @param z whether messages are acknowledged automatically after the handler runs
     * @param pushSubscribeOptions push options; may be {@code null} for defaults
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, String str2, Dispatcher dispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final String validatedQueue = Validator.validateQueueName(str2, false);
        final String queueName = Validator.emptyAsNull(validatedQueue);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        final NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        return createSubscription(str, queueName, natsDispatcher, messageHandler, z, pushSubscribeOptions, null);
    }

    /**
     * Creates a pull subscription on a subject.
     *
     * @param str subject to subscribe to; validated first
     * @param pullSubscribeOptions pull options; must be non-null
     * @return the new subscription
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects a request
     */
    @Override
    public JetStreamSubscription subscribe(String str, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(pullSubscribeOptions, "Options");
        final PushSubscribeOptions notPush = null;
        return createSubscription(str, null, null, null, false, notPush, pullSubscribeOptions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fetches consumer info, translating a "consumer not found" API error into {@code null}.
     *
     * @param str stream name
     * @param str2 consumer name
     * @return the consumer info, or {@code null} when the API reports that the consumer does not
     *     exist
     * @throws IOException on a communication error
     * @throws JetStreamApiException for any other API error
     */
    public ConsumerInfo lookupConsumerInfo(String str, String str2) throws IOException, JetStreamApiException {
        try {
            return getConsumerInfo(str, str2);
        } catch (JetStreamApiException e) {
            if (isConsumerNotFound(e)) {
                return null;
            }
            throw e;
        }
    }

    /**
     * Reports whether an API error means the consumer does not exist: either API error code 10014,
     * or error code 404 with a description mentioning "consumer".
     */
    private static boolean isConsumerNotFound(JetStreamApiException e) {
        // NOTE(review): 10014 is taken to be the server's consumer-not-found API error code.
        if (e.getApiErrorCode() == 10014) {
            return true;
        }
        if (e.getErrorCode() != 404) {
            return false;
        }
        // A missing description must not raise a NullPointerException that would hide the
        // original API error from the caller.
        final String description = e.getErrorDescription();
        return description != null && description.contains("consumer");
    }

    /**
     * Resolves the single stream that matches a subject by querying the stream-names API.
     *
     * @param str subject to look up
     * @return the name of the one matching stream
     * @throws IllegalStateException if no stream, or more than one stream, matches the subject
     * @throws IOException on a communication error
     * @throws JetStreamApiException if the JetStream API rejects the request
     */
    protected String lookupStreamBySubject(String str) throws IOException, JetStreamApiException {
        final byte[] requestBody = JsonUtils.simpleMessageBody(ApiConstants.SUBJECT, str);
        final StreamNamesReader reader = new StreamNamesReader();
        reader.process(makeRequestResponseRequired(NatsJetStreamConstants.JSAPI_STREAM_NAMES, requestBody, this.jso.getRequestTimeout()));

        final int matches = reader.getStrings().size();
        if (matches == 0) {
            throw new IllegalStateException("No matching streams for subject: " + str);
        }
        if (matches > 1) {
            // Several streams matched; reporting "no matching streams" here would be misleading.
            throw new IllegalStateException("Expected exactly one matching stream for subject: " + str + ", found " + matches);
        }
        return reader.getStrings().get(0);
    }
}
