package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.KillSwitch;
import com.mongodb.event.CommandListener;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.reactivestreams.client.MongoDatabase;
import javax.annotation.Nullable;
import org.eclipse.ditto.services.thingsearch.common.config.SearchConfig;
import org.eclipse.ditto.services.thingsearch.common.config.UpdaterConfig;
import org.eclipse.ditto.services.thingsearch.common.util.RootSupervisorStrategyFactory;
import org.eclipse.ditto.services.thingsearch.persistence.read.ThingsSearchPersistence;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.thingsearch.persistence.write.impl.MongoThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.thingsearch.persistence.write.streaming.ChangeQueueActor;
import org.eclipse.ditto.services.thingsearch.persistence.write.streaming.SearchUpdaterStream;
import org.eclipse.ditto.services.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.services.utils.cluster.ClusterUtil;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.health.RetrieveHealth;
import org.eclipse.ditto.services.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.services.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.services.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.monitoring.KamonCommandListener;
import org.eclipse.ditto.services.utils.persistence.mongo.monitoring.KamonConnectionPoolListener;
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.services.utils.pubsub.DistributedSub;
import org.eclipse.ditto.services.utils.pubsub.ThingEventPubSubFactory;
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/SearchUpdaterRootActor.class */
public final class SearchUpdaterRootActor extends AbstractActor {
    public static final String ACTOR_NAME = "searchUpdaterRoot";
    public static final String CLUSTER_ROLE = "things-search";
    private static final String KAMON_METRICS_PREFIX = "updater";
    private static final String SEARCH_ROLE = "things-search";
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    private final SupervisorStrategy supervisorStrategy = RootSupervisorStrategyFactory.createStrategy(this.log);
    private final KillSwitch updaterStreamKillSwitch;
    private final ActorRef thingsUpdaterActor;
    private final ActorRef backgroundSyncActorProxy;
    private final DittoMongoClient dittoMongoClient;

    private SearchUpdaterRootActor(SearchConfig searchConfig, ActorRef actorRef, ThingsSearchPersistence thingsSearchPersistence, TimestampPersistence timestampPersistence) {
        int numberOfShards = searchConfig.getClusterConfig().getNumberOfShards();
        ActorSystem system = getContext().getSystem();
        MongoDbConfig mongoDbConfig = searchConfig.getMongoDbConfig();
        this.dittoMongoClient = MongoClientWrapper.getBuilder(mongoDbConfig).addCommandListener(getCommandListenerOrNull(mongoDbConfig.getMonitoringConfig())).addConnectionPoolListener(getConnectionPoolListenerOrNull(mongoDbConfig.getMonitoringConfig())).build();
        ShardRegionFactory shardRegionFactory = ShardRegionFactory.getInstance(system);
        BlockedNamespaces of = BlockedNamespaces.of(system);
        ActorRef startChildActor = startChildActor("changeQueueActor", ChangeQueueActor.props());
        ActorRef searchUpdaterShardRegion = shardRegionFactory.getSearchUpdaterShardRegion(numberOfShards, ThingUpdater.props(actorRef, startChildActor), "things-search");
        this.updaterStreamKillSwitch = startSearchUpdaterStream(searchConfig, system, shardRegionFactory, numberOfShards, searchUpdaterShardRegion, startChildActor, this.dittoMongoClient.getDefaultDatabase(), of);
        ThingsSearchUpdaterPersistence of2 = MongoThingsSearchUpdaterPersistence.of(this.dittoMongoClient.getDefaultDatabase());
        actorRef.tell(DistPubSubAccess.put(getSelf()), getSelf());
        UpdaterConfig updaterConfig = searchConfig.getUpdaterConfig();
        if (!updaterConfig.isEventProcessingActive()) {
            this.log.warning("Event processing is disabled!");
        }
        DistributedSub startDistributedSub = ThingEventPubSubFactory.shardIdOnly(getContext(), numberOfShards, DistributedAcks.empty()).startDistributedSub();
        this.thingsUpdaterActor = startChildActor("thingsUpdater", ThingsUpdater.props(startDistributedSub, searchUpdaterShardRegion, updaterConfig, of, actorRef));
        startClusterSingletonActor("newEventForwarder", NewEventForwarder.props(startDistributedSub, searchUpdaterShardRegion, of));
        startChildActor("thingsSearchPolicyEventForwarder", PolicyEventForwarder.props(actorRef, this.thingsUpdaterActor, of, of2));
        this.backgroundSyncActorProxy = ClusterUtil.startSingletonProxy(getContext(), "things-search", startClusterSingletonActor(BackgroundSyncActor.ACTOR_NAME, BackgroundSyncActor.props(updaterConfig.getBackgroundSyncConfig(), actorRef, thingsSearchPersistence, timestampPersistence, shardRegionFactory.getPoliciesShardRegion(numberOfShards), this.thingsUpdaterActor)));
        startChildActor(ThingsSearchPersistenceOperationsActor.ACTOR_NAME, ThingsSearchPersistenceOperationsActor.props(actorRef, of2, searchConfig.getPersistenceOperationsConfig()));
    }

    @Nullable
    private static CommandListener getCommandListenerOrNull(MongoDbConfig.MonitoringConfig monitoringConfig) {
        if (monitoringConfig.isCommandsEnabled()) {
            return new KamonCommandListener(KAMON_METRICS_PREFIX);
        }
        return null;
    }

    @Nullable
    private static ConnectionPoolListener getConnectionPoolListenerOrNull(MongoDbConfig.MonitoringConfig monitoringConfig) {
        if (monitoringConfig.isConnectionPoolEnabled()) {
            return new KamonConnectionPoolListener(KAMON_METRICS_PREFIX);
        }
        return null;
    }

    public static Props props(SearchConfig searchConfig, ActorRef actorRef, ThingsSearchPersistence thingsSearchPersistence, TimestampPersistence timestampPersistence) {
        return Props.create(SearchUpdaterRootActor.class, new Object[]{searchConfig, actorRef, thingsSearchPersistence, timestampPersistence});
    }

    public void postStop() throws Exception {
        this.updaterStreamKillSwitch.shutdown();
        this.dittoMongoClient.close();
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RetrieveStatisticsDetails.class, retrieveStatisticsDetails -> {
            this.thingsUpdaterActor.forward(retrieveStatisticsDetails, getContext());
        }).match(RetrieveHealth.class, retrieveHealth -> {
            this.backgroundSyncActorProxy.forward(retrieveHealth, getContext());
        }).match(Status.Failure.class, failure -> {
            this.log.error(failure.cause(), "Got failure: {}", failure);
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    public SupervisorStrategy supervisorStrategy() {
        return this.supervisorStrategy;
    }

    private ActorRef startChildActor(String str, Props props) {
        this.log.info("Starting child actor <{}>.", str);
        return getContext().actorOf(props, str);
    }

    private ActorRef startClusterSingletonActor(String str, Props props) {
        return ClusterUtil.startSingleton(getContext(), "things-search", str, props);
    }

    private KillSwitch startSearchUpdaterStream(SearchConfig searchConfig, ActorSystem actorSystem, ShardRegionFactory shardRegionFactory, int i, ActorRef actorRef, ActorRef actorRef2, MongoDatabase mongoDatabase, BlockedNamespaces blockedNamespaces) {
        return SearchUpdaterStream.of(searchConfig, actorSystem, shardRegionFactory.getThingsShardRegion(i), shardRegionFactory.getPoliciesShardRegion(i), actorRef, actorRef2, mongoDatabase, blockedNamespaces).start(getContext());
    }
}
