package io.mantisrx.connector.kafka.source.checkpoint.trigger;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

/* loaded from: input_file:io/mantisrx/connector/kafka/source/checkpoint/trigger/CountingCheckpointTrigger.class */
/**
 * A {@link CheckpointTrigger} that requests a checkpoint when either of two conditions holds:
 * the accumulated count passed to {@link #update(int)} strictly exceeds a configured threshold,
 * or a periodic timer has fired since the last {@link #reset()}.
 *
 * <p>All mutable state is held in atomic variables. The timer flag is set from the RxJava
 * interval callback, independently of whichever thread calls the trigger methods.
 */
public class CountingCheckpointTrigger implements CheckpointTrigger {
    /** Count that must be strictly exceeded before a count-based checkpoint is requested. */
    private final int threshold;
    /** Handle on the periodic timer; unsubscribed on {@link #shutdown()}. */
    private final Subscription checkpointOffsetsTimer;
    /** Set to {@code true} by the timer on every tick; cleared by {@link #reset()}. */
    private final AtomicBoolean checkpoint = new AtomicBoolean(false);
    /** Running total of the values passed to {@link #update(int)} since the last reset. */
    private final AtomicInteger counter = new AtomicInteger(0);
    /** {@code true} until {@link #shutdown()} has been performed. */
    private final AtomicBoolean isActive = new AtomicBoolean(true);

    /**
     * Creates the trigger and immediately starts its periodic timer.
     *
     * @param threshold         count that must be strictly exceeded to request a checkpoint
     * @param triggerIntervalMs period, in milliseconds, at which the timer flags a checkpoint
     */
    public CountingCheckpointTrigger(int threshold, int triggerIntervalMs) {
        this.threshold = threshold;
        // The field initializers above have already run, so the callback can safely touch
        // the checkpoint flag even if the first tick arrives right after subscription.
        this.checkpointOffsetsTimer = Observable.interval(triggerIntervalMs, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long tick) {
                        CountingCheckpointTrigger.this.checkpoint.set(true);
                    }
                });
    }

    /**
     * Reports whether a checkpoint is due.
     *
     * @return {@code true} if the counter is strictly greater than the threshold or the timer
     *         has fired since the last {@link #reset()}
     */
    @Override
    public boolean shouldCheckpoint() {
        return this.counter.get() > this.threshold || this.checkpoint.get();
    }

    /**
     * Adds the given amount to the running counter.
     *
     * @param count amount to add to the counter
     */
    @Override
    public void update(int count) {
        this.counter.addAndGet(count);
    }

    /** Clears both the counter and the timer flag. */
    @Override
    public void reset() {
        this.counter.set(0);
        this.checkpoint.set(false);
    }

    /**
     * Reports whether this trigger is still running.
     *
     * @return {@code true} until {@link #shutdown()} has been performed
     */
    @Override
    public boolean isActive() {
        return this.isActive.get();
    }

    /**
     * Stops the periodic timer and clears the trigger state. Calls after the first one are
     * no-ops.
     */
    @Override
    public void shutdown() {
        // Flip the flag atomically *before* tearing down so that concurrent callers cannot
        // both pass the check and run the teardown twice.
        if (this.isActive.compareAndSet(true, false)) {
            this.checkpointOffsetsTimer.unsubscribe();
            reset();
        }
    }
}
