package io.atomix.cluster.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMetadataService;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.impl.DefaultClusterMetadataService;
import io.atomix.messaging.Endpoint;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/impl/DefaultClusterService.class */
public class DefaultClusterService extends AbstractListenerManager<ClusterEvent, ClusterEventListener> implements ManagedClusterService {
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
    private static final long DEFAULT_FAILURE_TIME = 1000;
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";
    private final MessagingService messagingService;
    private final ClusterMetadataService metadataService;
    private final StatefulNode localNode;
    private ScheduledFuture<?> heartbeatFuture;
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterService.class);
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder().register(KryoNamespaces.BASIC).nextId(500).register(new Class[]{NodeId.class}).register(new Class[]{Node.Type.class}).register(new Class[]{Node.State.class}).register(new Class[]{ClusterHeartbeat.class}).register(new Class[]{StatefulNode.class}).register(new DefaultClusterMetadataService.EndpointSerializer(), new Class[]{Endpoint.class}).build("ClusterService"));
    private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
    private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
    private final AtomicBoolean started = new AtomicBoolean();
    private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap();
    private final Map<NodeId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private final ClusterMetadataEventListener metadataEventListener = this::handleMetadataEvent;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-cluster-heartbeat-sender", LOGGER));
    private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads("atomix-cluster-heartbeat-receiver", LOGGER));

    public DefaultClusterService(Node node, ClusterMetadataService clusterMetadataService, MessagingService messagingService) {
        this.metadataService = (ClusterMetadataService) Preconditions.checkNotNull(clusterMetadataService, "metadataService cannot be null");
        this.messagingService = (MessagingService) Preconditions.checkNotNull(messagingService, "messagingService cannot be null");
        this.localNode = new StatefulNode(node.id(), node.type(), node.endpoint());
    }

    @Override // io.atomix.cluster.ClusterService
    public Node getLocalNode() {
        return this.localNode;
    }

    @Override // io.atomix.cluster.ClusterService
    public Set<Node> getNodes() {
        return ImmutableSet.copyOf((Collection) this.nodes.values().stream().filter(statefulNode -> {
            return statefulNode.type() == Node.Type.DATA || statefulNode.getState() == Node.State.ACTIVE;
        }).collect(Collectors.toList()));
    }

    @Override // io.atomix.cluster.ClusterService
    public Node getNode(NodeId nodeId) {
        StatefulNode statefulNode = this.nodes.get(nodeId);
        if (statefulNode == null) {
            return null;
        }
        if (statefulNode.type() == Node.Type.DATA || statefulNode.getState() == Node.State.ACTIVE) {
            return statefulNode;
        }
        return null;
    }

    private void sendHeartbeats() {
        try {
            Set set = (Set) this.nodes.values().stream().filter(statefulNode -> {
                return !statefulNode.id().equals(getLocalNode().id());
            }).collect(Collectors.toSet());
            byte[] encode = SERIALIZER.encode(new ClusterHeartbeat(this.localNode.id(), this.localNode.type()));
            set.forEach(statefulNode2 -> {
                sendHeartbeat(statefulNode2.endpoint(), encode);
                PhiAccrualFailureDetector computeIfAbsent = this.failureDetectors.computeIfAbsent(statefulNode2.id(), nodeId -> {
                    return new PhiAccrualFailureDetector();
                });
                if (computeIfAbsent.phi() >= this.phiFailureThreshold || System.currentTimeMillis() - computeIfAbsent.lastUpdated() > DEFAULT_FAILURE_TIME) {
                    if (statefulNode2.getState() == Node.State.ACTIVE) {
                        deactivateNode(statefulNode2);
                    }
                } else if (statefulNode2.getState() == Node.State.INACTIVE) {
                    activateNode(statefulNode2);
                }
            });
        } catch (Exception e) {
            LOGGER.debug("Failed to send heartbeat", e);
        }
    }

    private void sendHeartbeat(Endpoint endpoint, byte[] bArr) {
        this.messagingService.sendAndReceive(endpoint, HEARTBEAT_MESSAGE, bArr).whenComplete((bArr2, th) -> {
            if (th != null) {
                LOGGER.trace("Sending heartbeat to {} failed", endpoint, th);
                return;
            }
            boolean z = false;
            for (StatefulNode statefulNode : (Collection) SERIALIZER.decode(bArr2)) {
                if (this.nodes.putIfAbsent(statefulNode.id(), statefulNode) == null) {
                    post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, statefulNode));
                    post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, statefulNode));
                    z = true;
                }
            }
            if (z) {
                sendHeartbeats();
            }
        });
    }

    private byte[] handleHeartbeat(Endpoint endpoint, byte[] bArr) {
        ClusterHeartbeat clusterHeartbeat = (ClusterHeartbeat) SERIALIZER.decode(bArr);
        this.failureDetectors.computeIfAbsent(clusterHeartbeat.nodeId(), nodeId -> {
            return new PhiAccrualFailureDetector();
        }).report();
        activateNode(new StatefulNode(clusterHeartbeat.nodeId(), clusterHeartbeat.nodeType(), endpoint));
        return SERIALIZER.encode(this.nodes.values().stream().filter(statefulNode -> {
            return statefulNode.type() == Node.Type.CLIENT;
        }).collect(Collectors.toList()));
    }

    private void activateNode(StatefulNode statefulNode) {
        StatefulNode statefulNode2 = this.nodes.get(statefulNode.id());
        if (statefulNode2 != null) {
            if (statefulNode2.getState() == Node.State.INACTIVE) {
                statefulNode2.setState(Node.State.ACTIVE);
                post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, statefulNode2));
                return;
            }
            return;
        }
        statefulNode.setState(Node.State.ACTIVE);
        this.nodes.put(statefulNode.id(), statefulNode);
        post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, statefulNode));
        post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, statefulNode));
        sendHeartbeat(statefulNode.endpoint(), SERIALIZER.encode(new ClusterHeartbeat(this.localNode.id(), this.localNode.type())));
    }

    private void deactivateNode(StatefulNode statefulNode) {
        StatefulNode statefulNode2 = this.nodes.get(statefulNode.id());
        if (statefulNode2 == null || statefulNode2.getState() != Node.State.ACTIVE) {
            return;
        }
        statefulNode2.setState(Node.State.INACTIVE);
        switch (statefulNode2.type()) {
            case DATA:
                post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, statefulNode2));
                return;
            case CLIENT:
                post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, statefulNode2));
                post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, statefulNode2));
                return;
            default:
                throw new AssertionError();
        }
    }

    private void handleMetadataEvent(ClusterMetadataEvent clusterMetadataEvent) {
        Iterator it = Sets.difference((Set) this.nodes.entrySet().stream().filter(entry -> {
            return ((StatefulNode) entry.getValue()).type() == Node.Type.DATA;
        }).map(entry2 -> {
            return (NodeId) entry2.getKey();
        }).collect(Collectors.toSet()), (Set) ((ClusterMetadata) clusterMetadataEvent.subject()).bootstrapNodes().stream().map(node -> {
            if (this.nodes.get(node.id()) == null) {
                StatefulNode statefulNode = new StatefulNode(node.id(), node.type(), node.endpoint());
                this.nodes.put(statefulNode.id(), statefulNode);
                post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, statefulNode));
            }
            return node.id();
        }).collect(Collectors.toSet())).iterator();
        while (it.hasNext()) {
            StatefulNode remove = this.nodes.remove((NodeId) it.next());
            if (remove != null) {
                post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, remove));
            }
        }
    }

    public CompletableFuture<ClusterService> start() {
        if (this.started.compareAndSet(false, true)) {
            this.metadataService.addListener(this.metadataEventListener);
            this.localNode.setState(Node.State.ACTIVE);
            this.nodes.put(this.localNode.id(), this.localNode);
            this.metadataService.getMetadata().bootstrapNodes().forEach(node -> {
                this.nodes.putIfAbsent(node.id(), new StatefulNode(node.id(), node.type(), node.endpoint()));
            });
            this.messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, this.heartbeatExecutor);
            this.heartbeatFuture = this.heartbeatScheduler.scheduleWithFixedDelay(this::sendHeartbeats, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
            LOGGER.info("Started");
        }
        return CompletableFuture.completedFuture(this);
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.heartbeatScheduler.shutdownNow();
            this.heartbeatExecutor.shutdownNow();
            this.localNode.setState(Node.State.INACTIVE);
            this.nodes.clear();
            this.heartbeatFuture.cancel(true);
            this.messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
            this.metadataService.removeListener(this.metadataEventListener);
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }
}
