package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.ddata.Replicator;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater;
import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.Subscriptions;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate;
import scala.collection.immutable.Set;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/AcksUpdater.class */
public final class AcksUpdater extends AbstractUpdater<LiteralUpdate, Map<ActorRef, Set<String>>> implements ClusterMemberRemovedAware {
    public static final String ACTOR_NAME_PREFIX = "acksUpdater";
    private final DData<String, LiteralUpdate> acksDData;

    @Nullable
    private Map<ActorRef, Set<String>> mmap;

    private AcksUpdater(PubSubConfig pubSubConfig, ActorRef actorRef, DData<String, LiteralUpdate> dData) {
        super(ACTOR_NAME_PREFIX, pubSubConfig, actorRef, dData.createSubscriptions(), dData.getWriter());
        this.acksDData = dData;
        subscribeForClusterMemberRemovedAware();
        dData.getReader().receiveChanges(getSelf());
    }

    public static Props props(PubSubConfig pubSubConfig, ActorRef actorRef, DData<String, LiteralUpdate> dData) {
        return Props.create(AcksUpdater.class, new Object[]{pubSubConfig, actorRef, dData});
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Replicator.Changed.class, this::onChanged).build().orElse(receiveClusterMemberRemoved()).orElse(super.createReceive());
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.ClusterMemberRemovedAware
    public LoggingAdapter log() {
        return this.log;
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.ClusterMemberRemovedAware
    public DDataWriter<?> getDDataWriter() {
        return this.acksDData.getWriter();
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater
    protected void subscribe(AbstractUpdater.Subscribe subscribe) {
        ActorRef sender = getSender();
        if (areAckLabelsDeclaredHere(subscribe) || areTopicsClaimedRemotely(subscribe)) {
            failSubscribe(sender);
            return;
        }
        if (this.subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics())) {
            getContext().watch(subscribe.getSubscriber());
        }
        getSender().tell(AbstractUpdater.SubAck.of(subscribe, sender, 0), getSelf());
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater
    protected void unsubscribe(AbstractUpdater.Unsubscribe unsubscribe) {
        this.log.warning("Got unexpected <{}>", unsubscribe);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater
    protected void tick(AbstractUpdater.Clock clock) {
        writeLocalDData();
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater
    protected void ddataOpSuccess(AbstractUpdater.DDataOpSuccess<Map<ActorRef, Set<String>>> dDataOpSuccess) {
        this.log.warning("Unexpected DDataOpSuccess: sn=<{}> wc=<{}> payload=<{}>", Integer.valueOf(dDataOpSuccess.seqNr), dDataOpSuccess.writeConsistency, dDataOpSuccess.payload);
    }

    private void onChanged(Replicator.Changed<?> changed) {
        this.mmap = CollectionConverters.asJava(changed.get(this.acksDData.getReader().getKey()).entries());
        java.util.Set<ActorRef> localLosers = getLocalLosers(this.mmap);
        localLosers.forEach(this::failSubscribe);
        Subscriptions<T> subscriptions = this.subscriptions;
        Objects.requireNonNull(subscriptions);
        localLosers.forEach(subscriptions::removeSubscriber);
    }

    private void writeLocalDData() {
        this.acksDData.getWriter().put(this.subscriber, (LiteralUpdate) this.subscriptions.export(true), Replicator.writeLocal()).whenComplete((r5, th) -> {
            if (th != null) {
                this.log.error(th, "Failed to update local DData");
            }
        });
    }

    private void failSubscribe(ActorRef actorRef) {
        actorRef.tell(AcknowledgementLabelNotUniqueException.getInstance(), getSelf());
    }

    private boolean areAckLabelsDeclaredHere(AbstractUpdater.Subscribe subscribe) {
        Stream<String> stream = subscribe.getTopics().stream();
        Subscriptions<T> subscriptions = this.subscriptions;
        Objects.requireNonNull(subscriptions);
        return stream.flatMap(subscriptions::streamSubscribers).anyMatch(actorRef -> {
            return !actorRef.equals(subscribe.getSubscriber());
        });
    }

    private java.util.Set<ActorRef> getLocalLosers(Map<ActorRef, Set<String>> map) {
        return (java.util.Set) map.entrySet().stream().filter(entry -> {
            return isSmallerThanMySubscriber((ActorRef) entry.getKey());
        }).flatMap(entry2 -> {
            Stream stream = CollectionConverters.asJava((scala.collection.Set) entry2.getValue()).stream();
            Subscriptions<T> subscriptions = this.subscriptions;
            Objects.requireNonNull(subscriptions);
            return stream.flatMap(subscriptions::streamSubscribers);
        }).collect(Collectors.toSet());
    }

    private boolean areTopicsClaimedRemotely(AbstractUpdater.Subscribe subscribe) {
        if (this.mmap != null) {
            return this.mmap.entrySet().stream().anyMatch(entry -> {
                if (!this.subscriber.equals(entry.getKey())) {
                    Stream<String> stream = subscribe.getTopics().stream();
                    Set set = (Set) entry.getValue();
                    Objects.requireNonNull(set);
                    if (stream.anyMatch((v1) -> {
                        return r1.contains(v1);
                    })) {
                        return true;
                    }
                }
                return false;
            });
        }
        return false;
    }

    private boolean isSmallerThanMySubscriber(ActorRef actorRef) {
        return actorRef.compareTo(this.subscriber) < 0;
    }
}
