package io.vlingo.symbio.store;

import io.vlingo.actors.Actor;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduled;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.symbio.BaseEntry;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.EntryBundle;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/* loaded from: input_file:io/vlingo/symbio/store/EntryReaderSource.class */
public class EntryReaderSource<T extends Entry<?>> extends Actor implements Source<T>, Scheduled<Object> {
    private final Deque<T> cache = new ArrayDeque();
    private final Cancellable cancellable = scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, 0, Stream.FastProbeInterval);
    private final EntryReader<T> entryReader;
    private final long flowElementsRate;
    private boolean reading;
    private final EntryAdapterProvider entryAdapterProvider;

    public EntryReaderSource(EntryReader<T> entryReader, EntryAdapterProvider entryAdapterProvider, long j) {
        this.entryReader = entryReader;
        this.entryAdapterProvider = entryAdapterProvider;
        this.flowElementsRate = j;
    }

    public Completes<Elements<T>> next() {
        if (this.cache.isEmpty()) {
            return completes().with(Elements.empty());
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.flowElementsRate && !this.cache.isEmpty(); i++) {
            T poll = this.cache.poll();
            arrayList.add(new EntryBundle(poll, this.entryAdapterProvider.asSource(poll instanceof BaseEntry ? poll : poll.withId(poll.id()))));
        }
        return completes().with(Elements.of(arrayFrom(arrayList)));
    }

    public Completes<Elements<T>> next(int i) {
        return next();
    }

    public Completes<Elements<T>> next(long j) {
        return next();
    }

    public Completes<Elements<T>> next(long j, int i) {
        return next();
    }

    public Completes<Boolean> isSlow() {
        return completes().with(false);
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
        if (!this.cache.isEmpty() || this.reading) {
            return;
        }
        this.reading = true;
        this.entryReader.readNext(this.flowElementsRate > 2147483647L ? Integer.MAX_VALUE : (int) this.flowElementsRate).andThenConsume(list -> {
            this.cache.addAll(list);
            this.reading = false;
        });
    }

    public void stop() {
        this.cancellable.cancel();
        super.stop();
    }

    private EntryBundle[] arrayFrom(List<EntryBundle> list) {
        return (EntryBundle[]) list.toArray(new EntryBundle[list.size()]);
    }
}
