package io.vlingo.symbio.store;

import io.vlingo.actors.Stage;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Sink;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.reactivestreams.StreamPublisher;
import io.vlingo.reactivestreams.StreamSubscriber;
import io.vlingo.reactivestreams.Streams;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vlingo/symbio/store/EntryReaderStream.class */
/**
 * A {@link Stream} that pushes the entries of an {@link EntryReader} into a {@link Sink}.
 *
 * <p>Each call to {@link #flowInto(Sink, long, int)} creates, through the {@link Stage}, an
 * {@code EntryReaderSource}, a {@link StreamPublisher} over that source, and an
 * {@link EntryStreamSubscriber} around the sink, and then subscribes the subscriber to the
 * publisher.
 *
 * <p>NOTE(review): nothing in this class assigns the {@code subscriber} field, so
 * {@link #request(long)} and {@link #stop()} currently have no subscription to act on and fail
 * with an {@link IllegalStateException}. The subscriber created in {@code flowInto} is handed
 * directly to the publisher; whether the object returned by {@code Stage.actorFor} can be stored
 * as an {@code EntryStreamSubscriber} must be confirmed before wiring the field.
 *
 * @param <T> the concrete {@link Entry} type provided by the reader
 */
public class EntryReaderStream<T extends Entry<?>> implements Stream {
    private final EntryAdapterProvider entryAdapterProvider;
    private long flowElementsRate;
    private final EntryReader<T> entryReader;
    private Source<T> entryReaderSource;
    private Publisher<T> publisher;
    private final Stage stage;
    private EntryStreamSubscriber<T> subscriber;

    /**
     * A {@link StreamSubscriber} that remembers the {@link Subscription} it is given so that
     * the enclosing stream can issue {@code request} and {@code cancel} on it.
     *
     * @param <T> the type of element delivered to the sink
     */
    public static class EntryStreamSubscriber<T> extends StreamSubscriber<T> {
        /** The subscription received in {@link #onSubscribe(Subscription)}; null until then. */
        Subscription subscriptionHook;

        /**
         * Constructs the subscriber, passing both arguments to the {@link StreamSubscriber}
         * constructor.
         *
         * @param sink the sink handed to the superclass
         * @param j the long value handed to the superclass; the enclosing stream passes its
         *     flow elements rate here
         */
        public EntryStreamSubscriber(Sink<T> sink, long j) {
            super(sink, j);
        }

        /** Delegates completion handling to the superclass unchanged. */
        public void onComplete() {
            super.onComplete();
        }

        /**
         * Captures the subscription before delegating to the superclass.
         *
         * @param subscription the subscription supplied by the publisher
         */
        public void onSubscribe(Subscription subscription) {
            this.subscriptionHook = subscription;
            super.onSubscribe(subscription);
        }
    }

    /**
     * Constructs the stream; nothing is created through the stage until a {@code flowInto}
     * variant is called.
     *
     * @param stage the stage used to create the source, publisher, and subscriber
     * @param entryReader the reader handed to the {@code EntryReaderSource}
     * @param entryAdapterProvider the adapter provider handed to the {@code EntryReaderSource}
     */
    public EntryReaderStream(Stage stage, EntryReader<T> entryReader, EntryAdapterProvider entryAdapterProvider) {
        this.stage = stage;
        this.entryReader = entryReader;
        this.entryAdapterProvider = entryAdapterProvider;
    }

    /**
     * Records the new flow rate and requests that many elements from the captured subscription.
     *
     * @param flowElementsRate the number of elements to request
     * @throws IllegalStateException if no subscription has been captured (see the class note)
     */
    public void request(long flowElementsRate) {
        // The rate is recorded first, exactly as before, even if the request then fails.
        this.flowElementsRate = flowElementsRate;
        requireSubscription("request").request(this.flowElementsRate);
    }

    /**
     * Starts the flow into {@code sink} with a rate of 100 elements and the default probe
     * interval.
     *
     * @param sink the sink to receive the elements
     * @param <S> the sink's element type
     */
    public <S> void flowInto(Sink<S> sink) {
        flowInto(sink, 100L, DefaultProbeInterval);
    }

    /**
     * Starts the flow into {@code sink} with the given rate and the default probe interval.
     *
     * @param sink the sink to receive the elements
     * @param flowElementsRate the flow elements rate to use
     * @param <S> the sink's element type
     */
    public <S> void flowInto(Sink<S> sink, long flowElementsRate) {
        flowInto(sink, flowElementsRate, DefaultProbeInterval);
    }

    /**
     * Creates the source, publisher, and subscriber through the stage and subscribes the
     * subscriber to the publisher.
     *
     * @param sink the sink wrapped by the created {@link EntryStreamSubscriber}
     * @param flowElementsRate the rate recorded on this stream and passed to both the source
     *     and the subscriber
     * @param probeInterval the first argument of the publisher configuration
     * @param <S> the sink's element type
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // actorFor is called with raw protocol classes
    public <S> void flowInto(Sink<S> sink, long flowElementsRate, int probeInterval) {
        this.flowElementsRate = flowElementsRate;

        // -1 and 256 are presumably the library's default max throttle and buffer size
        // (TODO confirm against PublisherConfiguration.with); overflow drops the current element.
        PublisherConfiguration configuration =
                PublisherConfiguration.with(probeInterval, -1, 256, Streams.OverflowPolicy.DropCurrent);

        this.entryReaderSource = (Source) this.stage.actorFor(
                Source.class,
                EntryReaderSource.class,
                new Object[] {this.entryReader, this.entryAdapterProvider, Long.valueOf(flowElementsRate)});

        this.publisher = (Publisher) this.stage.actorFor(
                Publisher.class,
                StreamPublisher.class,
                new Object[] {this.entryReaderSource, configuration});

        // NOTE(review): this subscriber is not stored in the `subscriber` field, which is why
        // request(long) and stop() cannot reach its subscription.
        Subscriber streamSubscriber = (Subscriber) this.stage.actorFor(
                Subscriber.class,
                EntryStreamSubscriber.class,
                new Object[] {sink, Long.valueOf(flowElementsRate)});

        this.publisher.subscribe(streamSubscriber);
    }

    /**
     * Cancels the captured subscription.
     *
     * @throws IllegalStateException if no subscription has been captured (see the class note)
     */
    public void stop() {
        requireSubscription("stop").cancel();
    }

    /**
     * Returns the captured subscription, failing with a descriptive error rather than a bare
     * {@link NullPointerException} when there is none.
     *
     * @param operation the name of the calling operation, used only in the error message
     * @return the non-null captured subscription
     * @throws IllegalStateException if the subscriber or its subscription is not available
     */
    private Subscription requireSubscription(String operation) {
        if (this.subscriber == null || this.subscriber.subscriptionHook == null) {
            throw new IllegalStateException(
                    "Cannot " + operation + ": no active subscription is available on this stream");
        }
        return this.subscriber.subscriptionHook;
    }
}
