package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.sharding.ShardRegion;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.ditto.services.thingsearch.common.config.DittoSearchConfig;
import org.eclipse.ditto.services.thingsearch.common.config.UpdaterConfig;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.services.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.services.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.services.utils.pubsub.DistributedSub;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/NewEventForwarder.class */
/**
 * Actor that forwards incoming {@link ThingEvent}s to the shard region it was created with.
 * <p>
 * When event processing is active in the updater config, the actor periodically asks the shard region for
 * the cluster sharding stats and adjusts its {@link DistributedSub} subscriptions so that it is subscribed
 * exactly to the shard IDs that the {@link ShardRegionExtractor} reports as inactive.
 */
final class NewEventForwarder extends AbstractActorWithTimers {

    /**
     * Name of this actor.
     */
    static final String ACTOR_NAME = "newEventForwarder";

    private final ActorRef shardRegion;
    private final DistributedSub thingEventSub;
    private final BlockNamespaceBehavior namespaceBlockingBehavior;
    private final ShardRegion.GetClusterShardingStats getClusterShardingStats;
    private final ShardRegionExtractor shardRegionExtractor;
    private final DiagnosticLoggingAdapter log = Logging.apply(this);

    /** Inactive shard IDs computed by the last subscription update; empty until the first update. */
    private Set<String> previousShardIds = Collections.emptySet();

    /**
     * Internal timer messages of this actor.
     */
    private enum Clock {
        /** Triggers a request for the current cluster sharding stats. */
        REBALANCE_TICK
    }

    /**
     * Creates the actor. Instances are created through {@link #props(DistributedSub, ActorRef, BlockedNamespaces)}.
     *
     * @param distributedSub the subscription facade used to (un)subscribe for thing events.
     * @param actorRef the shard region to which events and sharding stats requests are sent.
     * @param blockedNamespaces the blocked namespaces used to build the namespace blocking behavior.
     */
    private NewEventForwarder(DistributedSub distributedSub, ActorRef actorRef, BlockedNamespaces blockedNamespaces) {
        this.thingEventSub = distributedSub;
        this.shardRegion = actorRef;
        this.namespaceBlockingBehavior = BlockNamespaceBehavior.of(blockedNamespaces);

        final DittoSearchConfig searchConfig = DittoSearchConfig.of(
                DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
        final UpdaterConfig updaterConfig = searchConfig.getUpdaterConfig();
        final ClusterConfig clusterConfig = searchConfig.getClusterConfig();

        // The stats request uses the poll interval as its timeout.
        final FiniteDuration statsTimeout =
                FiniteDuration.create(updaterConfig.getShardingStatePollInterval().toMillis(), TimeUnit.MILLISECONDS);
        this.getClusterShardingStats = new ShardRegion.GetClusterShardingStats(statsTimeout);
        this.shardRegionExtractor =
                ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), getContext().getSystem());

        if (updaterConfig.isEventProcessingActive()) {
            getTimers().startPeriodicTimer(Clock.REBALANCE_TICK, Clock.REBALANCE_TICK,
                    updaterConfig.getShardingStatePollInterval());
            // Send one tick right away so that the first stats request does not wait for the timer.
            getSelf().tell(Clock.REBALANCE_TICK, getSelf());
        }
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param distributedSub the subscription facade used to (un)subscribe for thing events.
     * @param actorRef the shard region to which events are forwarded.
     * @param blockedNamespaces the blocked namespaces.
     * @return the Props of this actor.
     */
    public static Props props(DistributedSub distributedSub, ActorRef actorRef, BlockedNamespaces blockedNamespaces) {
        return Props.create(NewEventForwarder.class, distributedSub, actorRef, blockedNamespaces);
    }

    /**
     * Builds the message handling of this actor: thing events are forwarded, rebalance ticks trigger a
     * sharding stats request, sharding stats update the subscriptions, and anything else is logged with a
     * warning and marked as unhandled.
     *
     * @return the receive behavior.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ThingEvent.class, this::processThingEvent)
                .matchEquals(Clock.REBALANCE_TICK, this::retrieveClusterShardingStats)
                .match(ShardRegion.ClusterShardingStats.class, this::updateSubscriptions)
                .matchAny(message -> {
                    log.warning("Unknown message: {}", message);
                    unhandled(message);
                })
                .build();
    }

    /**
     * Asks the shard region for the cluster sharding stats; the reply is addressed to this actor.
     *
     * @param clock the tick that triggered the request (unused).
     */
    private void retrieveClusterShardingStats(Clock clock) {
        shardRegion.tell(getClusterShardingStats, getSelf());
    }

    /**
     * Computes the currently inactive shard IDs from the given stats, subscribes for the IDs that are newly
     * inactive, unsubscribes from the IDs that are no longer inactive and remembers the new set.
     *
     * @param clusterShardingStats the cluster sharding stats received from the shard region.
     */
    private void updateSubscriptions(ShardRegion.ClusterShardingStats clusterShardingStats) {
        final Set<String> inactiveShardIds =
                shardRegionExtractor.getInactiveShardIds(getActiveShardIds(clusterShardingStats));
        log.debug("Updating event subscriptions for inactive shards: <{}> -> <{}>", previousShardIds,
                inactiveShardIds);

        // Only the difference to the previous state is (un)subscribed.
        final List<String> toSubscribe = inactiveShardIds.stream()
                .filter(shardId -> !previousShardIds.contains(shardId))
                .collect(Collectors.toList());
        final List<String> toUnsubscribe = previousShardIds.stream()
                .filter(shardId -> !inactiveShardIds.contains(shardId))
                .collect(Collectors.toList());

        thingEventSub.subscribeWithoutAck(toSubscribe, getSelf());
        thingEventSub.unsubscribeWithoutAck(toUnsubscribe, getSelf());
        previousShardIds = inactiveShardIds;
    }

    /**
     * Passes the event through the namespace blocking behavior and, once that completes normally, sends the
     * result to the shard region with this actor as sender.
     *
     * @param thingEvent the incoming thing event.
     */
    private void processThingEvent(ThingEvent<?> thingEvent) {
        LogUtil.enhanceLogWithCorrelationId(log, thingEvent);
        log.debug("Forwarding incoming ThingEvent for thingId '{}'", thingEvent.getThingId());

        // Resolve the self reference before entering the callback, which presumably may run asynchronously.
        final ActorRef self = getSelf();
        // NOTE(review): no failure handler is attached; presumably a failed stage means the event is
        // intentionally dropped (e.g. blocked namespace) - confirm against BlockNamespaceBehavior.
        namespaceBlockingBehavior.block(thingEvent)
                .thenAccept(checkedEvent -> shardRegion.tell(checkedEvent, self));
    }

    /**
     * Collects the shard IDs of all shard regions contained in the given stats.
     *
     * @param clusterShardingStats the cluster sharding stats.
     * @return the shard IDs listed in the stats of any region; may contain duplicates.
     */
    private static Collection<String> getActiveShardIds(ShardRegion.ClusterShardingStats clusterShardingStats) {
        return clusterShardingStats.getRegions()
                .values()
                .stream()
                .flatMap(shardRegionStats -> shardRegionStats.getStats().keySet().stream())
                .collect(Collectors.toList());
    }
}
