package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
import org.eclipse.ditto.services.models.streaming.IdentifiableStreamingMessage;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThing;
import org.eclipse.ditto.services.thingsearch.common.config.UpdaterConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.streaming.StreamAck;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
import org.eclipse.ditto.services.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.services.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.services.utils.pubsub.DistributedSub;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.thingsearch.ThingsOutOfSync;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingsUpdater.class */
/**
 * Actor that receives thing events, tags and out-of-sync notifications and forwards them, wrapped in a
 * {@link ShardedMessageEnvelope}, to the shard region it was created with.
 *
 * <p>When event processing is active (see {@link UpdaterConfig#isEventProcessingActive()}), the actor asks the
 * shard region for its statistics at a fixed interval and aligns its thing-event subscriptions with the shard IDs
 * reported there.
 */
final class ThingsUpdater extends AbstractActorWithTimers {

    /** Name of this actor; also passed as the group when subscribing in the constructor. */
    static final String ACTOR_NAME = "thingsUpdater";

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final ActorRef shardRegion;
    private final BlockNamespaceBehavior namespaceBlockingBehavior;
    private final RetrieveStatisticsDetailsResponseSupplier retrieveStatisticsDetailsResponseSupplier;
    private final DistributedSub thingEventSub;

    /** Shard IDs seen in the most recently processed {@link ShardRegion.ShardRegionStats}; initially empty. */
    private Set<String> previousShardIds = Collections.emptySet();

    /** Internal timer messages. */
    private enum Clock {
        /** Triggers a request for the current shard region statistics. */
        REBALANCE_TICK
    }

    /**
     * Creates the actor, optionally starts the periodic shard-statistics poll, and sends a group subscription
     * for the topic {@code "thing-search.events:updateThings"}.
     *
     * @param thingEventSub used to subscribe to and unsubscribe from thing events per shard ID.
     * @param shardRegion recipient of all forwarded envelopes and of the statistics requests.
     * @param updaterConfig supplies the event-processing flag and the poll interval.
     * @param blockedNamespaces backing store for the namespace-blocking behavior.
     * @param subscriptionRecipient actor that receives the subscribe-via-group message
     * (presumably the distributed pub-sub mediator; confirm against the caller).
     */
    private ThingsUpdater(final DistributedSub thingEventSub,
            final ActorRef shardRegion,
            final UpdaterConfig updaterConfig,
            final BlockedNamespaces blockedNamespaces,
            final ActorRef subscriptionRecipient) {

        this.thingEventSub = thingEventSub;
        this.shardRegion = shardRegion;
        this.namespaceBlockingBehavior = BlockNamespaceBehavior.of(blockedNamespaces);
        this.retrieveStatisticsDetailsResponseSupplier =
                RetrieveStatisticsDetailsResponseSupplier.of(this.shardRegion, "search-updater", this.log);

        if (updaterConfig.isEventProcessingActive()) {
            getTimers().startPeriodicTimer(Clock.REBALANCE_TICK, Clock.REBALANCE_TICK,
                    updaterConfig.getShardingStatePollInterval());
            // Trigger the first poll right away instead of waiting for the first timer tick.
            getSelf().tell(Clock.REBALANCE_TICK, getSelf());
        }

        subscriptionRecipient.tell(
                DistPubSubAccess.subscribeViaGroup("thing-search.events:updateThings", ACTOR_NAME, getSelf()),
                getSelf());
    }

    /**
     * Creates the {@link Props} for this actor.
     *
     * @param distributedSub used to subscribe to and unsubscribe from thing events.
     * @param actorRef the shard region that receives the forwarded messages.
     * @param updaterConfig the updater configuration.
     * @param blockedNamespaces the blocked namespaces.
     * @param actorRef2 the actor that receives this actor's group subscription request.
     * @return the Props.
     */
    public static Props props(DistributedSub distributedSub, ActorRef actorRef, UpdaterConfig updaterConfig, BlockedNamespaces blockedNamespaces, ActorRef actorRef2) {
        return Props.create(ThingsUpdater.class, distributedSub, actorRef, updaterConfig, blockedNamespaces,
                actorRef2);
    }

