package org.eclipse.ditto.internal.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.japi.pf.ReceiveBuilder;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedSnapshot;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

/**
 * Actor that handles {@link PublishSignal} messages: it casts the carried signal to the configured message class,
 * forwards it to the local subscribers selected by the current {@link PublisherIndex} and, where applicable,
 * replies to the sender with weak acknowledgements.
 *
 * <p>The publisher index is replaced whenever a {@link SubscriptionsReader} arrives; the snapshot of declared
 * acknowledgement labels is replaced whenever a {@link LocalAcksChanged} arrives. The sender of the latter is
 * watched, and when it terminates this actor re-registers itself at {@link DistributedAcks} after a delay.</p>
 *
 * @param <T> type of the signals forwarded to local subscribers.
 */
public final class Subscriber<T extends SignalWithEntityId<?>> extends AbstractActorWithTimers {

    /**
     * Name prefix constant for this actor; it is not referenced inside this class.
     */
    public static final String ACTOR_NAME_PREFIX = "subscriber";

    private final Class<T> messageClass;
    private final PubSubTopicExtractor<T> topicExtractor;
    private final AckExtractor<T> ackExtractor;
    private final DistributedAcks distributedAcks;

    // Incremented when a received signal is delivered to at least one local subscriber.
    private final Counter truePositiveCounter = DittoMetrics.counter("pubsub-true-positive");
    // Incremented when a received signal matches no local subscriber.
    private final Counter falsePositiveCounter = DittoMetrics.counter("pubsub-false-positive");
    // Incremented for every received PublishSignal.
    private final Counter receivedMessagesCounter = DittoMetrics.counter("pubsub-received-messages");

    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    // Replaced wholesale on every SubscriptionsReader message.
    private PublisherIndex<String> publisherIndex = PublisherIndex.empty();
    // Replaced wholesale on every LocalAcksChanged message.
    private GroupedSnapshot<ActorRef, String> declaredAcks = GroupedSnapshot.empty();

    // Sender of the most recent LocalAcksChanged message; null until the first one arrives.
    @Nullable
    private ActorRef ackUpdater = null;

    // Sender of the most recent SubscriptionsReader message; null until the first one arrives.
    @Nullable
    private ActorRef subUpdater = null;

    /**
     * Messages this actor sends to itself via its timer scheduler.
     */
    public enum Control {
        /**
         * Timer key and timer message that triggers a renewed registration at {@link DistributedAcks}.
         */
        RECEIVE_LOCAL_DECLARED_ACKS
    }

    /**
     * Creates the actor state and immediately asks {@code distributedAcks} to deliver local declared acks to this
     * actor. Invoked reflectively through {@link #props}.
     */
    @SuppressWarnings("unused")
    private Subscriber(final Class<T> messageClass,
            final PubSubTopicExtractor<T> topicExtractor,
            final AckExtractor<T> ackExtractor,
            final DistributedAcks distributedAcks) {

        this.messageClass = messageClass;
        this.topicExtractor = topicExtractor;
        this.ackExtractor = ackExtractor;
        this.distributedAcks = distributedAcks;
        distributedAcks.receiveLocalDeclaredAcks(getSelf());
    }

    /**
     * Creates the Props of this actor.
     *
     * @param cls class object the signal of each {@link PublishSignal} is cast to.
     * @param pubSubTopicExtractor extracts the topics of a signal.
     * @param ackExtractor extracts requested acknowledgement labels and builds weak acknowledgements.
     * @param distributedAcks the interface through which this actor requests local declared acks.
     * @param <T> type of the signals.
     * @return the Props object.
     */
    public static <T> Props props(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor,
            AckExtractor<T> ackExtractor, DistributedAcks distributedAcks) {

        return Props.create(Subscriber.class, cls, pubSubTopicExtractor, ackExtractor, distributedAcks);
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(PublishSignal.class, this::broadcastToLocalSubscribers)
                .match(SubscriptionsReader.class, this::updateLocalSubscriptions)
                .match(LocalAcksChanged.class, this::updateLocalAcks)
                .match(Terminated.class, this::terminated)
                .matchEquals(ActorEvent.ACK_UPDATER_NOT_AVAILABLE, this::scheduleReceiveLocalDeclaredAcks)
                .matchEquals(Control.RECEIVE_LOCAL_DECLARED_ACKS, this::receiveLocalDeclaredAcks)
                .build();
    }

    /**
     * Starts a single-shot timer that re-registers this actor at {@link DistributedAcks}, unless such a timer is
     * already pending.
     *
     * @param actorEvent the triggering event; its value is not inspected.
     */
    private void scheduleReceiveLocalDeclaredAcks(final ActorEvent actorEvent) {
        if (!timers().isTimerActive(Control.RECEIVE_LOCAL_DECLARED_ACKS)) {
            timers().startSingleTimer(Control.RECEIVE_LOCAL_DECLARED_ACKS, Control.RECEIVE_LOCAL_DECLARED_ACKS,
                    getRestartDelayWithBuffer());
        }
    }

    /**
     * Timer callback: asks {@link DistributedAcks} again to deliver local declared acks to this actor.
     *
     * @param control the timer message; its value is not inspected.
     */
    private void receiveLocalDeclaredAcks(final Control control) {
        distributedAcks.receiveLocalDeclaredAcks(getSelf());
    }

