package org.eclipse.ditto.internal.utils.pubsub.actors;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.AbstractActorWithTimers;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedSnapshot;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/AbstractSubscriber.class */
abstract class AbstractSubscriber<T extends Signal<?>> extends AbstractActorWithTimers {
    final Class<T> messageClass;
    final PubSubTopicExtractor<T> topicExtractor;
    final AckExtractor<T> ackExtractor;
    final DistributedAcks distributedAcks;
    final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    @Nullable
    ActorRef ackUpdater = null;

    @Nullable
    ActorRef subUpdater = null;
    private final Counter truePositiveCounter = DittoMetrics.counter("pubsub-true-positive");
    private final Counter falsePositiveCounter = DittoMetrics.counter("pubsub-false-positive");
    private final Counter receivedMessagesCounter = DittoMetrics.counter("pubsub-received-messages");
    private PublisherIndex<String> publisherIndex = PublisherIndex.empty();
    private GroupedSnapshot<ActorRef, String> declaredAcks = GroupedSnapshot.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractSubscriber(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor, AckExtractor<T> ackExtractor, DistributedAcks distributedAcks) {
        this.messageClass = cls;
        this.topicExtractor = pubSubTopicExtractor;
        this.ackExtractor = ackExtractor;
        this.distributedAcks = distributedAcks;
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PublishSignal.class, this::broadcastToLocalSubscribers).match(SubscriptionsReader.class, this::updateLocalSubscriptions).match(LocalAcksChanged.class, this::updateLocalAcks).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void broadcastToLocalSubscribers(PublishSignal publishSignal) {
        this.receivedMessagesCounter.increment();
        T cast = this.messageClass.cast(publishSignal.getSignal());
        Set<ActorRef> set = (Set) this.publisherIndex.assignGroupsToSubscribers(cast, this.topicExtractor.getTopics(cast), publishSignal.getGroups(), publishSignal.getGroupIndexKey()).stream().map((v0) -> {
            return v0.first();
        }).collect(Collectors.toSet());
        if (set.isEmpty()) {
            this.falsePositiveCounter.increment();
        } else {
            this.truePositiveCounter.increment();
            Iterator<ActorRef> it = set.iterator();
            while (it.hasNext()) {
                it.next().tell(cast, getSender());
            }
        }
        replyWeakAck(cast, publishSignal, set);
    }

    private void replyWeakAck(T t, PublishSignal publishSignal, Set<ActorRef> set) {
        Set<String> values = this.declaredAcks.getValues(publishSignal.getGroups().keySet());
        AckExtractor<T> ackExtractor = this.ackExtractor;
        Objects.requireNonNull(values);
        List<AcknowledgementLabel> list = ackExtractor.getDeclaredCustomAcksRequestedBy(t, (v1) -> {
            return r2.contains(v1);
        }).stream().filter(acknowledgementLabel -> {
            return disjoint(set, this.declaredAcks.getKeys(acknowledgementLabel.toString()));
        }).toList();
        if (list.isEmpty()) {
            return;
        }
        Acknowledgements weakAcknowledgements = this.ackExtractor.toWeakAcknowledgements(t, list);
        String str = (String) weakAcknowledgements.getDittoHeaders().get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
        if (null != str) {
            getContext().actorSelection(str).tell(weakAcknowledgements, ActorRef.noSender());
        } else {
            this.logger.withCorrelationId(weakAcknowledgements).error("Issuing weak Acknowledgements to acknowledgement aggregator failed because ackgregator address was missing from headers: {}", weakAcknowledgements.getDittoHeaders());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateLocalSubscriptions(SubscriptionsReader subscriptionsReader) {
        this.publisherIndex = PublisherIndex.fromSubscriptionsReader(subscriptionsReader);
        this.subUpdater = getSender();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateLocalAcks(LocalAcksChanged localAcksChanged) {
        this.declaredAcks = localAcksChanged.getSnapshot();
        this.ackUpdater = getSender();
    }

    private static <T> boolean disjoint(Set<T> set, Set<T> set2) {
        Set<T> set3;
        Set<T> set4;
        if (set.size() < set2.size()) {
            set3 = set;
            set4 = set2;
        } else {
            set3 = set2;
            set4 = set;
        }
        Stream<T> stream = set3.stream();
        Set<T> set5 = set4;
        Objects.requireNonNull(set5);
        return stream.noneMatch(set5::contains);
    }
}