    /**
     * Builds the message handling of this actor. Messages matching none of the cases are logged at warning
     * level and marked as unhandled.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ThingEvent.class, this::processThingEvent)
                .match(ThingTag.class, this::processThingTag)
                .match(PolicyReferenceTag.class, this::processPolicyReferenceTag)
                .matchEquals(ShardRegion.getShardRegionStateInstance(),
                        getShardRegionState -> this.shardRegion.forward(getShardRegionState, getContext()))
                .match(RetrieveStatisticsDetails.class, this::handleRetrieveStatisticsDetails)
                .matchEquals(Clock.REBALANCE_TICK, this::retrieveShardIds)
                .match(ShardRegion.ShardRegionStats.class, this::updateSubscriptions)
                .match(ThingsOutOfSync.class, this::updateThings)
                .match(UpdateThing.class, this::updateThing)
                .match(DistributedPubSubMediator.SubscribeAck.class,
                        subscribeAck -> this.log.debug("Got <{}>", subscribeAck))
                .matchAny(message -> {
                    this.log.warning("Unknown message: {}", message);
                    unhandled(message);
                })
                .build();
    }

    /**
     * Asks the shard region for its statistics; the reply is addressed to this actor.
     *
     * @param rebalanceTick the timer message that triggered the request (not otherwise used).
     */
    private void retrieveShardIds(final Clock rebalanceTick) {
        this.shardRegion.tell(ShardRegion.getRegionStatsInstance(), getSelf());
    }

    /**
     * Subscribes for shard IDs that are new compared to the previous statistics, unsubscribes from shard IDs
     * that are no longer reported, and remembers the reported shard IDs for the next comparison.
     *
     * @param shardRegionStats the statistics whose keys are the current shard IDs.
     */
    private void updateSubscriptions(final ShardRegion.ShardRegionStats shardRegionStats) {
        final Set<String> currentShardIds = shardRegionStats.getStats().keySet();
        this.log.debug("Updating event subscriptions: <{}> -> <{}>", this.previousShardIds, currentShardIds);

        final List<String> toSubscribe = currentShardIds.stream()
                .filter(shardId -> !this.previousShardIds.contains(shardId))
                .collect(Collectors.toList());
        final List<String> toUnsubscribe = this.previousShardIds.stream()
                .filter(shardId -> !currentShardIds.contains(shardId))
                .collect(Collectors.toList());

        this.thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
        this.thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
        this.previousShardIds = currentShardIds;
    }

