package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.sharding.ShardRegion;
import akka.cluster.singleton.ClusterSingletonManager;
import akka.cluster.singleton.ClusterSingletonManagerSettings;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.CircuitBreaker;
import akka.stream.ActorMaterializer;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.services.thingsearch.common.util.RootSupervisorStrategyFactory;
import org.eclipse.ditto.services.thingsearch.persistence.write.impl.MongoEventToPersistenceStrategyFactory;
import org.eclipse.ditto.services.thingsearch.persistence.write.impl.MongoThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.utils.akka.streaming.StreamConsumerSettings;
import org.eclipse.ditto.services.utils.akka.streaming.StreamMetadataPersistence;
import org.eclipse.ditto.services.utils.distributedcache.actors.CacheFacadeActor;
import org.eclipse.ditto.services.utils.distributedcache.actors.CacheRole;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/SearchUpdaterRootActor.class */
/**
 * Root actor of the search updater.
 *
 * <p>On construction it creates the MongoDB-backed updater persistence and a circuit breaker, registers itself
 * with the given pub-sub mediator, starts the {@code thingsUpdater} child (plus the optional cache facade
 * children) and, depending on configuration, starts cluster singletons supervising the things and policies
 * synchronization streams.
 */
public final class SearchUpdaterRootActor extends AbstractActor {

    /** Actor name constant for this root actor; it is not referenced inside this class. */
    public static final String ACTOR_NAME = "searchUpdaterRoot";

    private final LoggingAdapter log;
    private final SupervisorStrategy supervisorStrategy;
    private final ActorRef thingsUpdaterActor;

    /**
     * Wires up all children of the root actor. Instances are only created through
     * {@link #props(Config, ActorRef, ActorMaterializer, StreamMetadataPersistence, StreamMetadataPersistence)}.
     *
     * @param config the configuration all {@code ditto.things-search.*} settings are read from.
     * @param pubSubMediator the actor that receives the {@link DistributedPubSubMediator.Put} for this actor and
     * that is handed to the stream supervisors.
     * @param materializer the materializer handed to the stream supervisors.
     * @param thingsSyncPersistence stream metadata persistence handed to the things stream supervisor.
     * @param policiesSyncPersistence stream metadata persistence handed to the policies stream supervisor.
     */
    private SearchUpdaterRootActor(Config config, ActorRef pubSubMediator, ActorMaterializer materializer,
            StreamMetadataPersistence thingsSyncPersistence, StreamMetadataPersistence policiesSyncPersistence) {
        this.log = Logging.getLogger(getContext().system(), this);
        this.supervisorStrategy = RootSupervisorStrategyFactory.createStrategy(this.log);

        int numberOfShards = config.getInt("ditto.things-search.cluster.number-of-shards");
        MongoThingsSearchUpdaterPersistence searchUpdaterPersistence = new MongoThingsSearchUpdaterPersistence(
                MongoClientWrapper.newInstance(config), this.log, MongoEventToPersistenceStrategyFactory.getInstance());
        CircuitBreaker circuitBreaker = createCircuitBreaker(config);

        // Register this actor with the mediator.
        pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());

        boolean eventProcessingActive = config.getBoolean("ditto.things-search.updater.event-processing.active");
        if (!eventProcessingActive) {
            this.log.warning("Event processing is disabled.");
        }
        boolean cacheUpdatesActive = config.getBoolean("ditto.things-search.updater.cache-updates.active");
        if (!cacheUpdatesActive) {
            this.log.warning("Cache-updates are disabled.");
        }

        // The cache facades are only started when cache updates are active; otherwise null is passed on.
        ActorRef thingCacheFacade = cacheUpdatesActive
                ? startChildActor(CacheFacadeActor.actorNameFor(CacheRole.THING),
                        CacheFacadeActor.props(CacheRole.THING, config))
                : null;
        ActorRef policyCacheFacade = cacheUpdatesActive
                ? startChildActor(CacheFacadeActor.actorNameFor(CacheRole.POLICY),
                        CacheFacadeActor.props(CacheRole.POLICY, config))
                : null;

