package org.eclipse.ditto.services.connectivity.actors;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.Cluster;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.event.DiagnosticLoggingAdapter;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.Route;
import akka.japi.pf.DeciderBuilder;
import akka.stream.ActorMaterializer;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import javax.jms.JMSRuntimeException;
import javax.naming.NamingException;
import org.eclipse.ditto.services.base.actors.DittoRootActor;
import org.eclipse.ditto.services.base.config.http.HttpConfig;
import org.eclipse.ditto.services.connectivity.messaging.ConnectivityProxyActor;
import org.eclipse.ditto.services.connectivity.messaging.DefaultClientActorPropsFactory;
import org.eclipse.ditto.services.connectivity.messaging.ReconnectActor;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectivityConfig;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.services.models.concierge.actors.ConciergeEnforcerClusterRouterFactory;
import org.eclipse.ditto.services.models.concierge.actors.ConciergeForwarderActor;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
import org.eclipse.ditto.services.utils.cluster.ClusterUtil;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.services.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.services.utils.config.LocalHostAddressSupplier;
import org.eclipse.ditto.services.utils.health.DefaultHealthCheckingActorFactory;
import org.eclipse.ditto.services.utils.health.HealthCheckingActorOptions;
import org.eclipse.ditto.services.utils.health.config.HealthCheckConfig;
import org.eclipse.ditto.services.utils.health.config.MetricsReporterConfig;
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoHealthChecker;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoMetricsReporter;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import scala.PartialFunction;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/actors/ConnectivityRootActor.class */
public final class ConnectivityRootActor extends DittoRootActor {
    public static final String ACTOR_NAME = "connectivityRoot";
    private static final String CLUSTER_ROLE = "connectivity";
    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