    /**
     * Pipes the response produced by the statistics supplier to the sender of the request.
     *
     * @param retrieveStatisticsDetails the request; only its headers are passed to the supplier.
     */
    private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails retrieveStatisticsDetails) {
        this.log.info("Sending the namespace stats of the search-updater shard as requested..");
        Patterns.pipe(
                this.retrieveStatisticsDetailsResponseSupplier.apply(retrieveStatisticsDetails.getDittoHeaders()),
                getContext().dispatcher())
                .to(getSender());
    }

    /**
     * Forwards a thing tag to the shard region, addressed by the tag's entity ID.
     *
     * @param thingTag the tag to forward.
     */
    private void processThingTag(final ThingTag thingTag) {
        final String tagIdentifier = thingTag.asIdentifierString();
        this.log.withCorrelationId("things-tags-sync-" + tagIdentifier)
                .debug("Forwarding incoming ThingTag '{}'", tagIdentifier);
        forwardJsonifiableToShardRegion(thingTag, ThingTag::getEntityId);
    }

    /**
     * Forwards one {@link UpdateThing} command per reported thing ID, each carrying the headers of the
     * out-of-sync notification.
     *
     * @param thingsOutOfSync the notification listing the affected thing IDs.
     */
    private void updateThings(final ThingsOutOfSync thingsOutOfSync) {
        this.log.withCorrelationId(thingsOutOfSync)
                .warning("Out-of-sync things are reported: <{}>", thingsOutOfSync);
        final DittoHeaders dittoHeaders = thingsOutOfSync.getDittoHeaders();
        thingsOutOfSync.getThingIds().forEach(thingId ->
                forwardUpdateThingToShardRegion(UpdateThing.of(ThingId.of(thingId), dittoHeaders)));
    }

    /**
     * Forwards a single {@link UpdateThing} command to the shard region.
     *
     * @param updateThing the command to forward.
     */
    private void updateThing(final UpdateThing updateThing) {
        this.log.withCorrelationId(updateThing).warning("Out-of-sync thing is reported: <{}>", updateThing);
        forwardUpdateThingToShardRegion(updateThing);
    }

    /**
     * Forwards a policy reference tag to the shard region, addressed by the tag's entity ID.
     *
     * @param policyReferenceTag the tag to forward.
     */
    private void processPolicyReferenceTag(final PolicyReferenceTag policyReferenceTag) {
        final String tagIdentifier = policyReferenceTag.asIdentifierString();
        this.log.withCorrelationId("policies-tags-sync-" + tagIdentifier)
                .debug("Forwarding PolicyReferenceTag '{}'", tagIdentifier);
        forwardJsonifiableToShardRegion(policyReferenceTag, PolicyReferenceTag::getEntityId);
    }

    /**
     * Forwards a thing event to the shard region, addressed by the event's thing entity ID.
     *
     * @param thingEvent the event to forward.
     */
    private void processThingEvent(final ThingEvent<?> thingEvent) {
        this.log.withCorrelationId(thingEvent)
                .debug("Forwarding incoming ThingEvent for thingId '{}'",
                        String.valueOf(thingEvent.getThingEntityId()));
        forwardEventToShardRegion(thingEvent, ThingEvent::getThingEntityId);
    }

    /** Forwards an {@link UpdateThing} using its own entity ID, type, JSON and headers for the envelope. */
    private void forwardUpdateThingToShardRegion(final UpdateThing updateThing) {
        forwardToShardRegion(updateThing,
                UpdateThing::getEntityId,
                UpdateThing::getType,
                UpdateThing::toJson,
                UpdateThing::getDittoHeaders);
    }

    /**
     * Forwards a jsonifiable message. The envelope type is the simple class name of the message, the payload
     * is its JSON as an object, and the headers are empty.
     *
     * @param message the message to forward.
     * @param getId extracts the entity ID the envelope is addressed to.
     * @param <J> the message type.
     */
    private <J extends Jsonifiable<?>> void forwardJsonifiableToShardRegion(final J message,
            final Function<J, EntityId> getId) {

        forwardToShardRegion(message,
                getId,
                jsonifiable -> jsonifiable.getClass().getSimpleName(),
                jsonifiable -> jsonifiable.toJson().asObject(),
                jsonifiable -> DittoHeaders.empty());
    }

    /**
     * Forwards an event. The envelope type and headers are taken from the event; the payload is the event's
     * JSON in its implemented schema version with regular and special fields.
     *
     * @param message the event to forward.
     * @param getId extracts the entity ID the envelope is addressed to.
     * @param <E> the event type.
     */
    private <E extends Event<?>> void forwardEventToShardRegion(final E message,
            final Function<E, EntityId> getId) {

        forwardToShardRegion(message,
                getId,
                Event::getType,
                event -> event.toJson(event.getImplementedSchemaVersion(), FieldType.regularOrSpecial()),
                Event::getDittoHeaders);
    }

    /**
     * Wraps a message in a {@link ShardedMessageEnvelope}, passes the envelope through the namespace-blocking
     * behavior and, if that completes normally, sends the result to the shard region on behalf of the original
     * sender.
     *
     * <p>If the asynchronous chain completes exceptionally, nothing is sent to the shard region. Instead, a
     * successful {@link StreamAck} is returned to the sender, but only when the sender is not dead letters and
     * the message is an {@link IdentifiableStreamingMessage}. (Presumably the blocking behavior fails for
     * messages of blocked namespaces; confirm against {@code BlockNamespaceBehavior}.)
     *
     * @param message the message to forward.
     * @param getId extracts the entity ID the envelope is addressed to.
     * @param getType extracts the envelope type.
     * @param toJson produces the envelope payload.
     * @param getDittoHeaders extracts the envelope headers.
     * @param <M> the message type.
     */
    private <M> void forwardToShardRegion(final M message,
            final Function<M, EntityId> getId,
            final Function<M, String> getType,
            final Function<M, JsonObject> toJson,
            final Function<M, DittoHeaders> getDittoHeaders) {

        final EntityId entityId = getId.apply(message);
        this.log.debug("Forwarding incoming {} to shard region of {}", message.getClass().getSimpleName(),
                entityId);

        final ShardedMessageEnvelope envelope = ShardedMessageEnvelope.of(entityId,
                getType.apply(message),
                toJson.apply(message),
                getDittoHeaders.apply(message));

        // Resolve actor references before entering the asynchronous callbacks: the sender is only valid while
        // the current message is being processed.
        final ActorRef sender = getSender();
        final ActorRef self = getSelf();
        final ActorRef deadLetters = getContext().getSystem().deadLetters();

        this.namespaceBlockingBehavior.block(envelope)
                .thenAccept(unblocked -> this.shardRegion.tell(unblocked, sender))
                .exceptionally(error -> {
                    // Only identifiable streaming messages are acknowledged, and only to a real sender.
                    if (!Objects.equals(sender, deadLetters) && message instanceof IdentifiableStreamingMessage) {
                        final String identifier = ((IdentifiableStreamingMessage) message).asIdentifierString();
                        sender.tell(StreamAck.success(identifier), self);
                    }
                    return null;
                });
    }
}
