package org.eclipse.ditto.services.utils.search;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.thingsearch.SizeOption;
import org.eclipse.ditto.model.thingsearchparser.RqlOptionParser;
import org.eclipse.ditto.services.base.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.signals.commands.thingsearch.ThingSearchCommand;
import org.eclipse.ditto.signals.commands.thingsearch.exceptions.InvalidOptionException;
import org.eclipse.ditto.signals.commands.thingsearch.exceptions.SubscriptionNotFoundException;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.CancelSubscription;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.CreateSubscription;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.RequestFromSubscription;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionFailed;

/* loaded from: input_file:org/eclipse/ditto/services/utils/search/SubscriptionManager.class */
public final class SubscriptionManager extends AbstractActor {
    public static final String ACTOR_NAME = "subscriptionManager";
    private final Duration idleTimeout;
    private final ActorRef pubSubMediator;
    private final ActorRef proxyActor;
    private final ActorMaterializer materializer;
    private final int defaultPageSize;
    private final int maxPageSize;
    private int subscriptionIdCounter = 0;
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    SubscriptionManager(Duration duration, ActorRef actorRef, ActorRef actorRef2, ActorMaterializer actorMaterializer) {
        this.idleTimeout = duration;
        this.pubSubMediator = actorRef;
        this.proxyActor = actorRef2;
        this.materializer = actorMaterializer;
        DefaultLimitsConfig of = DefaultLimitsConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
        this.defaultPageSize = of.getThingsSearchDefaultPageSize();
        this.maxPageSize = of.getThingsSearchMaxPageSize();
    }

    public static Props props(Duration duration, ActorRef actorRef, ActorRef actorRef2, ActorMaterializer actorMaterializer) {
        return Props.create(SubscriptionManager.class, new Object[]{duration, actorRef, actorRef2, actorMaterializer});
    }

    private static JsonArray asJsonArray(Collection<String> collection) {
        return (JsonArray) collection.stream().map(JsonValue::of).collect(JsonCollectors.valuesToArray());
    }

    private int getPageSize(@Nullable String str) {
        if (str == null) {
            return this.defaultPageSize;
        }
        int intValue = ((Integer) RqlOptionParser.parseOptions(str).stream().flatMap(option -> {
            return option instanceof SizeOption ? Stream.of(Integer.valueOf(((SizeOption) option).getSize())) : Stream.empty();
        }).findFirst().orElse(Integer.valueOf(this.defaultPageSize))).intValue();
        if (intValue <= 0 || intValue > this.maxPageSize) {
            throw InvalidOptionException.newBuilder().message("Invalid option: '" + str + "'").description("size(n) -- n must be between 1 and " + this.maxPageSize).build();
        }
        return intValue;
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RequestFromSubscription.class, this::requestSubscription).match(CreateSubscription.class, this::createSubscription).match(CancelSubscription.class, this::cancelSubscription).build();
    }

    private void requestSubscription(RequestFromSubscription requestFromSubscription) {
        forwardToChild(requestFromSubscription.getSubscriptionId(), requestFromSubscription);
    }

    private void cancelSubscription(CancelSubscription cancelSubscription) {
        forwardToChild(cancelSubscription.getSubscriptionId(), cancelSubscription);
    }

    private void forwardToChild(String str, ThingSearchCommand<?> thingSearchCommand) {
        Optional findChild = getContext().findChild(str);
        if (findChild.isPresent()) {
            this.log.withCorrelationId(thingSearchCommand).debug("Forwarding to child: <{}>", thingSearchCommand);
            ((ActorRef) findChild.get()).tell(thingSearchCommand, getSender());
        } else {
            this.log.withCorrelationId(thingSearchCommand).info("SubscriptionID not found, responding with SubscriptionFailed: <{}>", thingSearchCommand);
            getSender().tell(SubscriptionFailed.of(str, SubscriptionNotFoundException.of(str, thingSearchCommand.getDittoHeaders()), thingSearchCommand.getDittoHeaders()), ActorRef.noSender());
        }
    }

    private void createSubscription(CreateSubscription createSubscription) {
        this.log.withCorrelationId(createSubscription).info("Processing <{}>", createSubscription);
        String nextSubscriptionId = nextSubscriptionId(createSubscription);
        connect(getContext().actorOf(SubscriptionActor.props(this.idleTimeout, getSender(), createSubscription.getDittoHeaders()), nextSubscriptionId), getPageSource(createSubscription));
    }

    private void connect(ActorRef actorRef, Source<JsonArray, NotUsed> source) {
        lazify(source).runWith(Sink.fromSubscriber(SubscriptionActor.asSubscriber(actorRef)), this.materializer);
    }

    private Source<JsonArray, NotUsed> getPageSource(CreateSubscription createSubscription) {
        String str = (String) createSubscription.getOptions().orElse(null);
        try {
            return SearchSource.newBuilder().pubSubMediator(this.pubSubMediator).conciergeForwarder(this.proxyActor).namespaces((JsonArray) createSubscription.getNamespaces().filter(set -> {
                return !set.isEmpty();
            }).map((v0) -> {
                return asJsonArray(v0);
            }).orElse(null)).filter((String) createSubscription.getFilter().orElse(null)).fields((JsonFieldSelector) createSubscription.getSelectedFields().orElse(null)).options(str).dittoHeaders(createSubscription.getDittoHeaders()).build().start(resumeSourceBuilder -> {
            }).grouped(getPageSize(str)).map((v0) -> {
                return JsonArray.of(v0);
            });
        } catch (DittoRuntimeException e) {
            return Source.failed(e);
        }
    }

    private String nextSubscriptionId(CreateSubscription createSubscription) {
        String str = (String) createSubscription.getPrefix().orElse("");
        int i = this.subscriptionIdCounter;
        this.subscriptionIdCounter = i + 1;
        return str + i;
    }

    private static <T> Source<T, ?> lazify(Source<T, ?> source) {
        return Source.lazily(() -> {
            return source;
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1538346547:
                if (implMethodName.equals("lambda$lazify$f5a4459f$1")) {
                    z = false;
                    break;
                }
                break;
            case 3543:
                if (implMethodName.equals("of")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/search/SubscriptionManager") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/javadsl/Source;")) {
                    Source source = (Source) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return source;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/json/JsonArray") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Lorg/eclipse/ditto/json/JsonArray;")) {
                    return (v0) -> {
                        return JsonArray.of(v0);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
