package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.cluster.ddata.Replicator;
import akka.japi.pf.ReceiveBuilder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.Subscriptions;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/AbstractUpdater.class */
public abstract class AbstractUpdater<T, P> extends AbstractActorWithTimers {
    /**
     * Pub-sub configuration passed to the constructor. Read in this class for the tick interval
     * (constructor) and the force-update probability ({@link #forceUpdate()}).
     */
    protected final PubSubConfig config;

    /**
     * Subscription data passed to the constructor. Subscribers are removed from it when they
     * terminate or when a {@link RemoveSubscriber} message arrives.
     */
    protected final Subscriptions<T> subscriptions;

    /**
     * Distributed-data writer passed to the constructor. Not used by this class itself;
     * presumably used by subclasses to publish their data (confirm against subclasses).
     */
    protected final DDataWriter<T> topicsWriter;

    /** Actor reference passed to the constructor. Not used by this class itself. */
    protected final ActorRef subscriber;

    /** Gauge {@code pubsub-topics}, tagged with {@code name=<parent actor name>/<constructor name argument>}. */
    protected final Gauge topicMetric;

    /** Gauge {@code pubsub-await-update}; reset to 0 by {@link #moveAwaitUpdateToAwaitAcknowledge()}. */
    protected final Gauge awaitUpdateMetric;

    /** Gauge {@code pubsub-await-acknowledge}; tracks the size of {@link #awaitSubAck}. */
    protected final Gauge awaitSubAckMetric;

    /** Source of randomness for {@link #forceUpdate()}. */
    private final Random random = new Random();

    /** Logger bound to this actor. */
    protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

    /**
     * Acknowledgement requests held until {@link #moveAwaitUpdateToAwaitAcknowledge()} transfers them
     * to {@link #awaitSubAck}.
     */
    protected final List<SubAck> awaitUpdate = new ArrayList<>();

    /** Acknowledgement requests drained in insertion order by {@link #exportAwaitSubAck(int)}. */
    protected final Queue<SubAck> awaitSubAck = new ArrayDeque<>();

    /**
     * Write consistency for the next write. Starts at {@code writeLocal()} and is raised whenever an
     * enqueued request asks for a higher-ranked consistency. It is never lowered by this class.
     */
    protected Replicator.WriteConsistency nextWriteConsistency = Replicator.writeLocal();

    /**
     * Dirty flag. Set when a request is enqueued with its "changed" argument true, when removing a
     * subscriber changed {@link #subscriptions}, and when a write failure is reported.
     */
    protected boolean localSubscriptionsChanged = false;

    /** Sequence number; incremented for every enqueued request that asks for an acknowledgement. */
    private int seqNr = 0;

    /**
     * Timer signal of this actor. {@link #TICK} is used as both the key and the message of the
     * fixed-rate timer started in the constructor, and is dispatched to {@code tick(Clock)} by
     * {@code createReceive()}.
     */
    protected enum Clock {
        /** The only clock signal. */
        TICK
    }

    /**
     * Message this actor sends to itself when a distributed-data write completed without error
     * and produced a non-null payload (see {@code handleDDataWriteResult}).
     *
     * @param <P> type of the payload produced by the write.
     */
    static final class DDataOpSuccess<P> {
        /** Non-null result of the completed write. */
        final P payload;

        /** Sequence number that was handed to {@code handleDDataWriteResult} for this write. */
        final int seqNr;

        /** Write consistency that was handed to {@code handleDDataWriteResult} for this write. */
        final Replicator.WriteConsistency writeConsistency;

        /**
         * @param p the payload of the write.
         * @param i the sequence number associated with the write.
         * @param writeConsistency the write consistency associated with the write.
         */
        DDataOpSuccess(P p, int i, Replicator.WriteConsistency writeConsistency) {
            payload = p;
            seqNr = i;
            this.writeConsistency = writeConsistency;
        }
    }

