package com.github.mizosoft.methanol.internal.cache;

import com.github.mizosoft.methanol.internal.cache.Store;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import java.lang.System;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/github/mizosoft/methanol/internal/cache/CacheReadingPublisher.class */
public final class CacheReadingPublisher implements Flow.Publisher<List<ByteBuffer>> {
    private static final System.Logger logger = System.getLogger(CacheReadingPublisher.class.getName());
    private final Store.Viewer viewer;
    private final Executor executor;
    private final Listener listener;
    private final AtomicBoolean subscribed;

    /* loaded from: input_file:com/github/mizosoft/methanol/internal/cache/CacheReadingPublisher$CacheReadingSubscription.class */
    static final class CacheReadingSubscription extends AbstractSubscription<List<ByteBuffer>> {
        private static final int PREFETCH = 8;
        private static final int PREFETCH_THRESHOLD = 4;
        private static final int BUFFER_SIZE = 8192;
        private static final int MAX_BATCH_SIZE = 4;
        private static final VarHandle STATE;
        private static final VarHandle POSITION;
        private final Store.Viewer viewer;
        private final Listener listener;
        private final ConcurrentLinkedQueue<ByteBuffer> readQueue;
        private volatile ReadingState state;
        private volatile long position;
        private volatile boolean endOfFile;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:com/github/mizosoft/methanol/internal/cache/CacheReadingPublisher$CacheReadingSubscription$ReadingState.class */
        public enum ReadingState {
            INITIAL,
            IDLE,
            READING,
            DISPOSED
        }

        CacheReadingSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber, Executor executor, Store.Viewer viewer, Listener listener) {
            super(subscriber, executor);
            this.readQueue = new ConcurrentLinkedQueue<>();
            this.state = ReadingState.INITIAL;
            this.viewer = viewer;
            this.listener = listener.guarded();
        }

        @Override // com.github.mizosoft.methanol.internal.flow.AbstractSubscription
        protected long emit(Flow.Subscriber<? super List<ByteBuffer>> subscriber, long j) {
            long j2;
            if (this.state == ReadingState.INITIAL && STATE.compareAndSet(this, ReadingState.INITIAL, ReadingState.IDLE)) {
                tryScheduleRead(false);
            }
            long j3 = 0;
            while (true) {
                j2 = j3;
                if (!this.readQueue.isEmpty() || !this.endOfFile) {
                    if (j2 >= j) {
                        break;
                    }
                    List<ByteBuffer> pollBatch = pollBatch();
                    if (pollBatch.isEmpty()) {
                        break;
                    }
                    if (!submitOnNext(subscriber, pollBatch)) {
                        return 0L;
                    }
                    j3 = j2 + 1;
                } else {
                    cancelOnComplete(subscriber);
                    return 0L;
                }
            }
            return j2;
        }

        @Override // com.github.mizosoft.methanol.internal.flow.AbstractSubscription
        protected void abort(boolean z) {
            this.state = ReadingState.DISPOSED;
            this.viewer.close();
        }

        private List<ByteBuffer> pollBatch() {
            ArrayList arrayList = null;
            do {
                ByteBuffer poll = this.readQueue.poll();
                if (poll == null) {
                    break;
                }
                if (arrayList == null) {
                    arrayList = new ArrayList();
                }
                arrayList.add(poll);
            } while (arrayList.size() < 4);
            if (this.readQueue.size() <= 4) {
                tryScheduleRead(false);
            }
            return arrayList != null ? List.copyOf(arrayList) : List.of();
        }

        private boolean tryScheduleRead(boolean z) {
            if (this.readQueue.size() >= 8) {
                return false;
            }
            if ((!z || this.state != ReadingState.READING) && !STATE.compareAndSet(this, ReadingState.IDLE, ReadingState.READING)) {
                return false;
            }
            scheduleRead();
            return true;
        }

