package org.eclipse.ditto.services.utils.akka.streaming;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/DefaultStreamSupervisor.class */
public final class DefaultStreamSupervisor<E> extends AbstractActor {
    public static final String STREAM_FORWARDER_ACTOR_NAME = "streamForwarder";
    protected final DiagnosticLoggingAdapter log;
    private final SupervisorStrategy supervisorStrategy;
    private Cancellable activityCheck;
    private final ActorRef forwardTo;
    private final ActorRef provider;
    private final Class<E> elementClass;
    private final Function<E, Source<Object, NotUsed>> mapEntityFunction;
    private final Function<SudoStreamModifiedEntities, ?> streamTriggerMessageMapper;
    private final StreamMetadataPersistence streamMetadataPersistence;
    private final Materializer materializer;
    private final StreamConsumerSettings streamConsumerSettings;

    @Nullable
    private ActorRef forwarder;

    @Nullable
    private StreamTrigger nextStream;

    @Nullable
    private StreamTrigger activeStream;

    @Nullable
    private Boolean activeStreamSuccess;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/DefaultStreamSupervisor$TryToStartStream.class */
    public enum TryToStartStream {
        INSTANCE
    }

    private DefaultStreamSupervisor(ActorRef actorRef, ActorRef actorRef2, Class<E> cls, Function<E, Source<Object, NotUsed>> function, Function<SudoStreamModifiedEntities, ?> function2, StreamMetadataPersistence streamMetadataPersistence, Materializer materializer, StreamConsumerSettings streamConsumerSettings) {
        this.log = LogUtil.obtain(this);
        this.supervisorStrategy = new OneForOneStrategy(true, DeciderBuilder.matchAny(th -> {
            return SupervisorStrategy.stop();
        }).build());
        this.forwardTo = (ActorRef) Objects.requireNonNull(actorRef);
        this.provider = (ActorRef) Objects.requireNonNull(actorRef2);
        this.elementClass = (Class) Objects.requireNonNull(cls);
        this.mapEntityFunction = (Function) Objects.requireNonNull(function);
        this.streamTriggerMessageMapper = (Function) Objects.requireNonNull(function2);
        this.streamMetadataPersistence = (StreamMetadataPersistence) Objects.requireNonNull(streamMetadataPersistence);
        this.materializer = (Materializer) Objects.requireNonNull(materializer);
        this.streamConsumerSettings = (StreamConsumerSettings) Objects.requireNonNull(streamConsumerSettings);
    }

