package org.eclipse.ditto.services.utils.akka.streaming;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamingActor.class */
public abstract class AbstractStreamingActor<C, E> extends AbstractActor {
    protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    protected final ActorMaterializer materializer = ActorMaterializer.create(getContext());

    protected abstract Class<C> getCommandClass();

    protected abstract Optional<Integer> getBurst(C c);

    protected abstract Optional<Long> getTimeoutMillis(C c);

    protected abstract Source<E, NotUsed> createSource(C c);

    protected abstract Object batchMessages(List<E> list);

    public final AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(getCommandClass(), this::startStreaming).matchAny(obj -> {
            this.log.warning("Unexpected message: <{}>", obj);
        }).build();
    }

    private void startStreaming(C c) {
        this.log.debug("Starting streaming due to command: {}", c);
        ActorRef sender = getSender();
        int intValue = getBurst(c).orElse(1).intValue();
        long longValue = getTimeoutMillis(c).orElse(60000L).longValue();
        createSource(c).grouped(intValue).map(this::batchMessages).prepend(Source.single(StreamConstants.STREAM_STARTED)).concat(Source.single(StreamConstants.STREAM_COMPLETED)).mapAsync(1, obj -> {
            if (!StreamConstants.STREAM_COMPLETED.equals(obj)) {
                return PatternsCS.ask(sender, obj, longValue);
            }
            sender.tell(obj, ActorRef.noSender());
            return CompletableFuture.completedFuture(obj);
        }).recoverWithRetries(1, new PFBuilder().matchAny(th -> {
            sender.tell(StreamConstants.STREAM_FAILED, ActorRef.noSender());
            return Source.single(StreamConstants.STREAM_FAILED);
        }).build()).log("future completed", this.log).runWith(Sink.ignore(), this.materializer);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1775257319:
                if (implMethodName.equals("lambda$startStreaming$8a2557b9$1")) {
                    z = false;
                    break;
                }
                break;
            case 1285317286:
                if (implMethodName.equals("batchMessages")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;JLjava/lang/Object;)Ljava/util/concurrent/CompletionStage;")) {
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    return obj -> {
                        if (!StreamConstants.STREAM_COMPLETED.equals(obj)) {
                            return PatternsCS.ask(actorRef, obj, longValue);
                        }
                        actorRef.tell(obj, ActorRef.noSender());
                        return CompletableFuture.completedFuture(obj);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Ljava/lang/Object;")) {
                    AbstractStreamingActor abstractStreamingActor = (AbstractStreamingActor) serializedLambda.getCapturedArg(0);
                    return abstractStreamingActor::batchMessages;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
