package org.eclipse.ditto.internal.utils.pubsub.actors;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Actor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.Timers;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ClusterEvent;
import org.apache.pekko.cluster.Member;
import org.apache.pekko.cluster.MemberStatus;
import org.apache.pekko.cluster.ddata.ORMultiMap;
import org.apache.pekko.cluster.ddata.Replicator;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/ClusterStateSyncBehavior.class */
/**
 * Mixin for actors that periodically compare the addresses found in distributed data (DData) with the
 * addresses of the current cluster members and repair the differences.
 * <p>
 * A sync round is triggered by the timer message {@link Control#SYNC_CLUSTER_STATE}. The local DData shards are
 * read asynchronously and the outcome is piped back to the actor as either a {@link SyncResult} or a
 * {@link SyncError}; both are handled by the behavior returned from {@link #getClusterStateSyncBehavior()}.
 *
 * @param <T> type of the keys of the DData maps; each key is converted into an {@link Address} by
 * {@link #toAddress(Object)}.
 */
interface ClusterStateSyncBehavior<T> extends Actor, Timers {

    /**
     * Control messages of this behavior.
     */
    public enum Control {
        /**
         * Timer message (also used as timer key) that starts one sync round.
         */
        SYNC_CLUSTER_STATE
    }

    /**
     * Message sent to self when reading or checking the DData failed.
     */
    public static final class SyncError {
        private final Throwable error;

        private SyncError(Throwable th) {
            this.error = th;
        }
    }

    /**
     * Message sent to self carrying the outcome of one comparison between DData and cluster state.
     */
    public static final class SyncResult {
        // true if neither the cluster self address nor the actor's path address was found in DData
        private final boolean myAddressMissing;
        // addresses present in DData that do not belong to any member staying in the cluster
        private final Set<Address> staleAddresses;

        private SyncResult(boolean z, Set<Address> set) {
            this.myAddressMissing = z;
            this.staleAddresses = set;
        }

        /**
         * @return whether nothing needs to be repaired: own address is present and there are no stale addresses.
         */
        private boolean isInSync() {
            return !this.myAddressMissing && this.staleAddresses.isEmpty();
        }

        @Override
        public String toString() {
            return getClass().getSimpleName()
                    + " [myAddressMissing=" + this.myAddressMissing
                    + ", staleAddresses=" + this.staleAddresses
                    + "]";
        }
    }

    /**
     * @return the cluster whose state, self member and self address are consulted during a sync.
     */
    Cluster getCluster();

    /**
     * Convert a key of the DData maps into the address it stands for.
     *
     * @param t the DData key.
     * @return the address derived from the key.
     */
    Address toAddress(T t);

    /**
     * @return the logger used for all messages of this behavior.
     */
    ThreadSafeDittoLoggingAdapter log();

    /**
     * @return the distributed data whose reader is queried for all shards and whose writer removes stale addresses.
     */
    DData<T, ?, ?> getDData();

    /**
     * Hook invoked by {@link #handleSyncResult(SyncResult)} when a sync found no DData entry for the address of
     * the current member.
     */
    void verifyNoDDataForCurrentMember();

    /**
     * Start the periodic timer that triggers cluster state syncs.
     * <p>
     * The delay between syncs is the configured sync interval plus a random extra between zero (inclusive) and
     * one sync interval (exclusive), chosen once per call.
     *
     * @param pubSubConfig the config providing the minimum sync interval.
     */
    default void scheduleClusterStateSync(PubSubConfig pubSubConfig) {
        Duration syncInterval = pubSubConfig.getSyncInterval();
        // NOTE(review): the random extra presumably keeps cluster members from syncing in lock-step - confirm.
        long randomExtraMillis = (long) (Math.random() * syncInterval.toMillis());
        Duration randomizedSyncInterval = syncInterval.plus(Duration.ofMillis(randomExtraMillis));
        log().info("Scheduling cluster state sync at <{}> interval (min=<{}>)", randomizedSyncInterval, syncInterval);
        // the control message doubles as the timer key
        timers().startTimerWithFixedDelay(Control.SYNC_CLUSTER_STATE, Control.SYNC_CLUSTER_STATE,
                randomizedSyncInterval);
    }

    /**
     * Create the partial behavior handling the sync trigger, sync results and sync errors.
     *
     * @return the receive object to be combined into the actor's behavior.
     */
    default AbstractActor.Receive getClusterStateSyncBehavior() {
        return ReceiveBuilder.create()
                .matchEquals(Control.SYNC_CLUSTER_STATE, this::syncClusterState)
                .match(SyncResult.class, this::handleSyncResult)
                .match(SyncError.class, this::handleSyncError)
                .build();
    }

    /**
     * Read all DData shards with local read consistency, compare them against the cluster state and pipe the
     * outcome to self as a {@link SyncResult}, or as a {@link SyncError} if no result was produced.
     *
     * @param control the trigger message; not inspected.
     */
    default void syncClusterState(Control control) {
        log().info("Start to sync cluster state");
        // captured up front so that the asynchronous callback does not have to call self() itself
        ActorRef self = self();
        Patterns.pipe(
                getDData().getReader()
                        .getAllShards(Replicator.readLocal())
                        .thenApply(list -> checkClusterState(list, self))
                        .handle((syncResult, th) -> syncResult != null ? syncResult : new SyncError(th)),
                context().dispatcher()
        ).to(self);
    }

