package org.eclipse.ditto.services.utils.cluster;

import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.OnComplete;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/services/utils/cluster/ClusterUtil.class */
public final class ClusterUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterUtil.class);
    private static final String CONFIG_CLUSTER_BECOME_LEADER = "ditto.cluster.become-leader";
    private static final int SOCKET_CONNECT_TIMEOUT = 500;

    private ClusterUtil() {
        throw new AssertionError();
    }

    public static void joinCluster(ActorSystem actorSystem, Config config) {
        boolean z;
        String name = actorSystem.name();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        String localHostAddress = ConfigUtil.getLocalHostAddress();
        LOGGER.info("Determined my own local address: '{}'", localHostAddress);
        if (ConfigUtil.shouldManuallyJoinClusterSeedNodes(config)) {
            LOGGER.info("Manually joining cluster seed-nodes..");
            try {
                z = config.getBoolean(CONFIG_CLUSTER_BECOME_LEADER);
            } catch (ConfigException.Missing e) {
                z = false;
            }
            List clusterSeedNodesExceptOwn = ConfigUtil.getClusterSeedNodesExceptOwn(name, config);
            if (z) {
                LOGGER.info("Becoming leader, checking if there are other hosts in cluster '{}'", name);
                clusterSeedNodesExceptOwn = (List) ((Stream) clusterSeedNodesExceptOwn.stream().parallel()).filter(address -> {
                    boolean z2;
                    String str = (String) address.host().get();
                    int parseInt = Integer.parseInt(address.port().get().toString());
                    try {
                        Socket socket = new Socket();
                        socket.connect(new InetSocketAddress(str, parseInt), SOCKET_CONNECT_TIMEOUT);
                        z2 = true;
                        socket.close();
                    } catch (IOException e2) {
                        z2 = false;
                    }
                    return z2;
                }).collect(Collectors.toList());
            }
            Address buildAddress = ConfigUtil.buildAddress(name, localHostAddress);
            if (z && clusterSeedNodesExceptOwn.isEmpty()) {
                clusterSeedNodesExceptOwn.add(buildAddress);
                LOGGER.info("Creating new cluster with myself ({}) as leader: {}", localHostAddress, clusterSeedNodesExceptOwn);
            } else {
                LOGGER.info("Joining cluster seed nodes: {}", clusterSeedNodesExceptOwn);
            }
            Cluster cluster = Cluster.get(actorSystem);
            cluster.joinSeedNodes(clusterSeedNodesExceptOwn);
            CompletableFuture completableFuture = new CompletableFuture();
            cluster.registerOnMemberRemoved(() -> {
                LOGGER.info("Shutting down the ActorSystem ..");
                actorSystem.terminate().onComplete(new OnComplete<Terminated>() { // from class: org.eclipse.ditto.services.utils.cluster.ClusterUtil.1
                    public void onComplete(Throwable th, Terminated terminated) throws Throwable {
                        if (terminated != null) {
                            completableFuture.complete(terminated);
                        } else {
                            completableFuture.completeExceptionally(th);
                        }
                    }
                }, ExecutionContexts.fromExecutorService(Executors.newSingleThreadExecutor()));
            });
            Runtime.getRuntime().addShutdownHook(gracefullyLeaveClusterShutdownHook(buildAddress, cluster, atomicBoolean, completableFuture));
        }
        actorSystem.registerOnTermination(() -> {
            if (atomicBoolean.get()) {
                LOGGER.info("ActorSystem has shutdown gracefully");
            } else {
                LOGGER.warn("ActorSystem was shutdown NOT gracefully - exiting JVM with status code '-1'");
                System.exit(-1);
            }
        });
    }

    private static Thread gracefullyLeaveClusterShutdownHook(Address address, Cluster cluster, AtomicBoolean atomicBoolean, CompletableFuture<Terminated> completableFuture) {
        return new Thread(() -> {
            atomicBoolean.set(true);
            LOGGER.info("Shutdown issued from outside (e.g. SIGTERM) - gracefully shutting down..");
            LOGGER.info("Leaving the cluster - my address: {}", address);
            cluster.leave(address);
            try {
                completableFuture.get(8L, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOGGER.error("System termination was interrupted: {}", e.getMessage(), e);
            }
        });
    }
}
