/*
 * Decompiled with CFR 0.152.
 */
package pe.pi.sctp4j.sctp.small;

import com.phono.srtplight.Log;
import com.rtm516.mcxboxbroadcast.shaded.org.bouncycastle.tls.DatagramTransport;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import pe.pi.sctp4j.sctp.Association;
import pe.pi.sctp4j.sctp.AssociationListener;
import pe.pi.sctp4j.sctp.SCTPMessage;
import pe.pi.sctp4j.sctp.SCTPStream;
import pe.pi.sctp4j.sctp.StreamNumberInUseException;
import pe.pi.sctp4j.sctp.dataChannel.DECP.DCOpen;
import pe.pi.sctp4j.sctp.messages.Chunk;
import pe.pi.sctp4j.sctp.messages.DataChunk;
import pe.pi.sctp4j.sctp.messages.InitAckChunk;
import pe.pi.sctp4j.sctp.messages.InitChunk;
import pe.pi.sctp4j.sctp.messages.SackChunk;
import pe.pi.sctp4j.sctp.messages.exceptions.SctpPacketFormatException;
import pe.pi.sctp4j.sctp.messages.exceptions.UnreadyAssociationException;
import pe.pi.sctp4j.sctp.small.BlockingSCTPStream;
import pe.pi.sctp4j.sctp.small.SimpleSCTPTimer;