    public static <E> Props props(final ActorRef actorRef, final ActorRef actorRef2, final Class<E> cls, final Function<E, Source<Object, NotUsed>> function, final Function<SudoStreamModifiedEntities, ?> function2, final StreamMetadataPersistence streamMetadataPersistence, final Materializer materializer, final StreamConsumerSettings streamConsumerSettings) {
        return Props.create(DefaultStreamSupervisor.class, new Creator<DefaultStreamSupervisor>() { // from class: org.eclipse.ditto.services.utils.akka.streaming.DefaultStreamSupervisor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public DefaultStreamSupervisor m5create() throws Exception {
                return new DefaultStreamSupervisor(actorRef, actorRef2, cls, function, function2, streamMetadataPersistence, materializer, streamConsumerSettings);
            }
        });
    }

    private Props getStreamForwarderProps() {
        return DefaultStreamForwarder.props(this.forwardTo, getSelf(), this.streamConsumerSettings.getMaxIdleTime(), this.elementClass, this.mapEntityFunction);
    }

    private Object newStartStreamingCommand(StreamTrigger streamTrigger) {
        return this.streamTriggerMessageMapper.apply(SudoStreamModifiedEntities.of(streamTrigger.getQueryStart(), streamTrigger.getQueryEnd(), Integer.valueOf(this.streamConsumerSettings.getElementsStreamedPerBatch()), Long.valueOf(this.streamConsumerSettings.getStreamingActorTimeout().toMillis()), DittoHeaders.empty()));
    }

    public SupervisorStrategy supervisorStrategy() {
        return this.supervisorStrategy;
    }

    public void preStart() throws Exception {
        super.preStart();
        scheduleStream(computeNextStreamTrigger(null));
    }

    public void postStop() throws Exception {
        if (null != this.activityCheck) {
            this.activityCheck.cancel();
        }
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().matchEquals(TryToStartStream.INSTANCE, tryToStartStream -> {
            this.log.debug("Switching to supervisingBehaviour has been triggered by message: {}", tryToStartStream);
            becomeSupervising();
            tryToStartStream();
        }).match(Terminated.class, this::terminated).build();
    }

    private void becomeSupervising() {
        this.log.debug("becoming supervising...");
        getContext().become(createSupervisingBehavior());
    }

    private AbstractActor.Receive createSupervisingBehavior() {
        return ReceiveBuilder.create().matchEquals(StreamConstants.STREAM_COMPLETED, obj -> {
            streamCompleted();
        }).matchEquals(StreamConstants.STREAM_FAILED, obj2 -> {
            streamFailed();
        }).matchEquals(StreamConstants.FORWARDER_EXCEEDED_MAX_IDLE_TIME_MSG, obj3 -> {
            streamTimedOut();
        }).matchEquals(TryToStartStream.INSTANCE, tryToStartStream -> {
            tryToStartStream();
        }).match(Terminated.class, this::terminated).build();
    }

    private void streamCompleted() {
        this.log.debug("Stream completed.");
        this.activeStreamSuccess = true;
    }

    private void streamTimedOut() {
        this.log.debug("Stream timed out.");
        this.activeStreamSuccess = false;
    }

    private void streamFailed() {
        this.log.debug("Stream failed");
        this.activeStreamSuccess = false;
    }

    private void scheduleNextStream() {
        if (this.activeStream == null) {
            this.log.error("Cannot schedule next stream, because active stream is unknown.");
            return;
        }
        if (this.activeStreamSuccess == null) {
            this.log.warning("Cannot schedule next stream, because success of active stream is unknown.");
        } else {
            if (!this.activeStreamSuccess.booleanValue()) {
                rescheduleActiveStream();
                return;
            }
            Instant queryEnd = this.activeStream.getQueryEnd();
            ((CompletionStage) this.streamMetadataPersistence.updateLastSuccessfulStreamEnd(queryEnd).runWith(Sink.last(), this.materializer)).thenRun(() -> {
                this.log.info("Updated last sync timestamp to value: <{}>.", queryEnd);
            }).exceptionally(th -> {
                this.log.error(th, "Failed to update last sync timestamp to value: <{}>.", queryEnd);
                return null;
            });
            scheduleStream(computeNextStreamTrigger(queryEnd));
        }
    }

    private StreamTrigger computeNextStreamTrigger(@Nullable Instant instant) {
        Instant orElse;
        Instant now = Instant.now();
        if (instant != null) {
            orElse = instant;
        } else {
            orElse = this.streamMetadataPersistence.retrieveLastSuccessfulStreamEnd().orElse(now.minus((TemporalAmount) this.streamConsumerSettings.getInitialStartOffset()));
        }
        Duration between = Duration.between(orElse, now);
        Duration outdatedWarningOffset = this.streamConsumerSettings.getOutdatedWarningOffset();
        if (!between.isNegative() && between.compareTo(outdatedWarningOffset) > 0) {
            this.log.warning("The next Query-Start <{}> is older than the configured warn-offset <{}>. Please verify that this does not happen frequently, otherwise won't get \"up-to-date\" anymore.", orElse, outdatedWarningOffset);
        }
        return StreamTrigger.calculateStreamTrigger(now, orElse, this.streamConsumerSettings.getStartOffset(), this.streamConsumerSettings.getStreamInterval());
    }

    private void scheduleStream(StreamTrigger streamTrigger) {
        this.nextStream = streamTrigger;
        scheduleStream(streamTrigger.getPlannedStreamStart());
    }

    private void scheduleStream(Instant instant) {
        Instant now = Instant.now();
        scheduleStream(instant.isBefore(now) ? Duration.ZERO : Duration.between(now, instant));
    }

    private void scheduleStream(Duration duration) {
        this.log.info("Schedule Stream in: {}", duration);
        if (this.activityCheck != null) {
            this.activityCheck.cancel();
        }
        this.activityCheck = getContext().system().scheduler().scheduleOnce(FiniteDuration.create(duration.getSeconds(), TimeUnit.SECONDS), getSelf(), TryToStartStream.INSTANCE, getContext().dispatcher(), ActorRef.noSender());
    }

    private void tryToStartStream() {
        if (this.forwarder != null) {
            this.log.warning("Forwarder is still running: {}. Re-scheduling current stream.", this.forwarder);
            rescheduleActiveStream();
            return;
        }
        Object newStartStreamingCommand = newStartStreamingCommand(this.nextStream);
        this.forwarder = createOrGetForwarder();
        this.log.info("Requesting stream from <{}> on behalf of <{}> by <{}>", this.provider, this.forwarder, newStartStreamingCommand);
        this.provider.tell(newStartStreamingCommand, this.forwarder);
        this.activeStream = this.nextStream;
        this.activeStreamSuccess = null;
    }

    private void rescheduleActiveStream() {
        Instant plus = Instant.now().plus((TemporalAmount) this.streamConsumerSettings.getStreamInterval());
        this.log.warning("Re-scheduling at {}", plus);
        if (this.activeStream == null) {
            this.log.error("Cannot re-schedule stream, because metadata of active stream is unknown.");
        } else {
            scheduleStream(this.activeStream.rescheduleAt(plus));
        }
    }

    private ActorRef createOrGetForwarder() {
        ActorRef actorOf;
        Option child = getContext().child(STREAM_FORWARDER_ACTOR_NAME);
        if (child.isDefined()) {
            actorOf = (ActorRef) child.get();
        } else {
            actorOf = getContext().actorOf(getStreamForwarderProps(), STREAM_FORWARDER_ACTOR_NAME);
            this.log.debug("Watching forwarder: {}", actorOf);
            getContext().watch(actorOf);
        }
        return actorOf;
    }

    private void terminated(Terminated terminated) {
        ActorRef actor = terminated.getActor();
        this.log.debug("Received Terminated-Message: {}", terminated);
        if (!Objects.equals(actor, this.forwarder)) {
            this.log.warning("Received Terminated-Message from actor <{}> which does not match current forwarder <{}>", actor, this.forwarder);
            return;
        }
        getContext().unwatch(actor);
        this.forwarder = null;
        scheduleNextStream();
    }
}
