package fr.modcraftmc.crossservercore.rabbitmq;

import com.mojang.datafixers.util.Pair;
import fr.modcraftmc.crossservercore.CrossServerCore;
import fr.modcraftmc.crossservercore.References;
import fr.modcraftmc.crossservercore.api.rabbitmq.IMessageStreamsManager;
import fr.modcraftmc.crossservercore.events.RabbitmqConnectionReadyEvent;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import net.minecraftforge.common.MinecraftForge;

/* loaded from: input_file:fr/modcraftmc/crossservercore/rabbitmq/MessageStreamsManager.class */
public class MessageStreamsManager implements IMessageStreamsManager {
    private RabbitmqDirectStream directStream;
    private RabbitmqBroadcastStream broadcastStream;
    private boolean ready = false;
    private final List<Pair<String, Consumer<String>>> directSubscriptions = new ArrayList();
    private final List<Consumer<String>> broadcastSubscription = new ArrayList();

    public MessageStreamsManager() {
        MinecraftForge.EVENT_BUS.addListener(this::onRabbitmqConnectionReady);
    }

    private void onRabbitmqConnectionReady(RabbitmqConnectionReadyEvent rabbitmqConnectionReadyEvent) {
        CrossServerCore.LOGGER.debug("Initializing message streams");
        this.directStream = new RabbitmqDirectStream(rabbitmqConnectionReadyEvent.getRabbitmqConnection(), References.DIRECT_EXCHANGE_NAME);
        this.broadcastStream = new RabbitmqBroadcastStream(rabbitmqConnectionReadyEvent.getRabbitmqConnection(), References.GLOBAL_EXCHANGE_NAME);
        this.ready = true;
        for (Pair<String, Consumer<String>> pair : this.directSubscriptions) {
            subscribeDirectMessage((String) pair.getFirst(), (Consumer) pair.getSecond());
        }
        Iterator<Consumer<String>> it = this.broadcastSubscription.iterator();
        while (it.hasNext()) {
            subscribeBroadcastMessage(it.next());
        }
        CrossServerCore.LOGGER.debug("Message streams initialized");
    }

    @Override // fr.modcraftmc.crossservercore.api.rabbitmq.IMessageStreamsManager
    public void sendDirectMessage(String str, String str2) {
        if (!this.ready) {
            CrossServerCore.LOGGER.error("Cannot send direct message, rabbitmq connection is not ready");
            return;
        }
        try {
            this.directStream.publish(str, str2);
        } catch (IOException e) {
            CrossServerCore.LOGGER.error(String.format("Error while publishing message to rabbitmq cannot send message to route point %s : %s", str, e.getMessage()));
        }
    }

    @Override // fr.modcraftmc.crossservercore.api.rabbitmq.IMessageStreamsManager
    public void sendBroadcastMessage(String str) {
        if (!this.ready) {
            CrossServerCore.LOGGER.error("Cannot send global message, rabbitmq connection is not ready");
            return;
        }
        try {
            this.broadcastStream.publish(str);
        } catch (IOException e) {
            CrossServerCore.LOGGER.error(String.format("Error while publishing message to rabbitmq cannot send message to global exchange : %s", e.getMessage()));
        }
    }

    @Override // fr.modcraftmc.crossservercore.api.rabbitmq.IMessageStreamsManager
    public void subscribeDirectMessage(String str, Consumer<String> consumer) {
        if (this.ready) {
            this.directStream.subscribe(str, (str2, delivery) -> {
                consumer.accept(new String(delivery.getBody()));
            });
        } else {
            this.directSubscriptions.add(Pair.of(str, consumer));
        }
    }

    @Override // fr.modcraftmc.crossservercore.api.rabbitmq.IMessageStreamsManager
    public void subscribeBroadcastMessage(Consumer<String> consumer) {
        if (this.ready) {
            this.broadcastStream.subscribe((str, delivery) -> {
                consumer.accept(new String(delivery.getBody()));
            });
        } else {
            this.broadcastSubscription.add(consumer);
        }
    }
}