    /**
     * Request carrying a set of strings as its topics; judging by the class name these are
     * acknowledgement labels to declare. This class does not register a handler for it in
     * {@code createReceive()}.
     */
    public static final class DeclareAckLabels extends Request {
        private DeclareAckLabels(Set<String> labels, ActorRef subscriber, Replicator.WriteConsistency consistency, boolean acknowledge) {
            super(labels, subscriber, consistency, acknowledge);
        }

        /**
         * Creates a declaration request.
         *
         * @param set the strings stored as the request's topics.
         * @param actorRef the subscriber of the request.
         * @param writeConsistency the requested write consistency.
         * @param z whether the request should be acknowledged.
         * @return the new request.
         */
        public static DeclareAckLabels of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            return new DeclareAckLabels(set, actorRef, writeConsistency, z);
        }

        /**
         * Converts this request into a {@link Subscribe} with the same topics, subscriber, write
         * consistency and acknowledge flag, and the default accept-all filter.
         *
         * @return the equivalent subscribe request.
         */
        public Subscribe toSubscribe() {
            final Set<String> topics = getTopics();
            final ActorRef subscriber = getSubscriber();
            final Replicator.WriteConsistency consistency = getWriteConsistency();
            return Subscribe.of(topics, subscriber, consistency, shouldAcknowledge());
        }
    }

    /**
     * Request to remove a subscriber. Its topic set is always empty. It additionally carries a flag
     * telling whether the removal is meant for acknowledgement label declaration.
     */
    public static final class RemoveSubscriber extends Request {
        private final boolean forAcknowledgementLabelDeclaration;

        private RemoveSubscriber(ActorRef subscriber, Replicator.WriteConsistency consistency, boolean acknowledge, boolean forAckLabelDeclaration) {
            super(Collections.emptySet(), subscriber, consistency, acknowledge);
            forAcknowledgementLabelDeclaration = forAckLabelDeclaration;
        }

        /**
         * Creates a removal request whose acknowledgement-label-declaration flag is {@code false}.
         *
         * @param actorRef the subscriber to remove.
         * @param writeConsistency the requested write consistency.
         * @param z whether the request should be acknowledged.
         * @return the new request.
         */
        public static RemoveSubscriber of(ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            return new RemoveSubscriber(actorRef, writeConsistency, z, false);
        }

        /**
         * Returns a copy of this request with the acknowledgement-label-declaration flag set to {@code true}.
         *
         * @return the flagged copy; this instance is left unchanged.
         */
        public RemoveSubscriber forAcknowledgementLabelDeclaration() {
            final ActorRef subscriber = getSubscriber();
            final Replicator.WriteConsistency consistency = getWriteConsistency();
            return new RemoveSubscriber(subscriber, consistency, shouldAcknowledge(), true);
        }

        /**
         * @return whether this request was created via {@link #forAcknowledgementLabelDeclaration()}.
         */
        public boolean isForAcknowledgementLabelDeclaration() {
            return forAcknowledgementLabelDeclaration;
        }
    }

    /**
     * Common base of all requests understood by updaters: a set of topics, the subscriber concerned,
     * the requested write consistency, and whether the requester wants an acknowledgement.
     * The constructor is private, so only classes nested in {@code AbstractUpdater} can extend it.
     */
    public abstract static class Request {
        private final Set<String> topics;
        private final ActorRef subscriber;
        private final Replicator.WriteConsistency writeConsistency;
        private final boolean acknowledge;

        private Request(Set<String> topics, ActorRef subscriber, Replicator.WriteConsistency writeConsistency, boolean acknowledge) {
            this.topics = topics;
            this.subscriber = subscriber;
            this.writeConsistency = writeConsistency;
            this.acknowledge = acknowledge;
        }

        /**
         * @return the topic set exactly as given at construction (no defensive copy is made).
         */
        public Set<String> getTopics() {
            return topics;
        }

        /**
         * @return the subscriber this request is about.
         */
        public ActorRef getSubscriber() {
            return subscriber;
        }

        /**
         * @return the write consistency requested for applying this request.
         */
        public Replicator.WriteConsistency getWriteConsistency() {
            return writeConsistency;
        }

        /**
         * @return whether an acknowledgement is requested; {@code enqueueRequest} only records a
         * {@link SubAck} for requests where this is {@code true}.
         */
        public boolean shouldAcknowledge() {
            return acknowledge;
        }

        /**
         * @return the simple name of the concrete class followed by all four properties.
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("[topics=").append(topics);
            text.append(", subscriber=").append(subscriber);
            text.append(", writeConsistency=").append(writeConsistency);
            text.append(", acknowledge=").append(acknowledge);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Bookkeeping entry for a request that asked for an acknowledgement: the request itself, the
     * actor reference recorded as its sender, and the sequence number assigned when it was enqueued.
     */
    public static final class SubAck {
        private final Request request;
        private final ActorRef sender;
        private final int seqNr;

        private SubAck(Request request, ActorRef sender, int seqNr) {
            this.request = request;
            this.sender = sender;
            this.seqNr = seqNr;
        }

        /**
         * Creates an entry. Decompiler note: access was widened from package-private.
         *
         * @param request the request to acknowledge.
         * @param actorRef the actor reference recorded as sender of the request.
         * @param i the sequence number of the request.
         * @return the new entry.
         */
        public static SubAck of(Request request, ActorRef actorRef, int i) {
            return new SubAck(request, actorRef, i);
        }

        /**
         * @return the request awaiting acknowledgement.
         */
        public Request getRequest() {
            return request;
        }

        /**
         * @return the actor reference recorded as sender of the request.
         */
        public ActorRef getSender() {
            return sender;
        }

        /**
         * @return the sequence number assigned to the request.
         */
        int getSeqNr() {
            return seqNr;
        }

        /**
         * @return the simple class name followed by request, sender and sequence number.
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("[request=").append(request);
            text.append(",sender=").append(sender);
            text.append(",seqNr=").append(seqNr);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Request to subscribe to topics. Besides the common request properties it carries a predicate
     * over collections of strings, which defaults to accepting everything.
     */
    public static final class Subscribe extends Request {
        /** Default filter: accepts any collection. */
        private static final Predicate<Collection<String>> CONSTANT_TRUE = topics -> true;

        private final Predicate<Collection<String>> filter;

        private Subscribe(Set<String> topics, ActorRef subscriber, Replicator.WriteConsistency consistency, boolean acknowledge, Predicate<Collection<String>> filter) {
            super(topics, subscriber, consistency, acknowledge);
            this.filter = filter;
        }

        /**
         * Creates a subscribe request with the accept-all filter.
         *
         * @param set the topics to subscribe to.
         * @param actorRef the subscriber.
         * @param writeConsistency the requested write consistency.
         * @param z whether the request should be acknowledged.
         * @return the new request.
         */
        public static Subscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            return of(set, actorRef, writeConsistency, z, CONSTANT_TRUE);
        }

        /**
         * Creates a subscribe request with an explicit filter.
         *
         * @param set the topics to subscribe to.
         * @param actorRef the subscriber.
         * @param writeConsistency the requested write consistency.
         * @param z whether the request should be acknowledged.
         * @param predicate the filter to carry; stored as given, without a null check.
         * @return the new request.
         */
        public static Subscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z, Predicate<Collection<String>> predicate) {
            return new Subscribe(set, actorRef, writeConsistency, z, predicate);
        }

        /**
         * @return the filter carried by this request.
         */
        public Predicate<Collection<String>> getFilter() {
            return filter;
        }
    }

    /**
     * Request to unsubscribe from topics. Adds nothing to the common request properties.
     */
    public static final class Unsubscribe extends Request {
        private Unsubscribe(Set<String> topics, ActorRef subscriber, Replicator.WriteConsistency consistency, boolean acknowledge) {
            super(topics, subscriber, consistency, acknowledge);
        }

        /**
         * Creates an unsubscribe request.
         *
         * @param set the topics to unsubscribe from.
         * @param actorRef the subscriber.
         * @param writeConsistency the requested write consistency.
         * @param z whether the request should be acknowledged.
         * @return the new request.
         */
        public static Unsubscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            return new Unsubscribe(set, actorRef, writeConsistency, z);
        }
    }

    /**
     * Stores the collaborators, creates the three gauges and starts the periodic clock.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param str name of this updater; combined with the parent actor's name to tag the gauges.
     * @param pubSubConfig the pub-sub configuration; its update interval drives the clock.
     * @param actorRef the subscriber actor reference.
     * @param subscriptions the subscription data to maintain.
     * @param dDataWriter the distributed-data writer.
     */
    public AbstractUpdater(String str, PubSubConfig pubSubConfig, ActorRef actorRef, Subscriptions<T> subscriptions, DDataWriter<T> dDataWriter) {
        config = pubSubConfig;
        subscriber = actorRef;
        this.subscriptions = subscriptions;
        topicsWriter = dDataWriter;

        // All gauges share the tag "<parent actor name>/<updater name>".
        final String parentName = getContext().getParent().path().name();
        final String metricName = parentName + "/" + str;
        topicMetric = DittoMetrics.gauge("pubsub-topics").tag("name", metricName);
        awaitUpdateMetric = DittoMetrics.gauge("pubsub-await-update").tag("name", metricName);
        awaitSubAckMetric = DittoMetrics.gauge("pubsub-await-acknowledge").tag("name", metricName);

        // Clock.TICK serves as both timer key and timer message.
        getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, pubSubConfig.getUpdateInterval());
    }

    /**
     * Builds the message dispatch of this actor. Handlers are tried in the listed order; anything
     * that matches none of the typed handlers is logged as unhandled.
     *
     * @return the receive behavior.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                // requests
                .match(Subscribe.class, this::subscribe)
                .match(Unsubscribe.class, this::unsubscribe)
                // subscriber removal, by termination or explicit request
                .match(Terminated.class, this::terminated)
                .match(RemoveSubscriber.class, this::removeSubscriber)
                // periodic clock
                .matchEquals(Clock.TICK, this::tick)
                // outcome of distributed-data writes
                .match(DDataOpSuccess.class, this::ddataOpSuccess)
                .match(Status.Failure.class, this::updateFailure)
                // everything else
                .matchAny(this::logUnhandled)
                .build();
    }

    /**
     * Handles the self-sent {@link DDataOpSuccess} message, which the callback created by
     * {@code handleDDataWriteResult} sends when a write finished without error and with a
     * non-null payload.
     *
     * @param dDataOpSuccess the success message with payload, sequence number and write consistency.
     */
    protected abstract void ddataOpSuccess(DDataOpSuccess<P> dDataOpSuccess);

    /**
     * Handles an incoming {@link Subscribe} request.
     *
     * @param subscribe the request.
     */
    protected abstract void subscribe(Subscribe subscribe);

    /**
     * Handles an incoming {@link Unsubscribe} request.
     *
     * @param unsubscribe the request.
     */
    protected abstract void unsubscribe(Unsubscribe unsubscribe);

    /**
     * Handles the periodic {@link Clock#TICK}, which the fixed-rate timer started in the
     * constructor delivers every {@code config.getUpdateInterval()}.
     *
     * @param clock the clock signal.
     */
    protected abstract void tick(Clock clock);

    /**
     * Handles a failure message: logs its cause at error level and marks the local subscriptions
     * as changed.
     *
     * @param failure the failure message.
     */
    protected void updateFailure(Status.Failure failure) {
        final Throwable cause = failure.cause();
        log.error(cause, "updateFailure");
        localSubscriptionsChanged = true;
    }

    /**
     * Decides at random whether an update should be forced: draws a uniform number in [0, 1) and
     * compares it with the configured force-update probability.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @return {@code true} if the drawn number is below {@code config.getForceUpdateProbability()}.
     */
    public boolean forceUpdate() {
        final double drawn = random.nextDouble();
        return drawn < config.getForceUpdateProbability();
    }

    /**
     * Registers a request: updates the dirty flag, raises the next write consistency if the request
     * asks for a higher-ranked one, and, only if the request wants an acknowledgement, records a
     * {@link SubAck} under the next sequence number and increments the given gauge.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param request the request to register.
     * @param z whether the request changed the local subscriptions; {@code false} never clears the flag.
     * @param actorRef the actor reference recorded as sender in the {@link SubAck}.
     * @param collection the collection receiving the {@link SubAck}.
     * @param gauge the gauge to increment when a {@link SubAck} is recorded.
     */
    public void enqueueRequest(Request request, boolean z, ActorRef actorRef, Collection<SubAck> collection, Gauge gauge) {
        if (z) {
            localSubscriptionsChanged = true;
        }
        upgradeWriteConsistency(request.getWriteConsistency());
        if (!request.shouldAcknowledge()) {
            return;
        }
        // Sequence numbers are consumed only by requests that want an acknowledgement.
        seqNr++;
        collection.add(SubAck.of(request, actorRef, seqNr));
        gauge.increment();
    }

    /**
     * Returns the sequence number given to the most recently enqueued request that asked for an
     * acknowledgement, or 0 if there has been none yet.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @return the current sequence number.
     */
    public int getSeqNr() {
        return seqNr;
    }

    /**
     * Removes entries from the head of {@link #awaitSubAck}, in queue order, up to and including the
     * entry with the given sequence number, then updates the await-acknowledge gauge to the
     * remaining queue size.
     * Decompiler note: access was widened from {@code protected}.
     *
     * <p>If no queued entry carries the given sequence number, the whole queue is drained.
     *
     * @param i the sequence number of the last entry to export.
     * @return an unmodifiable list of the removed entries in queue order; empty if the queue was empty.
     */
    public List<SubAck> exportAwaitSubAck(int i) {
        // Typed list instead of the raw ArrayList, so returning it as List<SubAck> is checked.
        final List<SubAck> exported = new ArrayList<>(awaitSubAck.size());
        while (!awaitSubAck.isEmpty()) {
            final SubAck head = awaitSubAck.poll();
            exported.add(head);
            if (head.getSeqNr() == i) {
                break;
            }
        }
        awaitSubAckMetric.set(Long.valueOf(awaitSubAck.size()));
        return Collections.unmodifiableList(exported);
    }

    /**
     * Transfers every entry of {@link #awaitUpdate} to the tail of {@link #awaitSubAck}, empties
     * {@link #awaitUpdate} and refreshes both gauges. Does nothing, gauges included, when
     * {@link #awaitUpdate} is empty.
     * Decompiler note: access was widened from {@code protected}.
     */
    public void moveAwaitUpdateToAwaitAcknowledge() {
        if (!awaitUpdate.isEmpty()) {
            awaitSubAck.addAll(awaitUpdate);
            awaitUpdate.clear();
            awaitSubAckMetric.set(Long.valueOf(awaitSubAck.size()));
            awaitUpdateMetric.set(0L);
        }
    }

    /**
     * Creates a completion callback for a distributed-data write. The callback reports the outcome
     * to this actor as a message rather than touching actor state itself:
     * <ul>
     * <li>an error is sent as {@link Status.Failure};</li>
     * <li>otherwise a non-null payload is sent as {@link DDataOpSuccess}, together with the given
     * sequence number and write consistency;</li>
     * <li>with neither error nor payload, nothing is sent.</li>
     * </ul>
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param i the sequence number to attach to the success message.
     * @param writeConsistency the write consistency to attach to the success message.
     * @return the callback; it always returns {@code null}.
     */
    public BiFunction<P, Throwable, Void> handleDDataWriteResult(int i, Replicator.WriteConsistency writeConsistency) {
        return (payload, error) -> {
            if (error != null) {
                getSelf().tell(new Status.Failure(error), ActorRef.noSender());
            } else if (payload != null) {
                // Typed construction instead of the raw DDataOpSuccess, which lost the payload type.
                final DDataOpSuccess<P> success = new DDataOpSuccess<>(payload, i, writeConsistency);
                getSelf().tell(success, ActorRef.noSender());
            }
            // NOTE(review): a completion with null payload and no error notifies nobody — confirm
            // that no write used with this callback completes with a null value on success.
            return null;
        };
    }

    /**
     * Fallback handler: logs any message not matched by the other handlers at warning level.
     *
     * @param message the unmatched message.
     */
    private void logUnhandled(Object message) {
        log.warning("Unhandled: <{}>", message);
    }

    /**
     * Handles the termination notice of an actor by removing that actor as a subscriber.
     *
     * @param terminated the termination notice.
     */
    private void terminated(Terminated terminated) {
        final ActorRef terminatedActor = terminated.actor();
        doRemoveSubscriber(terminatedActor);
    }

    /**
     * Handles an explicit removal request by removing its subscriber. The request's other
     * properties (write consistency, acknowledge flag, label-declaration flag) are not consulted here.
     *
     * @param removeSubscriber the removal request.
     */
    private void removeSubscriber(RemoveSubscriber removeSubscriber) {
        final ActorRef toRemove = removeSubscriber.getSubscriber();
        doRemoveSubscriber(toRemove);
    }

    /**
     * Removes the given actor from the subscriptions, marks the local subscriptions as changed if
     * the removal reported a change, and stops watching the actor.
     *
     * @param actorRef the subscriber to remove.
     */
    private void doRemoveSubscriber(ActorRef actorRef) {
        final boolean removed = subscriptions.removeSubscriber(actorRef);
        if (removed) {
            localSubscriptionsChanged = true;
        }
        getContext().unwatch(actorRef);
    }

    /**
     * Raises {@link #nextWriteConsistency} to the given consistency if that one ranks strictly
     * higher; otherwise leaves it untouched.
     *
     * <p>A {@code null} argument is ignored. Without the guard, {@code null} would rank 0, above
     * {@code writeLocal()}, and would overwrite the initial consistency with {@code null}.
     *
     * @param writeConsistency the consistency requested by a request; may be {@code null}.
     */
    private void upgradeWriteConsistency(Replicator.WriteConsistency writeConsistency) {
        if (writeConsistency != null && isMoreConsistent(writeConsistency, nextWriteConsistency)) {
            nextWriteConsistency = writeConsistency;
        }
    }

    /**
     * Compares two write consistencies by their {@code rank}.
     *
     * @param writeConsistency the candidate.
     * @param writeConsistency2 the reference.
     * @return {@code true} if the candidate ranks strictly higher than the reference.
     */
    private static boolean isMoreConsistent(Replicator.WriteConsistency writeConsistency, Replicator.WriteConsistency writeConsistency2) {
        final int candidateRank = rank(writeConsistency);
        final int referenceRank = rank(writeConsistency2);
        return candidateRank > referenceRank;
    }

    /**
     * Maps a write consistency to an integer so that consistencies can be ordered:
     * <ul>
     * <li>{@code writeLocal()} ranks lowest ({@link Integer#MIN_VALUE});</li>
     * <li>{@code WriteAll} ranks highest ({@link Integer#MAX_VALUE});</li>
     * <li>{@code WriteMajority} ranks by its {@code minCap()};</li>
     * <li>{@code WriteTo} ranks by its {@code n()};</li>
     * <li>anything else, including {@code null}, ranks 0.</li>
     * </ul>
     *
     * <p>NOTE(review): ranking {@code WriteMajority} by {@code minCap()} can place it below a
     * {@code WriteTo} with a larger {@code n}, regardless of cluster size — confirm this is intended.
     *
     * @param writeConsistency the consistency to rank.
     * @return the rank.
     */
    private static int rank(Replicator.WriteConsistency writeConsistency) {
        final int result;
        if (Replicator.writeLocal().equals(writeConsistency)) {
            result = Integer.MIN_VALUE;
        } else if (writeConsistency instanceof Replicator.WriteAll) {
            result = Integer.MAX_VALUE;
        } else if (writeConsistency instanceof Replicator.WriteMajority) {
            result = ((Replicator.WriteMajority) writeConsistency).minCap();
        } else if (writeConsistency instanceof Replicator.WriteTo) {
            result = ((Replicator.WriteTo) writeConsistency).n();
        } else {
            result = 0;
        }
        return result;
    }
}
