/*
 * Decompiled with CFR 0.152.
 */
package pe.pi.sctp4j.sctp.small;

import com.phono.srtplight.Log;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import pe.pi.sctp4j.sctp.SCTPMessage;
import pe.pi.sctp4j.sctp.SCTPStream;
import pe.pi.sctp4j.sctp.dataChannel.DECP.DCOpen;
import pe.pi.sctp4j.sctp.messages.DataChunk;
import pe.pi.sctp4j.sctp.small.ThreadedAssociation;

public class BlockingSCTPStream
extends SCTPStream {
    private HashMap<Integer, SCTPMessage> undeliveredOutboundMessages = new HashMap();
    private final ThreadedAssociation _ta;
    private final ExecutorService _ex = Executors.newSingleThreadExecutor(r -> new Thread(r, "Stream-" + id + "-Exec"));

    BlockingSCTPStream(ThreadedAssociation a, Integer id) {
        super(a, id);
        this._ta = a;
    }

    @Override
    public synchronized void send(String message) throws Exception {
        SCTPMessage m = this._ta.makeMessage(message, (SCTPStream)this);
        this.undeliveredOutboundMessages.put(m.getSeq(), m);
        this._ta.sendAndBlock(m);
    }

    @Override
    public synchronized void send(byte[] message) throws Exception {
        SCTPMessage m = this._ta.makeMessage(message, (SCTPStream)this);
        this.undeliveredOutboundMessages.put(m.getSeq(), m);
        this._ta.sendAndBlock(m);
    }

    @Override
    public void send(DCOpen message) throws Exception {
        SCTPMessage m = this._ta.makeMessage(message, (SCTPStream)this);
        this.undeliveredOutboundMessages.put(m.getSeq(), m);
        Log.debug("About to send message for dcep size is " + m.getData().length);
        this._ta.sendAndBlock(m);
    }

    @Override
    public void deliverMessage(SCTPMessage message) {
        this._ex.execute(message);
    }

    @Override
    protected void alOnDCEPStream(SCTPStream _stream, String label, int _pPid) throws Exception {
        this._ex.execute(() -> {
            try {
                super.alOnDCEPStream(_stream, label, _pPid);
            }
            catch (Exception ex) {
                Log.error("can't notify  DCEPStream " + ex.getMessage());
            }
        });
    }

    @Override
    public void delivered(DataChunk d) {
        int ssn;
        SCTPMessage st;
        int f = d.getFlags();
        if ((f & 1) > 0 && (st = this.undeliveredOutboundMessages.remove(ssn = d.getSSeqNo())) != null) {
            st.acked();
        }
    }

    @Override
    public boolean idle() {
        return this.undeliveredOutboundMessages.isEmpty();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this._ex != null && !this._ex.isShutdown()) {
            this._ex.shutdownNow();
            Log.debug("shutdown of Stream-" + this.getNum() + "-Exec");
        }
    }
}