public class ThreadedAssociation
extends Association
implements Runnable {
    static final int MAXBLOCKS = 100;
    private ArrayBlockingQueue<DataChunk> _freeBlocks;
    private HashMap<Long, DataChunk> _inFlight;
    private long _lastCumuTSNAck;
    static final int MAX_INIT_RETRANS = 8;
    private long _rwnd;
    private long _cwnd;
    private long _ssthresh;
    private long _partial_bytes_acked;
    private boolean _fastRecovery;
    private int _transpMTU = 768;
    private Thread retryThread;
    private Chunk[] _stashCookieEcho;
    private final Object _congestion = new Object();
    private boolean _firstRTT = true;
    private double _srtt;
    private double _rttvar;
    private double _rto = 3.0;
    private final double _rtoBeta = 0.25;
    private final double _rtoAlpha = 0.125;
    private final double _rtoMin = 0.01;
    private final double _rtoMax = 6.0;
    private long t1 = 1000L;
    private long t3 = 1000L;

    public ThreadedAssociation(DatagramTransport transport, AssociationListener al) {
        super(transport, new ExecutorAssociationListener(al));
        try {
            this._transpMTU = Math.min(transport.getReceiveLimit(), transport.getSendLimit());
            Log.debug("Transport MTU is now " + this._transpMTU);
        }
        catch (IOException x) {
            Log.warn("Failed to get suitable transport mtu ");
        }
        this._freeBlocks = new ArrayBlockingQueue(100);
        this._inFlight = new HashMap(100);
        for (int i = 0; i < 100; ++i) {
            DataChunk dc = new DataChunk();
            this._freeBlocks.add(dc);
        }
        this.resetCwnd();
        this.retryThread = new Thread((Runnable)this, "AssocRetry" + __assocNo);
        this.retryThread.start();
    }

    @Override
    protected Chunk[] iackDeal(InitAckChunk iack) {
        Chunk[] ret = super.iackDeal(iack);
        this._stashCookieEcho = ret;
        this._ssthresh = this._rwnd = iack.getAdRecWinCredit();
        return ret;
    }

    @Override
    public void associate() throws SctpPacketFormatException, IOException {
        SimpleSCTPTimer init = new SimpleSCTPTimer(){
            int retries = 0;

            @Override
            public void tick() {
                Log.debug("T1 init timer expired in state " + ThreadedAssociation.this._state.name());
                if (ThreadedAssociation.this._state == Association.State.COOKIEECHOED || ThreadedAssociation.this._state == Association.State.COOKIEWAIT) {
                    try {
                        if (ThreadedAssociation.this._state == Association.State.COOKIEWAIT) {
                            ThreadedAssociation.this.sendInit();
                        } else {
                            ThreadedAssociation.this.send(ThreadedAssociation.this._stashCookieEcho);
                        }
                    }
                    catch (EOFException end) {
                        ThreadedAssociation.this.unexpectedClose(end);
                    }
                    catch (Exception ex) {
                        Log.error("Cant send Init/cookie retry " + this.retries + " because " + ex.toString());
                    }
                    ++this.retries;
                    if (this.retries < 8) {
                        this.setNextRun(ThreadedAssociation.this.getT1());
                    }
                } else {
                    Log.debug("T1 init timer expired with nothing to do");
                }
            }
        };
        this.sendInit();
        init.setNextRun(this.getT1());
    }

    @Override
    public BlockingSCTPStream mkStream(int id) {
        Log.debug("Make new Blocking stream " + id);
        return new BlockingSCTPStream(this, (Integer)id);
    }

    @Override
    public SCTPStream mkStream(int sno, String label) throws StreamNumberInUseException, UnreadyAssociationException, SctpPacketFormatException, IOException, Exception {
        SCTPStream sout = super.mkStream(sno, label);
        if (sout != null) {
            DCOpen dco = new DCOpen(label);
            SCTPMessage mess = this.makeMessage(dco, sout);
            this.sendAndBlock(mess);
        }
        return sout;
    }

    public long getT3() {
        return this.t3;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void enqueue(DataChunk[] ds) {
        Log.verb(" Aspiring to enqueue " + ds[0].toString());
        ThreadedAssociation threadedAssociation = this;
        synchronized (threadedAssociation) {
            long now = System.currentTimeMillis();
            for (DataChunk d : ds) {
                d.setTsn(this._nearTSN++);
                d.setGapAck(false);
                d.setRetryTime(now + this.getT3() - 1L);
                d.setSentTime(now);
                this.reduceRwnd(d.getDataSize());
                Log.verb(" DataChunk enqueued " + d.toString());
            }
            Chunk[] toSend = ds;
            try {
                this.send(toSend);
                for (DataChunk d : ds) {
                    Log.verb("sent, syncing on inFlight... " + d.getTsn());
                    HashMap<Long, DataChunk> hashMap = this._inFlight;
                    synchronized (hashMap) {
                        this._inFlight.put(new Long(d.getTsn()), d);
                    }
                    Log.verb("added to inFlight... " + d.getTsn());
                }
            }
            catch (SctpPacketFormatException ex) {
                Log.error("badly formatted chunks " + ex);
            }
            catch (EOFException end) {
                this.unexpectedClose(end);
            }
            catch (IOException ex) {
                Log.error("Can not send chunks ");
            }
        }
        Log.verb("leaving enqueue" + ds[0].getTsn());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void enqueue(DataChunk d) {
        Log.verb(" Aspiring to enqueue " + d.toString());
        ThreadedAssociation threadedAssociation = this;
        synchronized (threadedAssociation) {
            long now = System.currentTimeMillis();
            d.setTsn(this._nearTSN++);
            d.setGapAck(false);
            d.setRetryTime(now + this.getT3() - 1L);
            d.setSentTime(now);
            this.reduceRwnd(d.getDataSize());
            Log.verb(" DataChunk enqueued " + d.toString());
            Chunk[] toSend = this.addSackIfNeeded(d);
            try {
                this.send(toSend);
                Log.verb("sent, syncing on inFlight... " + d.getTsn());
                HashMap<Long, DataChunk> hashMap = this._inFlight;
                synchronized (hashMap) {
                    this._inFlight.put(new Long(d.getTsn()), d);
                }
                Log.verb("added to inFlight... " + d.getTsn());
            }
            catch (SctpPacketFormatException ex) {
                Log.error("badly formatted chunk " + d.toString());
            }
            catch (EOFException end) {
                this.unexpectedClose(end);
            }
            catch (IOException ex) {
                Log.error("Can not send chunk " + d.toString());
            }
        }
        Log.verb("leaving enqueue" + d.getTsn());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void sendAndBlock(SCTPMessage m) throws Exception {
        if (!this.isAssociated()) {
            throw new IllegalStateException("not associated");
        }
        while (m.hasMoreData()) {
            DataChunk dc = this._freeBlocks.take();
            dc.clean();
            m.fill(dc);
            Log.verb("thinking about waiting for congestion " + dc.getTsn());
            Object object = this._congestion;
            synchronized (object) {
                Log.verb("In congestion sync block ");
                while (!this.maySend(dc.getDataSize())) {
                    Log.verb("about to wait for congestion for " + this.getT3());
                    if (this.isAssociated()) {
                        this._congestion.wait(1000L);
                        continue;
                    }
                    throw new IllegalStateException("not associated");
                }
            }
            this.enqueue(dc);
        }
    }

    private Chunk[] addSackIfNeeded(DataChunk d) {
        Chunk[] ret = new Chunk[]{d};
        return ret;
    }

    @Override
    protected Chunk[] inboundInit(InitChunk init) {
        this._rwnd = init.getAdRecWinCredit();
        Log.debug("Inited rwnd to " + this._rwnd);
        this.setSsthresh(init);
        return super.inboundInit(init);
    }

    private void reduceRwnd(int dataSize) {
        this._rwnd -= (long)dataSize;
        if (this._rwnd < 0L) {
            this._rwnd = 0L;
        }
        Log.debug("Decreased rwnd to " + this._rwnd);
    }

    private void incrRwnd(int dataSize) {
        this._rwnd += (long)dataSize;
        Log.debug("Increased rwnd to " + this._rwnd);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseAllBlocks() {
        HashMap<Long, DataChunk> hashMap = this._inFlight;
        synchronized (hashMap) {
            Collection<DataChunk> vals = this._inFlight.values();
            this._freeBlocks.addAll(vals);
            this._inFlight.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected Chunk[] sackDeal(SackChunk sack) {
        Chunk[] ret = new Chunk[]{};
        if (sack.getCumuTSNAck() >= this._lastCumuTSNAck) {
            long ackedTo = sack.getCumuTSNAck();
            int totalAcked = 0;
            long now = System.currentTimeMillis();
            SackChunk.GapBlock[] gapBlockArray = this._inFlight;
            synchronized (this._inFlight) {
                ArrayList<Long> removals22 = new ArrayList<Long>();
                for (Long k : this._inFlight.keySet()) {
                    if (k > ackedTo) continue;
                    removals22.add(k);
                }
                for (Long k : removals22) {
                    DataChunk d = this._inFlight.remove(k);
                    totalAcked += d.getDataSize();
                    this.setRTO(now - d.getSentTime());
                    try {
                        int sid = d.getStreamId();
                        SCTPStream stream = this.getStream(sid);
                        if (stream != null) {
                            stream.delivered(d);
                        }
                        this._freeBlocks.put(d);
                    }
                    catch (InterruptedException ex) {
                        Log.error("eek - can't replace free block on list!?!");
                    }
                }
                // ** MonitorExit[var8_6] (shouldn't be in output)
                for (SackChunk.GapBlock gb : sack.getGaps()) {
                    long ts = (long)gb.getStart() + ackedTo;
                    long te = (long)gb.getEnd() + ackedTo;
                    HashMap<Long, DataChunk> hashMap = this._inFlight;
                    synchronized (hashMap) {
                        for (long t = ts; t <= te; ++t) {
                            Long l = new Long(t);
                            DataChunk d = this._inFlight.get(l);
                            Log.verb("gap block says far end has seen " + l);
                            if (d == null) {
                                Log.debug("Huh? gap for something not inFlight ?!? " + l);
                                continue;
                            }
                            d.setGapAck(true);
                            totalAcked += d.getDataSize();
                        }
                    }
                }
                int totalDataInFlight = 0;
                HashMap<Long, DataChunk> removals22 = this._inFlight;
                synchronized (removals22) {
                    for (Long k : this._inFlight.keySet()) {
                        DataChunk d = this._inFlight.get(k);
                        if (d.getGapAck()) continue;
                        totalDataInFlight += d.getDataSize();
                    }
                }
                this._rwnd = sack.getArWin() - (long)totalDataInFlight;
                Log.debug("Setting rwnd to " + this._rwnd + " " + sack.getArWin() + " - " + totalDataInFlight);
                boolean advanced = this._lastCumuTSNAck < ackedTo;
                this.adjustCwind(advanced, totalDataInFlight, totalAcked);
                this._lastCumuTSNAck = ackedTo;
            }
        } else {
            Log.debug("Dumping Sack - already seen later sack.");
        }
        return ret;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void resetCwnd() {
        this._cwnd = Math.min(4 * this._transpMTU, Math.max(2 * this._transpMTU, 4380));
        Object object = this._congestion;
        synchronized (object) {
            this._congestion.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void setCwndPostRetrans() {
        this._cwnd = this._transpMTU;
        Object object = this._congestion;
        synchronized (object) {
            this._congestion.notifyAll();
        }
    }

    void setSsthresh(InitChunk init) {
        this._ssthresh = init.getAdRecWinCredit();
    }

    boolean maySend(int sz) {
        boolean maysend = (long)sz <= this._rwnd;
        Log.debug("MaySend (simple version ignores cwnd)" + maysend + " rwnd = " + this._rwnd + " sz = " + sz);
        return maysend;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void adjustCwind(boolean didAdvance, int inFlightBytes, int totalAcked) {
        boolean fullyUtilized;
        boolean bl = fullyUtilized = (long)inFlightBytes - this._cwnd < (long)DataChunk.getCapacity();
        if (this._cwnd <= this._ssthresh) {
            Log.debug("slow start");
            if (didAdvance && fullyUtilized && !this._fastRecovery) {
                int incCwinBy = Math.min(this._transpMTU, totalAcked);
                this._cwnd += (long)incCwinBy;
                Log.debug("cwnd now " + this._cwnd);
            } else {
                Log.debug("cwnd static at " + this._cwnd + " (didAdvance fullyUtilized  _fastRecovery inFlightBytes totalAcked)  " + didAdvance + " " + fullyUtilized + " " + this._fastRecovery + " " + inFlightBytes + " " + totalAcked);
            }
        } else if (didAdvance) {
            this._partial_bytes_acked += (long)totalAcked;
            if (this._partial_bytes_acked >= this._cwnd && fullyUtilized) {
                this._cwnd += (long)this._transpMTU;
                this._partial_bytes_acked -= this._cwnd;
            }
        }
        Object object = this._congestion;
        synchronized (object) {
            this._congestion.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Log.verb("starting retry thread");
        while (this.retryThread != null) {
            long now = System.currentTimeMillis();
            long nextTime = now + this.t3;
            Log.verb("retry timer went off ");
            if (this.canSend()) {
                ArrayList dcs = new ArrayList();
                HashMap<Long, DataChunk> hashMap = this._inFlight;
                synchronized (hashMap) {
                    Log.verb("have " + this._inFlight.values().stream().count() + " data chunks in flight");
                    Log.verb("have " + this._inFlight.values().stream().mapToInt(d -> d.getDataSize()).sum() + " data bytes in flight");
                    this._inFlight.values().stream().filter(d -> d.getRetryTime() <= now).sorted().filter(d -> !d.getGapAck()).forEachOrdered(d -> {
                        Log.verb("adding this to resend list " + d.toString());
                        dcs.add(d);
                    });
                }
                int count = 0;
                while (!dcs.isEmpty() && count < 5) {
                    DataChunk d2 = (DataChunk)dcs.remove(0);
                    Chunk[] da = new DataChunk[]{d2};
                    d2.setRetryTime(now + this.getT3() - 1L);
                    d2.incrementRetryCount();
                    try {
                        Log.debug("Sending retry for  " + d2);
                        this.send(da);
                        ++count;
                    }
                    catch (EOFException end) {
                        if (Log.getLevel() >= 4) {
                            Log.debug("Retry send failed " + end.getMessage());
                            end.printStackTrace();
                        }
                        this.unexpectedClose(end);
                        count = Integer.MAX_VALUE;
                    }
                    catch (Exception ex) {
                        Log.error("Cant send retry - eek " + ex.toString());
                    }
                }
                if (count > 0) {
                    this.setCwndPostRetrans();
                }
            } else {
                Log.verb("Can't send");
            }
            try {
                long toSleep = nextTime - now;
                Log.verb("Will sleep for " + toSleep);
                Thread.sleep(toSleep);
            }
            catch (InterruptedException ex) {
                Log.info("sleep was awoken.");
            }
        }
        Log.verb("leaving retry thread");
    }

    private long getT1() {
        return this.t1;
    }

    private void setRTOnonRFC(long r) {
        this._rto = 0.2;
    }

    private void setRTO(long r) {
        double nrto = 1.0;
        double cr = (double)r / 1000.0;
        if (this._firstRTT) {
            this._firstRTT = false;
            this._srtt = cr;
            this._rttvar = cr / 2.0;
            nrto = this._srtt + 4.0 * this._rttvar;
        } else {
            this._rttvar = 0.75 * this._rttvar + 0.25 * Math.abs(this._srtt - cr);
            this._srtt = 0.875 * this._srtt + 0.125 * cr;
            nrto = this._srtt + 4.0 * this._rttvar;
        }
        Log.debug("new r =" + r + "candidate  rto is " + nrto);
        if (nrto < 0.01) {
            Log.debug("clamping min rto as " + nrto + " < 0.01");
            nrto = 0.01;
        }
        if (nrto > 6.0) {
            Log.debug("clamping max rto as " + nrto + " > 6.0");
            nrto = 6.0;
        }
        if (nrto < 6.0 && nrto > 0.01) {
            this._rto = nrto;
        }
        Log.debug("new rto is " + this._rto);
        this.t1 = (long)(this._rto * 1000.0) * 10L;
        Log.debug("T1 is now " + this.t1 + " ms");
        this.t3 = this._rto > 0.0 ? (long)(1000.0 * this._rto) : 100L;
        Log.debug("T3 is now " + this.t3 + " ms");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unexpectedClose(EOFException end) {
        super.unexpectedClose(end);
        this.releaseAllBlocks();
        Object object = this._congestion;
        synchronized (object) {
            this._congestion.notifyAll();
        }
        this.retryThread = null;
    }

    private static class ExecutorAssociationListener
    implements AssociationListener,
    AutoCloseable {
        private final AssociationListener _appAl;
        private final ExecutorService _ex;
        private int id = 0;

        public ExecutorAssociationListener(AssociationListener al) {
            this._appAl = al;
            this.id = __assocNo - 1;
            this._ex = Executors.newSingleThreadExecutor(r -> new Thread(r, "Assoc-" + this.id + "-Exec"));
        }

        @Override
        public void onAssociated(Association a) {
            if (this._appAl != null) {
                this._ex.execute(() -> this._appAl.onAssociated(a));
            }
        }

        @Override
        public void onDisAssociated(Association a) {
            if (this._appAl != null) {
                if (this._ex.isTerminated()) {
                    Log.warn("Executor terminated... - direct call made..");
                    this._appAl.onDisAssociated(a);
                } else {
                    this._ex.execute(() -> this._appAl.onDisAssociated(a));
                    try {
                        this._ex.awaitTermination(1000L, TimeUnit.MILLISECONDS);
                        this.close();
                    }
                    catch (Throwable x) {
                        Log.warn("Timeout on " + this.toString());
                        try {
                            this.close();
                        }
                        catch (Throwable throwable) {
                            // empty catch block
                        }
                    }
                }
            }
        }

        @Override
        public void onDCEPStream(SCTPStream s2, String label, int type) throws Exception {
            if (this._appAl != null) {
                this._ex.execute(() -> {
                    try {
                        this._appAl.onDCEPStream(s2, label, type);
                    }
                    catch (Exception ex) {
                        Log.warn("onDCEPStream threw exception " + ex.getMessage());
                    }
                });
            }
        }

        @Override
        public void onRawStream(SCTPStream s2) {
            if (this._appAl != null) {
                this._ex.execute(() -> this._appAl.onRawStream(s2));
            }
        }

        @Override
        public void close() throws Exception {
            if (this._ex != null && !this._ex.isShutdown()) {
                this._ex.shutdownNow();
                Log.warn("shutdown of Assoc-" + this.id + "-Exec");
            }
        }
    }
}

