package org.eclipse.ditto.services.utils.akka.streaming;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.services.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.streaming.StreamAck;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamForwarder.class */
public abstract class AbstractStreamForwarder<E> extends AbstractActor {
    protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    protected final ActorMaterializer materializer = ActorMaterializer.create(getContext());
    private Instant lastMessageReceived = Instant.now();
    private Cancellable activityCheck;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamForwarder$CheckForActivity.class */
    public enum CheckForActivity {
        INSTANCE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamForwarder() {
        this.log.info("Creating new StreamForwarder");
    }

    protected abstract Duration getMaxIdleTime();

    protected abstract Class<E> getElementClass();

    protected abstract ActorRef getRecipient();

    protected abstract ActorRef getCompletionRecipient();

    protected abstract Source<?, ?> mapEntity(E e);

    private AbstractActor.Receive initialBehavior() {
        return ReceiveBuilder.create().matchEquals(StreamConstants.STREAM_STARTED, obj -> {
            getSender().tell(StreamConstants.STREAM_ACK_MSG, getSelf());
            updateLastMessageReceived();
            getContext().become(iteratingBehavior());
        }).matchEquals(StreamConstants.STREAM_FAILED, this::streamFailed).matchEquals(CheckForActivity.INSTANCE, this::checkForActivity).build();
    }

    private AbstractActor.Receive iteratingBehavior() {
        return ReceiveBuilder.create().matchEquals(StreamConstants.STREAM_COMPLETED, this::streamCompleted).matchEquals(StreamConstants.STREAM_FAILED, this::streamFailed).matchEquals(CheckForActivity.INSTANCE, this::checkForActivity).match(BatchedEntityIdWithRevisions.class, this::transitionToForwardingLoop).build();
    }

    private AbstractActor.Receive hasNextBehavior(ActorRef actorRef) {
        return ReceiveBuilder.create().matchEquals(StreamConstants.STREAM_ACK_MSG, obj -> {
            updateLastMessageReceived();
        }).matchEquals(StreamConstants.DOES_NOT_HAVE_NEXT_MSG, obj2 -> {
            updateLastMessageReceived();
            getContext().become(iteratingBehavior());
            this.log.debug("sending ack {} to streaming actor {}", StreamConstants.STREAM_ACK_MSG, actorRef);
            actorRef.tell(StreamConstants.STREAM_ACK_MSG, getSelf());
        }).matchEquals(StreamConstants.STREAM_FAILED, this::streamFailed).match(CheckForActivity.class, this::checkForActivity).build();
    }

    private void transitionToForwardingLoop(BatchedEntityIdWithRevisions<?> batchedEntityIdWithRevisions) {
        this.log.debug("got element: {}", batchedEntityIdWithRevisions);
        ActorRef self = getSelf();
        ActorRef recipient = getRecipient();
        long millis = getMaxIdleTime().toMillis();
        List<E> typecheck = typecheck(batchedEntityIdWithRevisions.getElements());
        getContext().become(hasNextBehavior(getSender()));
        typecheck.getClass();
        Source.fromIterator(typecheck::iterator).flatMapConcat(this::mapEntity).concat(Source.single(StreamConstants.DOES_NOT_HAVE_NEXT_MSG)).mapAsync(1, obj -> {
            if (!StreamConstants.DOES_NOT_HAVE_NEXT_MSG.equals(obj)) {
                return PatternsCS.ask(recipient, obj, millis).thenApply(obj -> {
                    if (isSuccessAck(obj)) {
                        this.log.debug("got ack: {}", obj);
                    } else {
                        this.log.error("got failure ack: {}", obj);
                    }
                    self.tell(StreamConstants.STREAM_ACK_MSG, ActorRef.noSender());
                    return obj;
                });
            }
            self.tell(obj, self);
            return CompletableFuture.completedFuture(obj);
        }).runWith(Sink.ignore(), this.materializer);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private List<E> typecheck(List<?> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (Object obj : list) {
            if (!getElementClass().isInstance(obj)) {
                streamFailed(String.format("Element type mismatch. Expected <%s>, actual <%s %s>", getElementClass().toString(), obj.getClass().toString(), obj.toString()));
                return Collections.emptyList();
            }
            arrayList.add(obj);
        }
        return arrayList;
    }

    private static boolean isSuccessAck(Object obj) {
        return (obj instanceof StreamAck) && StreamAck.Status.SUCCESS == ((StreamAck) obj).getStatus();
    }

    public void preStart() throws Exception {
        super.preStart();
        FiniteDuration create = FiniteDuration.create(getMaxIdleTime().getSeconds(), TimeUnit.SECONDS);
        this.activityCheck = getContext().system().scheduler().schedule(create, create, getSelf(), CheckForActivity.INSTANCE, getContext().dispatcher(), ActorRef.noSender());
    }

    public void postStop() throws Exception {
        if (null != this.activityCheck) {
            this.activityCheck.cancel();
        }
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return initialBehavior();
    }

    private void checkForActivity(CheckForActivity checkForActivity) {
        if (Duration.between(this.lastMessageReceived, Instant.now()).compareTo(getMaxIdleTime()) <= 0) {
            this.log.debug("Stream is still considered as active");
            return;
        }
        this.log.error("Stream timed out");
        getCompletionRecipient().tell(StreamConstants.FORWARDER_EXCEEDED_MAX_IDLE_TIME_MSG, getSelf());
        shutdown();
    }

    private void streamCompleted(Object obj) {
        this.log.info("Stream successfully finished.");
        getCompletionRecipient().tell(obj, getSelf());
        getContext().stop(getSelf());
    }

    private void streamFailed(Object obj) {
        getCompletionRecipient().forward(obj, getContext());
        shutdown();
    }

    private void updateLastMessageReceived() {
        this.lastMessageReceived = Instant.now();
        this.log.debug("Updated last message");
    }

    private void shutdown() {
        getContext().stop(getSelf());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 139728271:
                if (implMethodName.equals("lambda$transitionToForwardingLoop$b52813a2$1")) {
                    z = true;
                    break;
                }
                break;
            case 506593375:
                if (implMethodName.equals("mapEntity")) {
                    z = 2;
                    break;
                }
                break;
            case 1182533742:
                if (implMethodName.equals("iterator")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Iterator;")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return list::iterator;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamForwarder") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;Lakka/actor/ActorRef;JLjava/lang/Object;)Ljava/util/concurrent/CompletionStage;")) {
                    AbstractStreamForwarder abstractStreamForwarder = (AbstractStreamForwarder) serializedLambda.getCapturedArg(0);
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(1);
                    ActorRef actorRef2 = (ActorRef) serializedLambda.getCapturedArg(2);
                    long longValue = ((Long) serializedLambda.getCapturedArg(3)).longValue();
                    return obj -> {
                        if (!StreamConstants.DOES_NOT_HAVE_NEXT_MSG.equals(obj)) {
                            return PatternsCS.ask(actorRef2, obj, longValue).thenApply(obj -> {
                                if (isSuccessAck(obj)) {
                                    this.log.debug("got ack: {}", obj);
                                } else {
                                    this.log.error("got failure ack: {}", obj);
                                }
                                actorRef.tell(StreamConstants.STREAM_ACK_MSG, ActorRef.noSender());
                                return obj;
                            });
                        }
                        actorRef.tell(obj, actorRef);
                        return CompletableFuture.completedFuture(obj);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamForwarder") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lakka/stream/javadsl/Source;")) {
                    AbstractStreamForwarder abstractStreamForwarder2 = (AbstractStreamForwarder) serializedLambda.getCapturedArg(0);
                    return abstractStreamForwarder2::mapEntity;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
