package org.eclipse.ditto.services.utils.akka.streaming;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamRefs;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamingActor.class */
public abstract class AbstractStreamingActor<C, E> extends AbstractActor {
    protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    protected final ActorMaterializer materializer = ActorMaterializer.create(getContext());

    protected abstract Class<C> getCommandClass();

    protected abstract int getBurst(C c);

    protected abstract Duration getInitialTimeout(C c);

    protected abstract Duration getIdleTimeout(C c);

    protected abstract Source<E, NotUsed> createSource(C c);

    protected Object batchMessages(List<E> list) {
        return list.size() == 1 ? list.get(0) : list;
    }

    public final AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(getCommandClass(), this::startStreaming).matchAny(obj -> {
            this.log.warning("Unexpected message: <{}>", obj);
        }).build();
    }

    private void startStreaming(C c) {
        this.log.debug("Starting streaming due to command: {}", c);
        int burst = getBurst(c);
        Duration initialTimeout = getInitialTimeout(c);
        Patterns.pipe((CompletionStage) createSource(c).grouped(burst).map(this::batchMessages).initialTimeout(initialTimeout).idleTimeout(getIdleTimeout(c)).runWith(StreamRefs.sourceRef(), this.materializer), getContext().getDispatcher()).to(getSender());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1285317286:
                if (implMethodName.equals("batchMessages")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/streaming/AbstractStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Ljava/lang/Object;")) {
                    AbstractStreamingActor abstractStreamingActor = (AbstractStreamingActor) serializedLambda.getCapturedArg(0);
                    return abstractStreamingActor::batchMessages;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
