/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.cache;

import com.github.mizosoft.methanol.internal.cache.Store;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class CacheReadingPublisher
implements Flow.Publisher<List<ByteBuffer>> {
    private static final System.Logger logger = System.getLogger(CacheReadingPublisher.class.getName());
    private final Store.Viewer viewer;
    private final Executor executor;
    private final Listener listener;
    private final AtomicBoolean subscribed = new AtomicBoolean();

    public CacheReadingPublisher(Store.Viewer viewer, Executor executor) {
        this(viewer, executor, DisabledListener.INSTANCE);
    }

    public CacheReadingPublisher(Store.Viewer viewer, Executor executor, Listener listener) {
        this.viewer = Objects.requireNonNull(viewer);
        this.executor = Objects.requireNonNull(executor);
        this.listener = Objects.requireNonNull(listener);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
        Objects.requireNonNull(subscriber);
        if (this.subscribed.compareAndSet(false, true)) {
            new CacheReadingSubscription(subscriber, this.executor, this.viewer, this.listener).signal(true);
        } else {
            FlowSupport.refuse(subscriber, FlowSupport.multipleSubscribersToUnicast());
        }
    }

    private static enum DisabledListener implements Listener
    {
        INSTANCE;


        @Override
        public void onReadSuccess() {
        }

        @Override
        public void onReadFailure(Throwable unused) {
        }
    }

    public static interface Listener {
        public void onReadSuccess();

        public void onReadFailure(Throwable var1);

        default public Listener guarded() {
            return new Listener(){

                @Override
                public void onReadSuccess() {
                    try {
                        this.onReadSuccess();
                    }
                    catch (Throwable e) {
                        logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onReadSuccess", e);
                    }
                }

                @Override
                public void onReadFailure(Throwable error) {
                    try {
                        this.onReadFailure(error);
                    }
                    catch (Throwable e) {
                        logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onReadFailure", e);
                    }
                }
            };
        }

        public static Listener disabled() {
            return DisabledListener.INSTANCE;
        }
    }

    static final class CacheReadingSubscription
    extends AbstractSubscription<List<ByteBuffer>> {
        private static final int PREFETCH = 8;
        private static final int PREFETCH_THRESHOLD = 4;
        private static final int BUFFER_SIZE = 8192;
        private static final int MAX_BATCH_SIZE = 4;
        private static final VarHandle STATE;
        private static final VarHandle POSITION;
        private final Store.Viewer viewer;
        private final Listener listener;
        private final ConcurrentLinkedQueue<ByteBuffer> readQueue = new ConcurrentLinkedQueue();
        private volatile ReadingState state = ReadingState.INITIAL;
        private volatile long position;
        private volatile boolean endOfFile;

        CacheReadingSubscription(Flow.Subscriber<? super List<ByteBuffer>> downstream, Executor executor, Store.Viewer viewer, Listener listener) {
            super(downstream, executor);
            this.viewer = viewer;
            this.listener = listener.guarded();
        }

        @Override
        protected long emit(Flow.Subscriber<? super List<ByteBuffer>> downstream, long emit) {
            if (this.state == ReadingState.INITIAL && STATE.compareAndSet(this, ReadingState.INITIAL, ReadingState.IDLE)) {
                this.tryScheduleRead(false);
            }
            long submitted = 0L;
            while (true) {
                List<ByteBuffer> batch;
                if (this.readQueue.isEmpty() && this.endOfFile) {
                    this.cancelOnComplete(downstream);
                    return 0L;
                }
                if (submitted >= emit || (batch = this.pollBatch()).isEmpty()) {
                    return submitted;
                }
                if (!this.submitOnNext(downstream, batch)) break;
                ++submitted;
            }
            return 0L;
        }

        @Override
        protected void abort(boolean flowInterrupted) {
            this.state = ReadingState.DISPOSED;
            this.viewer.close();
        }

        private List<ByteBuffer> pollBatch() {
            ByteBuffer buffer;
            ArrayList<ByteBuffer> batch = null;
            while ((buffer = this.readQueue.poll()) != null) {
                if (batch == null) {
                    batch = new ArrayList<ByteBuffer>();
                }
                batch.add(buffer);
                if (batch.size() < 4) continue;
            }
            if (this.readQueue.size() <= 4) {
                this.tryScheduleRead(false);
            }
            return batch != null ? List.copyOf(batch) : List.of();
        }

        private boolean tryScheduleRead(boolean maintainReadingState) {
            if (this.readQueue.size() < 8 && (maintainReadingState && this.state == ReadingState.READING || STATE.compareAndSet(this, ReadingState.IDLE, ReadingState.READING))) {
                this.scheduleRead();
                return true;
            }
            return false;
        }

        private void scheduleRead() {
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            try {
                this.viewer.readAsync(this.position, buffer).whenComplete((read, error) -> this.onReadCompletion(buffer, (Integer)read, (Throwable)error));
            }
            catch (RuntimeException e) {
                this.state = ReadingState.DISPOSED;
                this.signalError(e);
            }
        }

        private void onReadCompletion(ByteBuffer buffer, @Nullable Integer read, @Nullable Throwable error) {
            assert (read != null || error != null);
            if (this.state == ReadingState.DISPOSED) {
                return;
            }
            if (error != null) {
                this.state = ReadingState.DISPOSED;
                this.listener.onReadFailure(error);
                this.signalError(error);
            } else if (read < 0) {
                this.endOfFile = true;
                this.state = ReadingState.DISPOSED;
                this.listener.onReadSuccess();
                this.signal(true);
            } else {
                if (read > 0) {
                    this.readQueue.offer(buffer.flip().asReadOnlyBuffer());
                    POSITION.getAndAdd(this, read);
                }
                if (!this.tryScheduleRead(true) && STATE.compareAndSet(this, ReadingState.READING, ReadingState.IDLE)) {
                    this.tryScheduleRead(false);
                }
                this.signal(false);
            }
        }

        static {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            try {
                STATE = lookup.findVarHandle(CacheReadingSubscription.class, "state", ReadingState.class);
                POSITION = lookup.findVarHandle(CacheReadingSubscription.class, "position", Long.TYPE);
            }
            catch (IllegalAccessException | NoSuchFieldException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        static enum ReadingState {
            INITIAL,
            IDLE,
            READING,
            DISPOSED;

        }
    }
}

