package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.sharding.ShardRegion;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.Logging;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.CircuitBreaker;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.policies.PolicyEvent;
import org.eclipse.ditto.signals.events.things.ThingEvent;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingsUpdater.class */
public final class ThingsUpdater extends AbstractActor {
    static final String ACTOR_NAME = "thingsUpdater";
    private static final String UPDATER_GROUP = "thingsUpdaterGroup";
    private final DiagnosticLoggingAdapter log;
    private final ActorRef shardRegion;
    private final ThingsSearchUpdaterPersistence searchUpdaterPersistence;
    private final Materializer materializer;

    private ThingsUpdater(int i, ShardRegionFactory shardRegionFactory, ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence, CircuitBreaker circuitBreaker, boolean z, Duration duration, int i2) {
        this.log = Logging.apply(this);
        ActorSystem system = context().system();
        ActorRef thingsShardRegion = shardRegionFactory.getThingsShardRegion(i);
        ActorRef policiesShardRegion = shardRegionFactory.getPoliciesShardRegion(i);
        ActorRef mediator = DistributedPubSub.get(system).mediator();
        this.shardRegion = shardRegionFactory.getSearchUpdaterShardRegion(i, ThingUpdater.props(thingsSearchUpdaterPersistence, circuitBreaker, thingsShardRegion, policiesShardRegion, duration, ThingUpdater.DEFAULT_THINGS_TIMEOUT, i2).withMailbox("akka.actor.custom-updater-mailbox"));
        this.searchUpdaterPersistence = thingsSearchUpdaterPersistence;
        this.materializer = ActorMaterializer.create(getContext());
        if (z) {
            mediator.tell(new DistributedPubSubMediator.Subscribe("things.events:", UPDATER_GROUP, self()), self());
            mediator.tell(new DistributedPubSubMediator.Subscribe("policies.events:", UPDATER_GROUP, self()), self());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final int i, final ShardRegionFactory shardRegionFactory, final ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence, final CircuitBreaker circuitBreaker, final boolean z, final Duration duration, final int i2) {
        return Props.create(ThingsUpdater.class, new Creator<ThingsUpdater>() { // from class: org.eclipse.ditto.services.thingsearch.updater.actors.ThingsUpdater.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public ThingsUpdater m7create() {
                return new ThingsUpdater(i, shardRegionFactory, thingsSearchUpdaterPersistence, circuitBreaker, z, duration, i2);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().matchEquals(ShardRegion.getShardRegionStateInstance(), shardRegion$GetShardRegionState$ -> {
            this.shardRegion.forward(shardRegion$GetShardRegionState$, getContext());
        }).match(ThingEvent.class, this::processThingEvent).match(PolicyEvent.class, this::processPolicyEvent).match(ThingTag.class, this::processThingTag).match(PolicyReferenceTag.class, this::processPolicyReferenceTag).match(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void processThingTag(ThingTag thingTag) {
        String asIdentifierString = thingTag.asIdentifierString();
        LogUtil.enhanceLogWithCorrelationId(this.log, "things-tags-sync-" + asIdentifierString);
        this.log.debug("Forwarding incoming ThingTag '{}'", asIdentifierString);
        forwardJsonifiableToShardRegion(thingTag, (v0) -> {
            return v0.getId();
        });
    }

    private void processPolicyReferenceTag(PolicyReferenceTag policyReferenceTag) {
        String asIdentifierString = policyReferenceTag.asIdentifierString();
        LogUtil.enhanceLogWithCorrelationId(this.log, "policies-tags-sync-" + asIdentifierString);
        this.log.debug("Forwarding PolicyReferenceTag '{}'", asIdentifierString);
        forwardJsonifiableToShardRegion(policyReferenceTag, policyReferenceTag2 -> {
            return policyReferenceTag.getEntityId();
        });
    }

    private void processThingEvent(ThingEvent<?> thingEvent) {
        LogUtil.enhanceLogWithCorrelationId(this.log, thingEvent);
        this.log.debug("Forwarding incoming ThingEvent for thingId '{}'", thingEvent.getThingId());
        forwardEventToShardRegion(thingEvent, (v0) -> {
            return v0.getId();
        });
    }

    private void processPolicyEvent(PolicyEvent<?> policyEvent) {
        LogUtil.enhanceLogWithCorrelationId(this.log, policyEvent);
        thingIdsForPolicy(policyEvent.getPolicyId()).thenAccept(set -> {
            set.forEach(str -> {
                forwardPolicyEventToShardRegion(policyEvent, str);
            });
        });
    }

    private CompletionStage<Set<String>> thingIdsForPolicy(String str) {
        return (CompletionStage) this.searchUpdaterPersistence.getThingIdsForPolicy(str).runWith(Sink.last(), this.materializer);
    }

    private <J extends Jsonifiable<?>> void forwardJsonifiableToShardRegion(J j, Function<J, String> function) {
        forwardToShardRegion(j, function, jsonifiable -> {
            return jsonifiable.getClass().getSimpleName();
        }, jsonifiable2 -> {
            return jsonifiable2.toJson().asObject();
        }, jsonifiable3 -> {
            return DittoHeaders.empty();
        });
    }

    private <E extends Event<?>> void forwardEventToShardRegion(E e, Function<E, String> function) {
        forwardToShardRegion(e, function, (v0) -> {
            return v0.getType();
        }, event -> {
            return event.toJson(event.getImplementedSchemaVersion(), FieldType.regularOrSpecial());
        }, (v0) -> {
            return v0.getDittoHeaders();
        });
    }

    private <M> void forwardToShardRegion(M m, Function<M, String> function, Function<M, String> function2, Function<M, JsonObject> function3, Function<M, DittoHeaders> function4) {
        String apply = function.apply(m);
        this.log.debug("Forwarding incoming {} to shard region of {}", m.getClass().getSimpleName(), apply);
        this.shardRegion.forward(ShardedMessageEnvelope.of(apply, function2.apply(m), function3.apply(m), function4.apply(m)), context());
    }

    private void forwardPolicyEventToShardRegion(PolicyEvent<?> policyEvent, String str) {
        this.log.debug("Will forward incoming event message for policyId '{}' to update actor '{}':", policyEvent.getPolicyId(), str);
        forwardEventToShardRegion(policyEvent, policyEvent2 -> {
            return str;
        });
    }

    private void subscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        this.log.debug("Successfully subscribed to distributed pub/sub on topic '{}'", subscribeAck.subscribe().topic());
    }
}
