package org.eclipse.ditto.internal.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.cluster.ddata.Replicator;
import akka.japi.pf.ReceiveBuilder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriber;
import org.eclipse.ditto.internal.utils.pubsub.api.Request;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.internal.utils.pubsub.api.Unsubscribe;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataUpdate;
import org.eclipse.ditto.internal.utils.pubsub.ddata.Subscriptions;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.compressed.CompressedDData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.compressed.CompressedSubscriptions;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/SubUpdater.class */
/**
 * Actor that maintains the local {@link Subscriptions} and periodically writes them into the distributed data.
 * <p>
 * {@link Subscribe} and {@link Unsubscribe} requests modify the local subscriptions immediately. On every
 * {@link Clock#TICK} the current state is written via the {@link DData} writer; once a write succeeds, a snapshot of
 * the subscriptions taken at the time of the write is sent to the {@code subscriber} actor and the pending
 * {@link SubAck}s covered by that write are sent back to the original senders.
 * <p>
 * All mutable state is only touched from message handlers; asynchronous write results are piped back to this actor
 * as messages (see {@link #handleDDataWriteResult(int)}).
 */
public final class SubUpdater extends AbstractActorWithTimers implements ClusterStateSyncBehavior<ActorRef> {

    /** Name prefix constant for this actor; it is not referenced inside this class. */
    public static final String ACTOR_NAME_PREFIX = "subUpdater";

    /** Number of consecutive write failures after which failures are logged as errors instead of warnings. */
    public static final int MAX_ERROR_COUNTER = 3;

    private final Subscriptions<LiteralUpdate> subscriptions;
    private final ActorRef subscriber;
    private final Gauge topicSizeMetric;
    private final Gauge awaitUpdateMetric;
    private final Gauge awaitSubAckMetric;
    private final DData<ActorRef, ?, LiteralUpdate> ddata;
    private final double resetProbability;
    private final Replicator.WriteConsistency writeConsistency;
    private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

    /** Acknowledgements of requests received since the last tick; their changes are not yet being written. */
    private final List<SubAck> awaitUpdate = new ArrayList<>();

    /** Acknowledgements of requests whose changes were part of a started write, in ascending seqNr order. */
    private final Queue<SubAck> awaitSubAck = new ArrayDeque<>();

    /** Sequence number of the most recently enqueued acknowledgement. */
    private int seqNr = 0;

    /** The update exported at the previous tick; empty forces a full reset when subscriptions exist. */
    private LiteralUpdate previousUpdate = LiteralUpdate.empty();

    /** Number of write failures since the last successful write. */
    private int errorCounter = 0;

    private final Cluster cluster = Cluster.get(getContext().getSystem());

    /** Timer key and timer message of the periodic write. */
    private enum Clock {
        TICK
    }

    /**
     * Self-message signalling a successful distributed data write.
     *
     * @param <P> type of the payload carried along with the sequence number.
     */
    private static final class DDataOpSuccess<P> {
        private final P payload;
        private final int seqNr;

        private DDataOpSuccess(final P payload, final int seqNr) {
            this.payload = payload;
            this.seqNr = seqNr;
        }
    }