        private void scheduleRead() {
            ByteBuffer allocate = ByteBuffer.allocate(BUFFER_SIZE);
            try {
                this.viewer.readAsync(this.position, allocate).whenComplete((num, th) -> {
                    onReadCompletion(allocate, num, th);
                });
            } catch (RuntimeException e) {
                this.state = ReadingState.DISPOSED;
                signalError(e);
            }
        }

        private void onReadCompletion(ByteBuffer byteBuffer, Integer num, Throwable th) {
            if (!$assertionsDisabled && num == null && th == null) {
                throw new AssertionError();
            }
            if (this.state == ReadingState.DISPOSED) {
                return;
            }
            if (th != null) {
                this.state = ReadingState.DISPOSED;
                this.listener.onReadFailure(th);
                signalError(th);
            } else {
                if (num.intValue() < 0) {
                    this.endOfFile = true;
                    this.state = ReadingState.DISPOSED;
                    this.listener.onReadSuccess();
                    signal(true);
                    return;
                }
                if (num.intValue() > 0) {
                    this.readQueue.offer(byteBuffer.flip().asReadOnlyBuffer());
                    POSITION.getAndAdd(this, num);
                }
                if (!tryScheduleRead(true) && STATE.compareAndSet(this, ReadingState.READING, ReadingState.IDLE)) {
                    tryScheduleRead(false);
                }
                signal(false);
            }
        }

        static {
            $assertionsDisabled = !CacheReadingPublisher.class.desiredAssertionStatus();
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            try {
                STATE = lookup.findVarHandle(CacheReadingSubscription.class, "state", ReadingState.class);
                POSITION = lookup.findVarHandle(CacheReadingSubscription.class, "position", Long.TYPE);
            } catch (IllegalAccessException | NoSuchFieldException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    /* loaded from: input_file:com/github/mizosoft/methanol/internal/cache/CacheReadingPublisher$DisabledListener.class */
    private enum DisabledListener implements Listener {
        INSTANCE;

        @Override // com.github.mizosoft.methanol.internal.cache.CacheReadingPublisher.Listener
        public void onReadSuccess() {
        }

        @Override // com.github.mizosoft.methanol.internal.cache.CacheReadingPublisher.Listener
        public void onReadFailure(Throwable th) {
        }
    }

    /* loaded from: input_file:com/github/mizosoft/methanol/internal/cache/CacheReadingPublisher$Listener.class */
    public interface Listener {
        void onReadSuccess();

        void onReadFailure(Throwable th);

        default Listener guarded() {
            return new Listener() { // from class: com.github.mizosoft.methanol.internal.cache.CacheReadingPublisher.Listener.1
                @Override // com.github.mizosoft.methanol.internal.cache.CacheReadingPublisher.Listener
                public void onReadSuccess() {
                    try {
                        Listener.this.onReadSuccess();
                    } catch (Throwable th) {
                        CacheReadingPublisher.logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onReadSuccess", th);
                    }
                }

                @Override // com.github.mizosoft.methanol.internal.cache.CacheReadingPublisher.Listener
                public void onReadFailure(Throwable th) {
                    try {
                        Listener.this.onReadFailure(th);
                    } catch (Throwable th2) {
                        CacheReadingPublisher.logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onReadFailure", th2);
                    }
                }
            };
        }

        static Listener disabled() {
            return DisabledListener.INSTANCE;
        }
    }

    public CacheReadingPublisher(Store.Viewer viewer, Executor executor) {
        this(viewer, executor, DisabledListener.INSTANCE);
    }

    public CacheReadingPublisher(Store.Viewer viewer, Executor executor, Listener listener) {
        this.subscribed = new AtomicBoolean();
        this.viewer = (Store.Viewer) Objects.requireNonNull(viewer);
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.listener = (Listener) Objects.requireNonNull(listener);
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
        Objects.requireNonNull(subscriber);
        if (this.subscribed.compareAndSet(false, true)) {
            new CacheReadingSubscription(subscriber, this.executor, this.viewer, this.listener).signal(true);
        } else {
            FlowSupport.refuse(subscriber, FlowSupport.multipleSubscribersToUnicast());
        }
    }
}