    private ConnectivityRootActor(ConnectivityConfig connectivityConfig, ActorRef actorRef, ActorMaterializer actorMaterializer, UnaryOperator<Signal<?>> unaryOperator, @Nullable ConnectivityCommandInterceptor connectivityCommandInterceptor) {
        ClusterConfig clusterConfig = connectivityConfig.getClusterConfig();
        ActorSystem system = getContext().system();
        Props connectionSupervisorProps = getConnectionSupervisorProps(DittoProtocolSub.of(getContext()), startChildActor("connectivityProxyActor", ConnectivityProxyActor.props(getConciergeForwarder(clusterConfig, actorRef, unaryOperator))), connectivityCommandInterceptor, actorRef);
        actorRef.tell(DistPubSubAccess.put(startChildActor("persistenceStreamingActor", ConnectionPersistenceStreamingActorCreator.props(0))), getSelf());
        startClusterSingletonActor(ReconnectActor.props(getConnectionShardRegion(system, connectionSupervisorProps, clusterConfig), MongoReadJournal.newInstance(system)), "reconnect");
        startChildActor("connectionOps", ConnectionPersistenceOperationsActor.props(actorRef, connectivityConfig.getMongoDbConfig(), system.settings().config(), connectivityConfig.getPersistenceOperationsConfig()));
        getHttpBinding(connectivityConfig.getHttpConfig(), system, actorMaterializer, getHealthCheckingActor(connectivityConfig, actorRef)).thenAccept(serverBinding -> {
            CoordinatedShutdown.get(system).addTask(CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
                this.log.info("Gracefully shutting down status/health HTTP endpoint ...");
                return serverBinding.terminate(Duration.ofSeconds(1L)).handle((httpTerminated, th) -> {
                    return Done.getInstance();
                });
            });
        }).exceptionally(th -> {
            this.log.error("Something very bad happened! " + th.getMessage(), th);
            system.terminate();
            return null;
        });
    }

    public static Props props(ConnectivityConfig connectivityConfig, ActorRef actorRef, ActorMaterializer actorMaterializer, UnaryOperator<Signal<?>> unaryOperator, ConnectivityCommandInterceptor connectivityCommandInterceptor) {
        return Props.create(ConnectivityRootActor.class, new Object[]{connectivityConfig, actorRef, actorMaterializer, unaryOperator, connectivityCommandInterceptor});
    }

    public static Props props(ConnectivityConfig connectivityConfig, ActorRef actorRef, ActorMaterializer actorMaterializer, UnaryOperator<Signal<?>> unaryOperator) {
        return Props.create(ConnectivityRootActor.class, new Object[]{connectivityConfig, actorRef, actorMaterializer, unaryOperator, null});
    }

    protected PartialFunction<Throwable, SupervisorStrategy.Directive> getSupervisionDecider() {
        return DeciderBuilder.match(JMSRuntimeException.class, jMSRuntimeException -> {
            this.log.warning("JMSRuntimeException '{}' occurred.", jMSRuntimeException.getMessage());
            return restartChild();
        }).match(NamingException.class, namingException -> {
            this.log.warning("NamingException '{}' occurred.", namingException.getMessage());
            return restartChild();
        }).build().orElse(super.getSupervisionDecider());
    }

    private void startClusterSingletonActor(Props props, String str) {
        ClusterUtil.startSingleton(getContext(), CLUSTER_ROLE, str, props);
    }

    private static Route createRoute(ActorSystem actorSystem, ActorRef actorRef) {
        StatusRoute statusRoute = new StatusRoute(new ClusterStatusSupplier(Cluster.get(actorSystem)), actorRef, actorSystem);
        return Directives.logRequest("http-request", () -> {
            Objects.requireNonNull(statusRoute);
            return Directives.logResult("http-response", statusRoute::buildStatusRoute);
        });
    }

    private ActorRef getHealthCheckingActor(ConnectivityConfig connectivityConfig, ActorRef actorRef) {
        HealthCheckConfig healthCheckConfig = connectivityConfig.getHealthCheckConfig();
        HealthCheckingActorOptions.Builder builder = HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval());
        if (healthCheckConfig.getPersistenceConfig().isEnabled()) {
            builder.enablePersistenceCheck();
        }
        HealthCheckingActorOptions build = builder.build();
        MetricsReporterConfig metricsReporterConfig = healthCheckConfig.getPersistenceConfig().getMetricsReporterConfig();
        return startChildActor("healthCheckingActor", DefaultHealthCheckingActorFactory.props(build, MongoHealthChecker.props(), new Props[]{MongoMetricsReporter.props(metricsReporterConfig.getResolution(), metricsReporterConfig.getHistory(), actorRef)}));
    }

    private ActorRef getConciergeForwarder(ClusterConfig clusterConfig, ActorRef actorRef, UnaryOperator<Signal<?>> unaryOperator) {
        return startChildActor("conciergeForwarder", ConciergeForwarderActor.props(actorRef, ConciergeEnforcerClusterRouterFactory.createConciergeEnforcerClusterRouter(getContext(), clusterConfig.getNumberOfShards()), unaryOperator));
    }

    private static Props getConnectionSupervisorProps(DittoProtocolSub dittoProtocolSub, ActorRef actorRef, @Nullable ConnectivityCommandInterceptor connectivityCommandInterceptor, ActorRef actorRef2) {
        return ConnectionSupervisorActor.props(dittoProtocolSub, actorRef, DefaultClientActorPropsFactory.getInstance(), connectivityCommandInterceptor, actorRef2);
    }

    private static ActorRef getConnectionShardRegion(ActorSystem actorSystem, Props props, ClusterConfig clusterConfig) {
        return ClusterSharding.get(actorSystem).start("connection", props, ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE), ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem));
    }

    private CompletionStage<ServerBinding> getHttpBinding(HttpConfig httpConfig, ActorSystem actorSystem, ActorMaterializer actorMaterializer, ActorRef actorRef) {
        String hostname = httpConfig.getHostname();
        if (hostname.isEmpty()) {
            hostname = LocalHostAddressSupplier.getInstance().get();
            this.log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
        }
        return Http.get(actorSystem).bindAndHandle(createRoute(actorSystem, actorRef).flow(actorSystem, actorMaterializer), ConnectHttp.toHost(hostname, httpConfig.getPort()), actorMaterializer);
    }
}
