/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.metric.CachedValuesHistogram;
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.HdrHistogram.Histogram;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

public class RollingDistributionStream<Event extends HystrixEvent> {
    private AtomicReference<Subscription> rollingDistributionSubscription = new AtomicReference<Object>(null);
    private final BehaviorSubject<CachedValuesHistogram> rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(CachedValuesHistogram.getNewHistogram()));
    private final Observable<CachedValuesHistogram> rollingDistributionStream;
    private static final Func2<Histogram, Histogram, Histogram> distributionAggregator = new Func2<Histogram, Histogram, Histogram>(){

        @Override
        public Histogram call(Histogram initialDistribution, Histogram distributionToAdd) {
            initialDistribution.add(distributionToAdd);
            return initialDistribution;
        }
    };
    private static final Func1<Observable<Histogram>, Observable<Histogram>> reduceWindowToSingleDistribution = new Func1<Observable<Histogram>, Observable<Histogram>>(){

        @Override
        public Observable<Histogram> call(Observable<Histogram> window) {
            return window.reduce(distributionAggregator);
        }
    };
    private static final Func1<Histogram, CachedValuesHistogram> cacheHistogramValues = new Func1<Histogram, CachedValuesHistogram>(){

        @Override
        public CachedValuesHistogram call(Histogram histogram) {
            return CachedValuesHistogram.backedBy(histogram);
        }
    };
    private static final Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>> convertToList = new Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>>(){

        @Override
        public Observable<List<CachedValuesHistogram>> call(Observable<CachedValuesHistogram> windowOf2) {
            return windowOf2.toList();
        }
    };

    protected RollingDistributionStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs, final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        ArrayList<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; ++i) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }
        Func1 reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>(){

            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };
        this.rollingDistributionStream = stream.observe().window((long)bucketSizeInMs, TimeUnit.MILLISECONDS).flatMap(reduceBucketToSingleDistribution).startWith(emptyDistributionsToStart).window(numBuckets, 1).flatMap(reduceWindowToSingleDistribution).map(cacheHistogramValues).share().onBackpressureDrop();
    }

    public Observable<CachedValuesHistogram> observe() {
        return this.rollingDistributionStream;
    }

    public int getLatestMean() {
        CachedValuesHistogram latest = this.getLatest();
        if (latest != null) {
            return latest.getMean();
        }
        return 0;
    }

    public int getLatestPercentile(double percentile) {
        CachedValuesHistogram latest = this.getLatest();
        if (latest != null) {
            return latest.getValueAtPercentile(percentile);
        }
        return 0;
    }

    public void startCachingStreamValuesIfUnstarted() {
        Subscription candidateSubscription;
        if (this.rollingDistributionSubscription.get() == null && !this.rollingDistributionSubscription.compareAndSet(null, candidateSubscription = this.observe().subscribe(this.rollingDistribution))) {
            candidateSubscription.unsubscribe();
        }
    }

    CachedValuesHistogram getLatest() {
        this.startCachingStreamValuesIfUnstarted();
        if (this.rollingDistribution.hasValue()) {
            return this.rollingDistribution.getValue();
        }
        return null;
    }

    public void unsubscribe() {
        Subscription s2 = this.rollingDistributionSubscription.get();
        if (s2 != null) {
            s2.unsubscribe();
            this.rollingDistributionSubscription.compareAndSet(s2, null);
        }
    }
}

