package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.ddata.Replicator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater;
import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.Subscriptions;
import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriptionsReader;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.class */
/**
 * Updater that applies subscribe/unsubscribe requests to the local {@code subscriptions} and, on each clock
 * tick, writes them out through the inherited {@code topicsWriter}.
 *
 * @param <T> type of the value produced by {@code subscriptions.export(...)} and handed to
 * {@code topicsWriter.put(...)}.
 */
public final class SubUpdater<T> extends AbstractUpdater<T, SubscriptionsReader> {

    /** Name prefix passed to the {@link AbstractUpdater} constructor. */
    public static final String ACTOR_NAME_PREFIX = "subUpdater";

    /**
     * Not invoked directly anywhere in this class; {@link #props} hands this class together with a matching
     * argument list to {@code Props.create}.
     */
    private SubUpdater(PubSubConfig config, ActorRef subscriberRef, Subscriptions<T> localSubscriptions,
            DDataWriter<T> writer) {
        super(ACTOR_NAME_PREFIX, config, subscriberRef, localSubscriptions, writer);
    }

    /**
     * Builds the {@link Props} for this class.
     *
     * @param pubSubConfig configuration forwarded to the constructor.
     * @param actorRef actor reference forwarded to the constructor; the updater later uses it as the key of
     * its distributed-data writes and as the recipient of subscription snapshots.
     * @param dData source of the subscriptions object and of the writer forwarded to the constructor.
     * @param <T> type of the exported subscription data.
     * @return the Props object.
     */
    public static <T> Props props(PubSubConfig pubSubConfig, ActorRef actorRef, DData<?, T> dData) {
        return Props.create(SubUpdater.class, pubSubConfig, actorRef, dData.createSubscriptions(), dData.getWriter());
    }

    /**
     * Records the subscription, enqueues the request, and starts watching the subscriber when the local
     * subscriptions report a change.
     */
    @Override
    protected void subscribe(AbstractUpdater.Subscribe subscribe) {
        final boolean changed =
                subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter());
        enqueueRequest(subscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric);
        if (changed) {
            getContext().watch(subscribe.getSubscriber());
        }
    }

    /**
     * Removes the given topics for the subscriber, enqueues the request, and stops watching the subscriber
     * once the local subscriptions changed and no longer contain it.
     */
    @Override
    protected void unsubscribe(AbstractUpdater.Unsubscribe unsubscribe) {
        final boolean changed = subscriptions.unsubscribe(unsubscribe.getSubscriber(), unsubscribe.getTopics());
        enqueueRequest(unsubscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric);
        // contains(...) is only consulted when something actually changed.
        if (changed && !subscriptions.contains(unsubscribe.getSubscriber())) {
            getContext().unwatch(unsubscribe.getSubscriber());
        }
    }

    /**
     * Sends out the acknowledgements exported for the completed sequence number, forwards the payload to the
     * subscriber, and resets the change flag and write consistency when nothing is pending any more.
     */
    @Override
    protected void ddataOpSuccess(AbstractUpdater.DDataOpSuccess<SubscriptionsReader> dDataOpSuccess) {
        flushSubAcks(dDataOpSuccess.seqNr);
        subscriber.tell(dDataOpSuccess.payload, getSelf());

        final boolean requestsPending = !awaitSubAck.isEmpty() || !awaitUpdate.isEmpty();
        if (requestsPending) {
            return;
        }
        localSubscriptionsChanged = false;
        nextWriteConsistency = Replicator.writeLocal();
    }

    /**
     * Starts the distributed-data operation for this tick, attaches the write-result handler, and then moves
     * the requests awaiting update to those awaiting acknowledgement.
     */
    @Override
    protected void tick(AbstractUpdater.Clock clock) {
        // The operation is started before getSeqNr() is read for the result handler.
        final CompletionStage<SubscriptionsReader> opResult =
                performDDataOp(forceUpdate(), localSubscriptionsChanged, nextWriteConsistency);
        opResult.handle(handleDDataWriteResult(getSeqNr(), nextWriteConsistency));
        moveAwaitUpdateToAwaitAcknowledge();
    }

    /** Tells every acknowledgement exported for {@code seqNr} to its recorded sender. */
    private void flushSubAcks(int seqNr) {
        for (AbstractUpdater.SubAck ack : exportAwaitSubAck(seqNr)) {
            ack.getSender().tell(ack, getSelf());
        }
    }

    /**
     * Writes the local subscriptions if required and yields a snapshot of them.
     *
     * @param forced whether a write is requested even without local changes.
     * @param changed whether the local subscriptions changed.
     * @param consistency write consistency passed to the writer.
     * @return stage that completes with the snapshot after the write (if any) completes.
     */
    private CompletionStage<SubscriptionsReader> performDDataOp(boolean forced, boolean changed,
            Replicator.WriteConsistency consistency) {
        final SubscriptionsReader snapshot;
        final CompletionStage<Void> writeStage;
        if (changed || forced) {
            if (subscriptions.isEmpty()) {
                // Nothing left to publish: remove this subscriber's entry altogether.
                snapshot = subscriptions.snapshot();
                writeStage = topicsWriter.removeSubscriber(subscriber, consistency);
                topicMetric.set(0L);
            } else {
                // export(...) deliberately precedes snapshot(); presumably snapshot() may reset state that
                // export(...) depends on -- TODO confirm against Subscriptions before reordering.
                final T exported = subscriptions.export(forced);
                snapshot = subscriptions.snapshot();
                writeStage = topicsWriter.put(subscriber, exported, consistency);
                topicMetric.set(Long.valueOf(subscriptions.countTopics()));
            }
        } else {
            // No write needed: only take the snapshot.
            snapshot = subscriptions.snapshot();
            writeStage = CompletableFuture.completedStage(null);
        }
        return writeStage.thenApply(ignored -> snapshot);
    }
}