    /**
     * Log a failed sync attempt at error level together with its cause.
     *
     * @param syncError the error message.
     */
    default void handleSyncError(SyncError syncError) {
        log().error(syncError.error, "Failed to sync cluster state");
    }

    /**
     * React to the outcome of a sync: do nothing but log if everything is in sync; otherwise invoke
     * {@link #verifyNoDDataForCurrentMember()} if the own address is missing, and remove stale addresses if any.
     *
     * @param syncResult the outcome of the comparison.
     */
    default void handleSyncResult(SyncResult syncResult) {
        if (syncResult.isInSync()) {
            log().info("DData is in sync with cluster state");
            return;
        }
        log().info("Sync result: <{}>", syncResult);
        if (syncResult.myAddressMissing) {
            log().info("Checking missing info of current member <{}>", getCluster().selfMember());
            verifyNoDDataForCurrentMember();
        }
        if (!syncResult.staleAddresses.isEmpty()) {
            log().warning("Removing stale addresses <{}>", syncResult.staleAddresses);
            removeStaleAddresses(syncResult.staleAddresses);
        }
    }

    /**
     * Ask the DData writer to remove each given address, using the write consistency of {@link #writeLocal()}.
     * The values returned by the writer are not inspected.
     *
     * @param set the addresses to remove.
     */
    default void removeStaleAddresses(Set<Address> set) {
        DDataWriter<T, ?> writer = getDData().getWriter();
        for (Address staleAddress : set) {
            writer.removeAddress(staleAddress, writeLocal());
        }
    }

    /**
     * Compare the addresses in the given DData shards with the addresses of the cluster members.
     * <p>
     * If the current member is not staying in the cluster, the check is skipped and an in-sync result is
     * returned.
     *
     * @param list the DData shards.
     * @param actorRef reference of this actor; the address of its path counts as a cluster member address and
     * as a valid representation of the own address in DData.
     * @return the result stating whether the own address is missing and which DData addresses are stale.
     */
    default SyncResult checkClusterState(List<? extends ORMultiMap<T, ?>> list, ActorRef actorRef) {
        Set<Address> clusterMemberAddresses = getClusterMemberAddresses(getCluster().state(), actorRef);
        Set<Address> dDataAddresses = getDDataAddresses(list);
        if (!isMemberStayingInCluster(getCluster().selfMember())) {
            log().info("This member is leaving the cluster. Skipping sync.");
            return new SyncResult(false, Set.of());
        }
        // the own entry may be recorded under the cluster self address or under the actor's path address
        boolean myAddressMissing = !dDataAddresses.contains(getCluster().selfAddress())
                && !dDataAddresses.contains(actorRef.path().address());
        Set<Address> staleAddresses = dDataAddresses.stream()
                .filter(address -> !clusterMemberAddresses.contains(address))
                .collect(Collectors.toSet());
        return new SyncResult(myAddressMissing, staleAddresses);
    }

    /**
     * Collect the addresses of all keys of all given DData shards.
     *
     * @param list the DData shards.
     * @return the set of addresses obtained by applying {@link #toAddress(Object)} to every key.
     */
    default Set<Address> getDDataAddresses(List<? extends ORMultiMap<T, ?>> list) {
        return list.stream()
                .flatMap(oRMultiMap -> oRMultiMap.getEntries().keySet().stream())
                .map(this::toAddress)
                .collect(Collectors.toSet());
    }

    /**
     * @return the write consistency used when removing stale addresses; {@code Replicator.writeLocal()} unless
     * overridden.
     */
    default Replicator.WriteConsistency writeLocal() {
        return Replicator.writeLocal();
    }

    /**
     * Check whether a member is in a status other than leaving, exiting, down or removed.
     *
     * @param member the cluster member.
     * @return {@code false} if the member's status is one of the four statuses above, {@code true} otherwise.
     */
    static boolean isMemberStayingInCluster(Member member) {
        MemberStatus status = member.status();
        // identity comparison kept as is; presumably each MemberStatus value is a singleton - confirm.
        return status != MemberStatus.leaving()
                && status != MemberStatus.exiting()
                && status != MemberStatus.down()
                && status != MemberStatus.removed();
    }

    /**
     * Collect the addresses of all members staying in the cluster, plus the address of the given actor's path.
     *
     * @param currentClusterState the cluster state providing the members.
     * @param actorRef the actor whose path address is always included.
     * @return the set of addresses considered to belong to the cluster.
     */
    static Set<Address> getClusterMemberAddresses(ClusterEvent.CurrentClusterState currentClusterState,
            ActorRef actorRef) {
        Stream<Address> stayingMemberAddresses =
                StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
                        .filter(ClusterStateSyncBehavior::isMemberStayingInCluster)
                        .map(Member::address);
        return Stream.concat(stayingMemberAddresses, Stream.of(actorRef.path().address()))
                .collect(Collectors.toSet());
    }
}
