/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    protected final int numBuckets;
    protected final Observable<Bucket> bucketedStream;
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Object>(null);
    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;
    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(this.getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>(){

            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(BucketedCounterStream.this.getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };
        final ArrayList<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; ++i) {
            emptyEventCountsToStart.add(this.getEmptyBucketSummary());
        }
        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>(){

            @Override
            public Observable<Bucket> call() {
                return inputEventStream.observe().window((long)bucketSizeInMs, TimeUnit.MILLISECONDS).flatMap(BucketedCounterStream.this.reduceBucketToSummary).startWith(emptyEventCountsToStart);
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    public abstract Observable<Output> observe();

    public void startCachingStreamValuesIfUnstarted() {
        Subscription candidateSubscription;
        if (this.subscription.get() == null && !this.subscription.compareAndSet(null, candidateSubscription = this.observe().subscribe(this.counterSubject))) {
            candidateSubscription.unsubscribe();
        }
    }

    public Output getLatest() {
        this.startCachingStreamValuesIfUnstarted();
        if (this.counterSubject.hasValue()) {
            return this.counterSubject.getValue();
        }
        return this.getEmptyOutputValue();
    }

    public void unsubscribe() {
        Subscription s2 = this.subscription.get();
        if (s2 != null) {
            s2.unsubscribe();
            this.subscription.compareAndSet(s2, null);
        }
    }
}

