package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.DelayOverflowStrategy;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.thingsearch.common.config.DittoSearchConfig;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.services.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.signals.events.policies.PolicyEvent;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/PolicyEventForwarder.class */
final class PolicyEventForwarder extends AbstractActor {
    private static final Duration ASK_SELF_TIMEOUT = Duration.ofSeconds(10);
    static final String ACTOR_NAME = "thingsSearchPolicyEventForwarder";
    private final ActorRef thingsUpdater;
    private final ThingsSearchUpdaterPersistence persistence;
    private final BlockNamespaceBehavior blockNamespaceBehavior;

    @Nullable
    private KillSwitch killSwitch;
    private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private Map<PolicyId, Long> policyRevisions = new HashMap();
    private final Duration interval = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())).getStreamConfig().getWriteInterval();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/PolicyEventForwarder$Control.class */
    public enum Control {
        DUMP_POLICY_REVISIONS,
        STREAM_COMPLETED
    }

    private PolicyEventForwarder(ActorRef actorRef, ActorRef actorRef2, BlockedNamespaces blockedNamespaces, ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence) {
        this.thingsUpdater = actorRef2;
        this.persistence = thingsSearchUpdaterPersistence;
        this.blockNamespaceBehavior = BlockNamespaceBehavior.of(blockedNamespaces);
        actorRef.tell(DistPubSubAccess.subscribeViaGroup("policies.events:", ACTOR_NAME, getSelf()), getSelf());
        restartPolicyReferenceTagStream();
    }

    public static Props props(ActorRef actorRef, ActorRef actorRef2, BlockedNamespaces blockedNamespaces, ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence) {
        return Props.create(PolicyEventForwarder.class, new Object[]{actorRef, actorRef2, blockedNamespaces, thingsSearchUpdaterPersistence});
    }

    public void postStop() throws Exception {
        terminateStream();
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PolicyEvent.class, this::policyEvent).match(PolicyTag.class, this::updatePolicyRevision).match(PolicyReferenceTag.class, this::forwardToThingsUpdater).matchEquals(Control.DUMP_POLICY_REVISIONS, this::dumpPolicyRevisions).matchEquals(Control.STREAM_COMPLETED, (v1) -> {
            streamTerminated(v1);
        }).match(Status.Failure.class, (v1) -> {
            streamTerminated(v1);
        }).match(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck).build();
    }

    private void policyEvent(PolicyEvent policyEvent) {
        ActorRef self = getSelf();
        this.blockNamespaceBehavior.block(policyEvent).whenComplete((withDittoHeaders, th) -> {
            if (th == null) {
                self.tell(PolicyTag.of(policyEvent.getPolicyEntityId(), policyEvent.getRevision()), self);
            }
        });
    }

    private void updatePolicyRevision(PolicyTag policyTag) {
        this.policyRevisions.merge(policyTag.getEntityId(), Long.valueOf(policyTag.getRevision()), (v0, v1) -> {
            return Long.max(v0, v1);
        });
    }

    private void forwardToThingsUpdater(PolicyReferenceTag policyReferenceTag) {
        this.thingsUpdater.tell(policyReferenceTag, ActorRef.noSender());
    }

    private void dumpPolicyRevisions(Control control) {
        Map<PolicyId, Long> map = this.policyRevisions;
        this.policyRevisions = new HashMap();
        getSender().tell(map, getSelf());
    }

    private void streamTerminated(Object obj) {
        if (obj instanceof Status.Failure) {
            this.log.error(((Status.Failure) obj).cause(), "PolicyEventForwarder stream terminated (should NEVER happen!), restarting");
        } else {
            this.log.info("PolicyEventForwarder stream completed; restarting");
        }
        restartPolicyReferenceTagStream();
    }

    private void subscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        this.log.info("SubscribeAck: <{}>", subscribeAck);
    }

    private void restartPolicyReferenceTagStream() {
        terminateStream();
        ActorRef self = getSelf();
        this.killSwitch = (KillSwitch) Source.repeat(Control.DUMP_POLICY_REVISIONS).delay(this.interval, DelayOverflowStrategy.backpressure()).withAttributes(Attributes.inputBuffer(1, 1)).viaMat(KillSwitches.single(), Keep.right()).mapAsync(1, control -> {
            return Patterns.ask(self, control, ASK_SELF_TIMEOUT).exceptionally(Function.identity());
        }).flatMapConcat(this::mapDumpResult).to(Sink.actorRef(self, Control.STREAM_COMPLETED)).run(getContext().getSystem());
    }

    private void terminateStream() {
        if (this.killSwitch != null) {
            this.killSwitch.shutdown();
            this.killSwitch = null;
        }
    }

    private Source<PolicyReferenceTag, NotUsed> mapDumpResult(Object obj) {
        if (obj instanceof Map) {
            return this.persistence.getPolicyReferenceTags((Map) obj);
        }
        if (obj instanceof Throwable) {
            this.log.error((Throwable) obj, "dump failed");
        } else {
            this.log.warning("Unexpected dump result: <{}>", obj);
        }
        return Source.empty();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 876422573:
                if (implMethodName.equals("mapDumpResult")) {
                    z = false;
                    break;
                }
                break;
            case 999412271:
                if (implMethodName.equals("lambda$restartPolicyReferenceTagStream$b39f8e39$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/PolicyEventForwarder") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lakka/stream/javadsl/Source;")) {
                    PolicyEventForwarder policyEventForwarder = (PolicyEventForwarder) serializedLambda.getCapturedArg(0);
                    return policyEventForwarder::mapDumpResult;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/PolicyEventForwarder") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;Lorg/eclipse/ditto/services/thingsearch/updater/actors/PolicyEventForwarder$Control;)Ljava/util/concurrent/CompletionStage;")) {
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(0);
                    return control -> {
                        return Patterns.ask(actorRef, control, ASK_SELF_TIMEOUT).exceptionally(Function.identity());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