    // Invoked reflectively through Props.create in props(...).
    private SubUpdater(final PubSubConfig config, final ActorRef subscriber,
            final Subscriptions<LiteralUpdate> subscriptions, final DData<ActorRef, ?, LiteralUpdate> ddata) {
        this.subscriber = subscriber;
        this.subscriptions = subscriptions;
        this.ddata = ddata;
        this.resetProbability = config.getResetProbability();
        this.writeConsistency = ddata.getConfig().getSubscriptionWriteConsistency();

        // All gauges share one tag value derived from the parent actor's name.
        final String metricsTag = getContext().getParent().path().name() + "/subUpdater";
        this.topicSizeMetric = DittoMetrics.gauge("pubsub-topics-size-bytes").tag("name", metricsTag);
        this.awaitUpdateMetric = DittoMetrics.gauge("pubsub-await-update").tag("name", metricsTag);
        this.awaitSubAckMetric = DittoMetrics.gauge("pubsub-await-acknowledge").tag("name", metricsTag);

        getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, config.getUpdateInterval());
        scheduleClusterStateSync(config);
    }

    /**
     * Create Props for this actor.
     *
     * @param pubSubConfig the pub-sub config.
     * @param actorRef the subscriber actor that receives a subscriptions snapshot after each successful write.
     * @param compressedDData the distributed data to write into; its seeds are used to create the subscriptions.
     * @return the Props object.
     */
    public static Props props(PubSubConfig pubSubConfig, ActorRef actorRef, CompressedDData compressedDData) {
        return Props.create(SubUpdater.class, pubSubConfig, actorRef,
                CompressedSubscriptions.of(compressedDData.getSeeds()), compressedDData);
    }

    /**
     * Build the actor's behavior: own message handlers first, then the cluster state sync behavior, and finally a
     * catch-all that logs anything unhandled.
     *
     * @return the receive behavior.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        final AbstractActor.Receive ownBehavior = ReceiveBuilder.create()
                .match(Subscribe.class, this::subscribe)
                .match(Unsubscribe.class, this::unsubscribe)
                .match(Terminated.class, this::terminated)
                .match(RemoveSubscriber.class, this::removeSubscriber)
                .matchEquals(Clock.TICK, this::tick)
                .match(DDataOpSuccess.class, this::ddataOpSuccess)
                .match(Status.Failure.class, this::updateFailure)
                .matchEquals(ActorEvent.PUBSUB_TERMINATED, this::pubSubTerminated)
                .build();
        final AbstractActor.Receive fallback = ReceiveBuilder.create()
                .matchAny(this::logUnhandled)
                .build();
        return ownBehavior.orElse(getClusterStateSyncBehavior()).orElse(fallback);
    }

    /** Add the requested topics to the local subscriptions and start watching the subscriber if anything changed. */
    private void subscribe(final Subscribe subscribe) {
        final boolean changed = subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(),
                subscribe.getFilter(), subscribe.getGroup().orElse(null));
        enqueueRequest(subscribe, getSender(), awaitUpdate, awaitUpdateMetric,
                checkForLostSubscriber(subscribe, changed));
        if (changed) {
            getContext().watch(subscribe.getSubscriber());
        }
    }

    /** Remove the requested topics; stop watching the subscriber once it has no subscriptions left. */
    private void unsubscribe(final Unsubscribe unsubscribe) {
        final boolean changed = subscriptions.unsubscribe(unsubscribe.getSubscriber(), unsubscribe.getTopics());
        enqueueRequest(unsubscribe, getSender(), awaitUpdate, awaitUpdateMetric, true);
        if (changed && !subscriptions.contains(unsubscribe.getSubscriber())) {
            getContext().unwatch(unsubscribe.getSubscriber());
        }
    }

    /**
     * Determine the consistency flag of the acknowledgement for a subscribe request.
     *
     * @param subscribe the request.
     * @param z whether the request changed the local subscriptions.
     * @return {@code false} if a resubscription changed the subscriptions, i.e. the subscriber had been missing;
     * {@code true} otherwise.
     */
    private boolean checkForLostSubscriber(final Subscribe subscribe, final boolean z) {
        if (subscribe.isResubscribe()) {
            if (z) {
                mo8log().error("[RESUB] Subscriber was missing: <{}>", subscribe.getSubscriber());
                return false;
            }
            mo8log().debug("[RESUB] Refreshed subscriber <{}>", subscribe.getSubscriber());
        }
        return true;
    }

    /** Handle a successful write: reset the error counter, publish the snapshot and send the covered acks. */
    private void ddataOpSuccess(final DDataOpSuccess<SubscriptionsReader> dDataOpSuccess) {
        mo8log().debug("DDataOp success seqNr=<{}>", dDataOpSuccess.seqNr);
        errorCounter = 0;
        subscriber.tell(dDataOpSuccess.payload, getSelf());
        flushSubAcks(dDataOpSuccess.seqNr);
    }

    /** Start a write of the current state and move the pending acknowledgements behind it. */
    private void tick(final Clock clock) {
        // The sequence number is captured before the async callback so that it identifies this write.
        performDDataOp(writeConsistency).handle(handleDDataWriteResult(getSeqNr()));
        moveAwaitUpdateToAwaitAcknowledge();
    }

    /** Send every acknowledgement covered by the write with the given sequence number to its sender. */
    private void flushSubAcks(final int confirmedSeqNr) {
        for (final SubAck ack : exportAwaitSubAck(confirmedSeqNr)) {
            ack.getSender().tell(ack, getSelf());
        }
    }

    /**
     * Write the local subscriptions into the distributed data.
     * <ul>
     * <li>Full reset if nothing was written before while subscriptions exist, or randomly according to the
     * configured reset probability.</li>
     * <li>Removal of the subscriber's entry if there are no subscriptions.</li>
     * <li>Otherwise an incremental write of the difference to the previous update, skipped if it is empty.</li>
     * </ul>
     *
     * @param writeConsistency the write consistency of the operation.
     * @return a stage completing with the snapshot of the subscriptions taken when the write was started.
     */
    private CompletionStage<SubscriptionsReader> performDDataOp(final Replicator.WriteConsistency writeConsistency) {
        mo8log().debug("Tick seq=<{}> empty=<{}> writeConsistency=<{}>", seqNr, subscriptions.isEmpty(),
                writeConsistency);

        final CompletionStage<Void> ddataOp;
        final SubscriptionsReader snapshot;
        if ((previousUpdate.isEmpty() && !subscriptions.isEmpty())
                || (resetProbability > 0.0d && Math.random() < resetProbability)) {
            mo8log().debug("Resetting ddata topics: <{}>", getSelf());
            final LiteralUpdate fullUpdate = subscriptions.export();
            ddataOp = ddata.getWriter().reset(subscriber, fullUpdate, writeConsistency);
            snapshot = subscriptions.snapshot();
            previousUpdate = fullUpdate;
            topicSizeMetric.set(Long.valueOf(subscriptions.estimateSize()));
        } else if (subscriptions.isEmpty()) {
            ddataOp = ddata.getWriter().removeSubscriber(subscriber, writeConsistency);
            snapshot = subscriptions.snapshot();
            previousUpdate = LiteralUpdate.empty();
            topicSizeMetric.set(0L);
        } else {
            final LiteralUpdate nextUpdate = subscriptions.export();
            snapshot = subscriptions.snapshot();
            // Computed once and reused for both the emptiness check and the write.
            final LiteralUpdate delta = nextUpdate.diff2((DDataUpdate<String>) previousUpdate);
            mo8log().debug("diff.isEmpty=<{}>", delta.isEmpty());
            if (delta.isEmpty()) {
                ddataOp = CompletableFuture.completedStage(null);
            } else {
                ddataOp = ddata.getWriter().put(subscriber, delta, writeConsistency);
            }
            previousUpdate = nextUpdate;
            topicSizeMetric.set(Long.valueOf(subscriptions.estimateSize()));
        }
        return ddataOp.thenApply(unused -> snapshot);
    }

    /**
     * Handle a failed write. The failure is logged as a warning until more than {@link #MAX_ERROR_COUNTER} failures
     * happened without a success in between; after that it is logged as an error including the cause.
     */
    private void updateFailure(final Status.Failure failure) {
        errorCounter++;
        if (errorCounter > MAX_ERROR_COUNTER) {
            log.error(failure.cause(), "Failure updating Ditto pub/sub subscription - trying again next clock tick");
        } else {
            log.warning("Failure updating Ditto pub/sub subscription - trying again next clock tick");
        }
        // An empty previous update makes the next tick write the full state if there are subscriptions.
        previousUpdate = LiteralUpdate.empty();
    }

    /**
     * Queue an acknowledgement for a request if the request asks for one.
     *
     * @param request the request.
     * @param actorRef the sender to acknowledge to.
     * @param collection the queue to add the acknowledgement to.
     * @param gauge the gauge to increment for each queued acknowledgement.
     * @param z the consistency flag passed on to the acknowledgement.
     */
    private void enqueueRequest(final Request request, final ActorRef actorRef, final Collection<SubAck> collection,
            final Gauge gauge, final boolean z) {
        if (request.shouldAcknowledge()) {
            seqNr += 1;
            collection.add(SubAck.of(request, actorRef, seqNr, z));
            gauge.increment();
        }
    }

    private int getSeqNr() {
        return seqNr;
    }

    /**
     * Remove and return all queued acknowledgements covered by the write with the given sequence number.
     * <p>
     * Acknowledgements are queued in ascending sequence number order, so everything at the head of the queue with a
     * sequence number at or before the confirmed one is covered. Later acknowledgements stay queued until their own
     * write is confirmed, even if no acknowledgement carries exactly the confirmed sequence number.
     *
     * @param confirmedSeqNr sequence number captured when the confirmed write was started.
     * @return the acknowledgements to send, as an unmodifiable list.
     */
    private List<SubAck> exportAwaitSubAck(final int confirmedSeqNr) {
        final List<SubAck> covered = new ArrayList<>(awaitSubAck.size());
        // The signed difference keeps the ordering correct even if the sequence number overflows.
        while (!awaitSubAck.isEmpty() && awaitSubAck.peek().getSeqNr() - confirmedSeqNr <= 0) {
            covered.add(awaitSubAck.poll());
        }
        awaitSubAckMetric.set(Long.valueOf(awaitSubAck.size()));
        return Collections.unmodifiableList(covered);
    }

    /** Move all acknowledgements collected since the last tick behind the write that was just started. */
    private void moveAwaitUpdateToAwaitAcknowledge() {
        if (!awaitUpdate.isEmpty()) {
            awaitSubAck.addAll(awaitUpdate);
            awaitUpdate.clear();
            awaitSubAckMetric.set(Long.valueOf(awaitSubAck.size()));
            awaitUpdateMetric.set(0L);
        }
    }

    /**
     * Create the callback for the result of a write. The callback runs when the write stage completes, possibly
     * outside of this actor's message handling, so it only sends messages to this actor and touches no state.
     *
     * @param i the sequence number captured when the write was started.
     * @return the callback; it always returns {@code null}.
     */
    private BiFunction<SubscriptionsReader, Throwable, Void> handleDDataWriteResult(final int i) {
        return (payload, error) -> {
            if (error != null) {
                getSelf().tell(new Status.Failure(error), ActorRef.noSender());
            } else if (payload != null) {
                getSelf().tell(new DDataOpSuccess<>(payload, i), ActorRef.noSender());
            }
            return null;
        };
    }

    private void logUnhandled(final Object message) {
        log.warning("Unhandled: <{}>", message);
    }

    private void terminated(final Terminated terminated) {
        doRemoveSubscriber(terminated.actor());
    }

    private void removeSubscriber(final RemoveSubscriber removeSubscriber) {
        doRemoveSubscriber(removeSubscriber.getSubscriber());
    }

    /** Drop all subscriptions of the given subscriber and stop watching it. */
    private void doRemoveSubscriber(final ActorRef actorRef) {
        subscriptions.removeSubscriber(actorRef);
        getContext().unwatch(actorRef);
    }

    /**
     * Notify every sender of a pending acknowledgement and every known subscriber, each exactly once, that pub-sub
     * terminated; then forget all subscriptions and pending acknowledgements.
     */
    private void pubSubTerminated(final ActorEvent actorEvent) {
        final Stream<ActorRef> pendingSenders =
                Stream.concat(awaitUpdate.stream(), awaitSubAck.stream()).map(SubAck::getSender);
        Stream.concat(pendingSenders, subscriptions.getSubscribers().stream())
                .distinct()
                .forEach(recipient -> recipient.tell(PubSubTerminatedException.getInstance(), getSelf()));
        subscriptions.clear();
        awaitUpdate.clear();
        awaitSubAck.clear();
    }

    @Override
    public Cluster getCluster() {
        return cluster;
    }

    @Override
    public Address toAddress(ActorRef actorRef) {
        return actorRef.path().address();
    }

    /**
     * Provide the logger of this actor.
     * NOTE(review): the method name looks decompiler-generated (renamed from {@code log}); it is kept because the
     * implemented interface declares it under this name in this code base.
     */
    @Override
    public ThreadSafeDittoLoggingAdapter mo8log() {
        return log;
    }

    @Override
    public DData<ActorRef, ?, ?> getDData() {
        return ddata;
    }

    /**
     * Called when the distributed data should hold no entry for the current member. If there are local
     * subscriptions, forget the previous update so that the next tick writes the full state again.
     */
    @Override
    public void verifyNoDDataForCurrentMember() {
        if (!subscriptions.isEmpty()) {
            previousUpdate = LiteralUpdate.empty();
        }
    }
}