    /**
     * Handles termination of a watched actor. If the cluster itself is terminated nothing is done. Otherwise, if
     * the terminated actor is the current ack updater, the sub updater (if known) is notified and a renewed
     * registration at {@link DistributedAcks} is scheduled. Termination of any other actor is ignored.
     */
    private void terminated(final Terminated terminated) {
        if (Cluster.get(getContext().getSystem()).isTerminated()) {
            logger.info("This cluster instance was terminated - no action required ..");
            return;
        }
        if (terminated.getActor().equals(ackUpdater)) {
            logger.error("Notifying SubUpdater <{}> of AckUpdater termination: <{}>", subUpdater, terminated);
            if (subUpdater != null) {
                subUpdater.tell(ActorEvent.PUBSUB_TERMINATED, getSelf());
            }
            scheduleReceiveLocalDeclaredAcks(ActorEvent.ACK_UPDATER_NOT_AVAILABLE);
        }
    }

    /**
     * Forwards the signal of a {@link PublishSignal} to every local subscriber chosen by the publisher index,
     * preserving the original sender, updates the metrics counters and finally replies weak acknowledgements
     * where applicable.
     */
    private void broadcastToLocalSubscribers(final PublishSignal publishSignal) {
        receivedMessagesCounter.increment();
        final T message = messageClass.cast(publishSignal.getSignal());
        final ActorRef sender = getSender();
        final Set<ActorRef> localSubscribers = publisherIndex
                .assignGroupsToSubscribers(message, topicExtractor.getTopics(message), publishSignal.getGroups())
                .stream()
                .map(pair -> pair.first())
                .collect(Collectors.toSet());

        if (localSubscribers.isEmpty()) {
            falsePositiveCounter.increment();
        } else {
            truePositiveCounter.increment();
            for (final ActorRef subscriber : localSubscribers) {
                subscriber.tell(message, sender);
            }
        }
        replyWeakAck(message, publishSignal, localSubscribers, sender);
    }

    /**
     * Sends weak acknowledgements to {@code sender} for those requested custom acknowledgement labels that are
     * among the declared acks of the signal's groups but whose declaring actors are disjoint from the local
     * subscribers that received the signal. Sends nothing if no such label exists.
     *
     * @param message the signal that was published.
     * @param publishSignal the envelope the signal arrived in; only its group keys are read here.
     * @param localSubscribers the local subscribers the signal was forwarded to.
     * @param sender the recipient of the weak acknowledgements.
     */
    private void replyWeakAck(final T message, final PublishSignal publishSignal,
            final Set<ActorRef> localSubscribers, final ActorRef sender) {

        final Set<String> responsibleAcks = declaredAcks.getValues(publishSignal.getGroups().keySet());
        // The method reference null-checks responsibleAcks when it is created.
        final Collection<AcknowledgementLabel> declaredCustomAcks =
                ackExtractor.getDeclaredCustomAcksRequestedBy(message, responsibleAcks::contains);
        final Collection<AcknowledgementLabel> acksWithoutSubscribers = declaredCustomAcks.stream()
                .filter(label -> disjoint(localSubscribers, declaredAcks.getKeys(label.toString())))
                .collect(Collectors.toList());

        if (!acksWithoutSubscribers.isEmpty()) {
            sender.tell(ackExtractor.toWeakAcknowledgements(message, acksWithoutSubscribers), ActorRef.noSender());
        }
    }

    /**
     * Rebuilds the publisher index from the given reader and remembers the sender as the sub updater.
     */
    private void updateLocalSubscriptions(final SubscriptionsReader subscriptionsReader) {
        publisherIndex = PublisherIndex.fromSubscriptionsReader(subscriptionsReader);
        subUpdater = getSender();
    }

    /**
     * Replaces the declared-acks snapshot, remembers the sender as the ack updater and watches it so that its
     * termination is delivered to {@link #terminated(Terminated)}.
     */
    private void updateLocalAcks(final LocalAcksChanged localAcksChanged) {
        declaredAcks = localAcksChanged.getSnapshot();
        ackUpdater = getSender();
        getContext().watch(ackUpdater);
    }

    /**
     * Returns the configured pub/sub restart delay increased by a quarter of itself.
     */
    private Duration getRestartDelayWithBuffer() {
        final Duration restartDelay = PubSubConfig.of(getContext().getSystem()).getRestartDelay();
        return restartDelay.plus(restartDelay.dividedBy(4L));
    }

    /**
     * Tests whether two sets have no element in common by streaming the smaller set and probing the larger one.
     * When both sets have the same size, {@code set2} is streamed and {@code set} is probed.
     *
     * @return {@code true} if no element of one set is contained in the other.
     */
    private static <T> boolean disjoint(final Set<T> set, final Set<T> set2) {
        final boolean firstIsSmaller = set.size() < set2.size();
        final Set<T> smaller = firstIsSmaller ? set : set2;
        final Set<T> larger = firstIsSmaller ? set2 : set;
        return smaller.stream().noneMatch(larger::contains);
    }
}
