package org.eclipse.ditto.edge.service.streaming;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.SourceRef;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Optional;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.exceptions.StreamingSubscriptionNotFoundException;
import org.eclipse.ditto.base.model.signals.commands.streaming.CancelStreamingSubscription;
import org.eclipse.ditto.base.model.signals.commands.streaming.RequestFromStreamingSubscription;
import org.eclipse.ditto.base.model.signals.commands.streaming.StreamingSubscriptionCommand;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionFailed;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;

/* loaded from: input_file:org/eclipse/ditto/edge/service/streaming/StreamingSubscriptionManager.class */
public final class StreamingSubscriptionManager extends AbstractActor {
    public static final String ACTOR_NAME = "streamingSubscriptionManager";
    private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
    private static final Duration COMMAND_FORWARDER_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(15);
    private final Duration idleTimeout;
    private final ActorSelection commandForwarder;
    private final Materializer materializer;
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private int subscriptionIdCounter = 0;

    private StreamingSubscriptionManager(Duration duration, ActorSelection actorSelection, Materializer materializer) {
        this.idleTimeout = duration;
        this.commandForwarder = actorSelection;
        this.materializer = materializer;
    }

    public static Props props(Duration duration, ActorSelection actorSelection, Materializer materializer) {
        return Props.create(StreamingSubscriptionManager.class, new Object[]{duration, actorSelection, materializer});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RequestFromStreamingSubscription.class, this::requestSubscription).match(SubscribeForPersistedEvents.class, this::subscribeForPersistedEvents).match(CancelStreamingSubscription.class, this::cancelSubscription).build();
    }

    private void requestSubscription(RequestFromStreamingSubscription requestFromStreamingSubscription) {
        forwardToChild(requestFromStreamingSubscription.getSubscriptionId(), requestFromStreamingSubscription);
    }

    private void cancelSubscription(CancelStreamingSubscription cancelStreamingSubscription) {
        forwardToChild(cancelStreamingSubscription.getSubscriptionId(), cancelStreamingSubscription);
    }

    private void forwardToChild(String str, StreamingSubscriptionCommand<?> streamingSubscriptionCommand) {
        Optional findChild = getContext().findChild(str);
        if (findChild.isPresent()) {
            this.log.withCorrelationId(streamingSubscriptionCommand).debug("Forwarding to child: <{}>", streamingSubscriptionCommand);
            ((ActorRef) findChild.get()).tell(streamingSubscriptionCommand, getSender());
        } else {
            this.log.withCorrelationId(streamingSubscriptionCommand).info("StreamingSubscriptionID not found, responding with StreamingSubscriptionFailed: <{}>", streamingSubscriptionCommand);
            getSender().tell(StreamingSubscriptionFailed.of(str, streamingSubscriptionCommand.getEntityId(), StreamingSubscriptionNotFoundException.of(str, streamingSubscriptionCommand.getDittoHeaders()), streamingSubscriptionCommand.getDittoHeaders()), ActorRef.noSender());
        }
    }

    private void subscribeForPersistedEvents(SubscribeForPersistedEvents subscribeForPersistedEvents) {
        FeatureToggle.checkHistoricalApiAccessFeatureEnabled(subscribeForPersistedEvents.getType(), subscribeForPersistedEvents.getDittoHeaders());
        this.log.withCorrelationId(subscribeForPersistedEvents).info("Processing <{}>", subscribeForPersistedEvents);
        EntityId entityId = subscribeForPersistedEvents.getEntityId();
        String nextSubscriptionId = nextSubscriptionId(subscribeForPersistedEvents);
        connect(getContext().actorOf(StreamingSubscriptionActor.props(this.idleTimeout, entityId, getSender(), subscribeForPersistedEvents.getDittoHeaders()), nextSubscriptionId), getPersistedEventsSource(subscribeForPersistedEvents), entityId);
    }

    private void connect(ActorRef actorRef, Source<JsonValue, ?> source, EntityId entityId) {
        lazify(source).runWith(Sink.fromSubscriber(StreamingSubscriptionActor.asSubscriber(actorRef, entityId)), this.materializer);
    }

    private Source<JsonValue, ?> getPersistedEventsSource(SubscribeForPersistedEvents subscribeForPersistedEvents) {
        return Source.completionStageSource(Patterns.ask(this.commandForwarder, subscribeForPersistedEvents, (Duration) subscribeForPersistedEvents.getDittoHeaders().getTimeout().orElse(COMMAND_FORWARDER_LOCAL_ASK_TIMEOUT)).handle((obj, th) -> {
            return obj instanceof SourceRef ? ((SourceRef) obj).getSource().map(obj -> {
                if (obj instanceof Signal) {
                    return ProtocolFactory.wrapAsJsonifiableAdaptable(DITTO_PROTOCOL_ADAPTER.toAdaptable((Signal) obj)).toJson();
                }
                if (obj instanceof Jsonifiable) {
                    return ((Jsonifiable) obj).toJson();
                }
                if (obj instanceof JsonValue) {
                    return (JsonValue) obj;
                }
                throw new IllegalStateException("Unexpected element!");
            }) : obj instanceof DittoRuntimeException ? Source.failed((DittoRuntimeException) obj) : Source.failed(DittoRuntimeException.asDittoRuntimeException(th, th -> {
                return DittoInternalErrorException.newBuilder().dittoHeaders(subscribeForPersistedEvents.getDittoHeaders()).cause(th).build();
            }));
        }));
    }

    private String nextSubscriptionId(SubscribeForPersistedEvents subscribeForPersistedEvents) {
        String str = (String) subscribeForPersistedEvents.getPrefix().orElse("");
        int i = this.subscriptionIdCounter;
        this.subscriptionIdCounter = i + 1;
        return str + i;
    }

    private static <T> Source<T, ?> lazify(Source<T, ?> source) {
        return Source.lazySource(() -> {
            return source;
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1538346547:
                if (implMethodName.equals("lambda$lazify$f5a4459f$1")) {
                    z = true;
                    break;
                }
                break;
            case 190256741:
                if (implMethodName.equals("lambda$getPersistedEventsSource$1b5c44be$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/edge/service/streaming/StreamingSubscriptionManager") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lorg/eclipse/ditto/json/JsonValue;")) {
                    return obj -> {
                        if (obj instanceof Signal) {
                            return ProtocolFactory.wrapAsJsonifiableAdaptable(DITTO_PROTOCOL_ADAPTER.toAdaptable((Signal) obj)).toJson();
                        }
                        if (obj instanceof Jsonifiable) {
                            return ((Jsonifiable) obj).toJson();
                        }
                        if (obj instanceof JsonValue) {
                            return (JsonValue) obj;
                        }
                        throw new IllegalStateException("Unexpected element!");
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/edge/service/streaming/StreamingSubscriptionManager") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/javadsl/Source;")) {
                    Source source = (Source) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return source;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