        this.thingsUpdaterActor = startChildActor("thingsUpdater", ThingsUpdater.props(
                numberOfShards,
                ShardRegionFactory.getInstance(getContext().getSystem()),
                searchUpdaterPersistence,
                circuitBreaker,
                eventProcessingActive,
                config.getDuration("ditto.things-search.updater.activity-check-interval"),
                // Without an explicit limit the bulk size is effectively unbounded.
                config.hasPath("ditto.things-search.updater.max-bulk-size")
                        ? config.getInt("ditto.things-search.updater.max-bulk-size")
                        : Integer.MAX_VALUE,
                thingCacheFacade,
                policyCacheFacade));

        if (config.getBoolean("ditto.things-search.updater.sync.things.active")) {
            startClusterSingletonActor("thingsStreamSupervisor",
                    ThingsStreamSupervisorCreator.props(this.thingsUpdaterActor, pubSubMediator,
                            thingsSyncPersistence, materializer, createThingsStreamConsumerSettings(config)));
        } else {
            this.log.warning("Things synchronization is not active");
        }

        if (config.getBoolean("ditto.things-search.updater.sync.policies.active")) {
            startClusterSingletonActor("policiesStreamSupervisor",
                    PoliciesStreamSupervisorCreator.props(this.thingsUpdaterActor, pubSubMediator,
                            policiesSyncPersistence, materializer, createPoliciesStreamConsumerSettings(config),
                            searchUpdaterPersistence));
        } else {
            this.log.warning("Policies synchronization is not active");
        }
    }

    /**
     * Builds the circuit breaker from the {@code ditto.things-search.mongodb.breaker.*} settings and registers
     * log callbacks for its open and close transitions.
     *
     * @param config the configuration to read the breaker settings from.
     * @return the configured circuit breaker.
     */
    private CircuitBreaker createCircuitBreaker(Config config) {
        // NOTE(review): getSeconds() discards any sub-second part of the configured durations, so e.g. a
        // 500ms setting becomes 0 seconds - confirm whether whole-second granularity is intended.
        CircuitBreaker circuitBreaker = new CircuitBreaker(
                getContext().dispatcher(),
                getContext().system().scheduler(),
                config.getInt("ditto.things-search.mongodb.breaker.maxFailures"),
                Duration.create(
                        config.getDuration("ditto.things-search.mongodb.breaker.timeout.call").getSeconds(),
                        TimeUnit.SECONDS),
                Duration.create(
                        config.getDuration("ditto.things-search.mongodb.breaker.timeout.reset").getSeconds(),
                        TimeUnit.SECONDS));
        circuitBreaker.onOpen(() -> {
            this.log.warning("The circuit breaker for this search updater instance is open which means that all ThingUpdaters won't process any messages until the circuit breaker is closed again");
        });
        circuitBreaker.onClose(() -> {
            this.log.info("The circuit breaker for this search updater instance is closed again. Therefore all ThingUpdaters process events again");
        });
        return circuitBreaker;
    }

    /**
     * Reads the stream consumer settings below {@code ditto.things-search.updater.sync.things}.
     *
     * @param config the configuration to read from.
     * @return the settings for the things synchronization stream.
     */
    private static StreamConsumerSettings createThingsStreamConsumerSettings(Config config) {
        return StreamConsumerSettings.of(
                config.getDuration("ditto.things-search.updater.sync.things.start-offset"),
                config.getDuration("ditto.things-search.updater.sync.things.stream-interval"),
                config.getDuration("ditto.things-search.updater.sync.things.initial-start-offset"),
                config.getDuration("ditto.things-search.updater.sync.things.max-idle-time"),
                config.getDuration("ditto.things-search.updater.sync.things.streaming-actor-timeout"),
                config.getInt("ditto.things-search.updater.sync.things.elements-streamed-per-batch"),
                config.getDuration("ditto.things-search.updater.sync.things.outdated-warning-offset"));
    }

    /**
     * Reads the stream consumer settings below {@code ditto.things-search.updater.sync.policies}.
     *
     * @param config the configuration to read from.
     * @return the settings for the policies synchronization stream.
     */
    private static StreamConsumerSettings createPoliciesStreamConsumerSettings(Config config) {
        return StreamConsumerSettings.of(
                config.getDuration("ditto.things-search.updater.sync.policies.start-offset"),
                config.getDuration("ditto.things-search.updater.sync.policies.stream-interval"),
                config.getDuration("ditto.things-search.updater.sync.policies.initial-start-offset"),
                config.getDuration("ditto.things-search.updater.sync.policies.max-idle-time"),
                config.getDuration("ditto.things-search.updater.sync.policies.streaming-actor-timeout"),
                config.getInt("ditto.things-search.updater.sync.policies.elements-streamed-per-batch"),
                config.getDuration("ditto.things-search.updater.sync.policies.outdated-warning-offset"));
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor.
     *
     * @param config the configuration all {@code ditto.things-search.*} settings are read from.
     * @param actorRef the actor this root actor registers itself with via {@link DistributedPubSubMediator.Put};
     * it is also handed to the stream supervisors.
     * @param actorMaterializer the materializer handed to the stream supervisors.
     * @param streamMetadataPersistence stream metadata persistence used for the things synchronization.
     * @param streamMetadataPersistence2 stream metadata persistence used for the policies synchronization.
     * @return the Props.
     */
    public static Props props(final Config config, final ActorRef actorRef, final ActorMaterializer actorMaterializer, final StreamMetadataPersistence streamMetadataPersistence, final StreamMetadataPersistence streamMetadataPersistence2) {
        return Props.create(SearchUpdaterRootActor.class, new Creator<SearchUpdaterRootActor>() {
            private static final long serialVersionUID = 1;

            @Override
            public SearchUpdaterRootActor create() {
                return new SearchUpdaterRootActor(config, actorRef, actorMaterializer, streamMetadataPersistence, streamMetadataPersistence2);
            }
        });
    }

    /**
     * Returns the supervisor strategy created by {@link RootSupervisorStrategyFactory} in the constructor.
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return this.supervisorStrategy;
    }

    /**
     * Forwards shard region state requests to the things updater, logs {@link Status.Failure} messages as errors
     * and logs every other message as a warning before marking it unhandled.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState -> {
                    this.thingsUpdaterActor.forward(getShardRegionState, getContext());
                })
                .match(Status.Failure.class, failure -> {
                    this.log.error(failure.cause(), "Got failure: {}", failure);
                })
                .matchAny(message -> {
                    this.log.warning("Unknown message: {}", message);
                    unhandled(message);
                })
                .build();
    }

    /**
     * Logs and starts a child actor of this actor.
     *
     * @param actorName the name of the child actor.
     * @param props the Props of the child actor.
     * @return the reference of the started child.
     */
    private ActorRef startChildActor(String actorName, Props props) {
        this.log.info("Starting child actor '{}'", actorName);
        return getContext().actorOf(props, actorName);
    }

    /**
     * Starts a {@link ClusterSingletonManager} child for the given Props, using {@link PoisonPill} as
     * termination message and restricting the singleton to the cluster role {@code things-search}.
     *
     * @param actorName the name of the singleton manager child.
     * @param props the Props of the singleton actor.
     */
    private void startClusterSingletonActor(String actorName, Props props) {
        ClusterSingletonManagerSettings settings =
                ClusterSingletonManagerSettings.create(getContext().system()).withRole("things-search");
        getContext().actorOf(ClusterSingletonManager.props(props, PoisonPill.getInstance(), settings), actorName);
    }
}
