package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.ddata.Replicator;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.services.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.services.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/Publisher.class */
/**
 * Actor that forwards published signals to the subscribers selected by its {@link PublisherIndex}.
 *
 * <p>The actor reacts to four kinds of messages:
 * <ul>
 * <li>{@link Request}s created by {@link #publish(Collection, Signal)}: the signal is sent to the
 * subscribers assigned by the current index.</li>
 * <li>{@link Request}s created by
 * {@link #publishWithAck(Collection, Signal, Set, EntityIdWithType, DittoHeaders)}: the signal is sent
 * likewise; in addition the sender receives weak acknowledgements for every requested-and-declared
 * custom acknowledgement label that none of the recipients declared.</li>
 * <li>{@link RemoteAcksChanged}: replaces the stored snapshot of declared acknowledgements.</li>
 * <li>{@link Replicator.Changed}: rebuilds the publisher index from the changed distributed data.</li>
 * </ul>
 * Any other message is logged at warning level.
 */
public final class Publisher extends AbstractActor {

    /**
     * Prefix of this actor's name.
     */
    public static final String ACTOR_NAME_PREFIX = "publisher";

    private final DDataReader<ActorRef, String> ddataReader;
    private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
    private final Counter messageCounter = DittoMetrics.counter("pubsub-published-messages");
    private final Counter topicCounter = DittoMetrics.counter("pubsub-published-topics");

    // Both fields are replaced wholesale whenever the corresponding update message arrives.
    private PublisherIndex<Long> publisherIndex = PublisherIndex.empty();
    private RemoteAcksChanged remoteAcks = RemoteAcksChanged.of(Map.of());

    /**
     * Request to publish a signal under a collection of topics without acknowledgement handling.
     */
    private static final class Publish implements Request {

        private final Collection<String> topics;
        private final Signal<?> message;

        private Publish(final Collection<String> topics, final Signal<?> message) {
            this.topics = topics;
            this.message = message;
        }
    }

    /**
     * Request to publish a signal under a collection of topics together with the acknowledgement
     * requests, entity ID and headers needed to build weak acknowledgements for it.
     */
    private static final class PublishWithAck implements Request {

        // Reads entity ID and headers straight from the request instead of from the wrapped signal.
        private static final AckExtractor<PublishWithAck> ACK_EXTRACTOR =
                AckExtractor.of(request -> request.entityId, request -> request.dittoHeaders);

        private final Collection<String> topics;
        private final Signal<?> message;
        private final Set<AcknowledgementRequest> ackRequests;
        private final EntityIdWithType entityId;
        private final DittoHeaders dittoHeaders;

        private PublishWithAck(final Collection<String> topics,
                final Signal<?> message,
                final Set<AcknowledgementRequest> ackRequests,
                final EntityIdWithType entityId,
                final DittoHeaders dittoHeaders) {

            this.topics = topics;
            this.message = message;
            this.ackRequests = ackRequests;
            this.entityId = entityId;
            this.dittoHeaders = dittoHeaders;
        }

        /**
         * Builds the weak acknowledgements of this request for the given labels.
         *
         * @param ackLabels the acknowledgement labels to answer.
         * @return the acknowledgements produced by the extractor for this request.
         */
        private Acknowledgements toWeakAcks(final Collection<AcknowledgementLabel> ackLabels) {
            return ACK_EXTRACTOR.toWeakAcknowledgements(this, ackLabels);
        }
    }

    /**
     * Marker type of the messages this actor accepts for publication. Instances are obtained from
     * {@link #publish(Collection, Signal)} and
     * {@link #publishWithAck(Collection, Signal, Set, EntityIdWithType, DittoHeaders)}.
     */
    public interface Request {
    }

    // Not called directly: instantiated reflectively through the Props returned by props(...).
    private Publisher(final DDataReader<ActorRef, String> ddataReader, final DistributedAcks distributedAcks) {
        this.ddataReader = ddataReader;
        // Register this actor for distributed-data changes and for declared-acknowledgement updates.
        ddataReader.receiveChanges(getSelf());
        distributedAcks.receiveDistributedDeclaredAcks(getSelf());
    }

    /**
     * Creates the Props of this actor.
     *
     * @param dDataReader reader of the distributed data; handed to the actor's constructor.
     * @param distributedAcks access to the distributed declared acknowledgements; handed to the actor's
     * constructor.
     * @param <T> second type parameter of the reader.
     * @return the Props.
     */
    public static <T> Props props(DDataReader<ActorRef, T> dDataReader, DistributedAcks distributedAcks) {
        return Props.create(Publisher.class, dDataReader, distributedAcks);
    }

    /**
     * Creates a publish request without acknowledgement handling.
     *
     * @param collection the topics under which to publish the signal.
     * @param signal the signal to publish.
     * @return the request to send to this actor.
     */
    public static Request publish(Collection<String> collection, Signal<?> signal) {
        return new Publish(collection, signal);
    }

    /**
     * Creates a publish request with acknowledgement handling.
     *
     * @param collection the topics under which to publish the signal.
     * @param signal the signal to publish.
     * @param set the acknowledgement requests of the signal.
     * @param entityIdWithType the entity ID used when building weak acknowledgements.
     * @param dittoHeaders the headers used when building weak acknowledgements.
     * @return the request to send to this actor.
     */
    public static Request publishWithAck(Collection<String> collection, Signal<?> signal,
            Set<AcknowledgementRequest> set, EntityIdWithType entityIdWithType, DittoHeaders dittoHeaders) {

        return new PublishWithAck(collection, signal, set, entityIdWithType, dittoHeaders);
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Publish.class, this::publish)
                .match(PublishWithAck.class, this::publishWithAck)
                .match(RemoteAcksChanged.class, this::declaredAcksChanged)
                .match(Replicator.Changed.class, this::topicSubscribersChanged)
                .matchAny(this::logUnhandled)
                .build();
    }

    private void publish(final Publish request) {
        doPublish(request.topics, request.message);
    }

    /**
     * Publishes the signal and replies to the sender with weak acknowledgements for those
     * requested-and-declared custom labels that no recipient of the signal declared. Nothing is sent
     * back when there is no such label.
     */
    private void publishWithAck(final PublishWithAck request) {
        final List<Pair<ActorRef, PublishSignal>> recipients = doPublish(request.topics, request.message);

        // Labels declared by the recipients the signal was actually sent to, looked up by the
        // recipient's address and the groups assigned to it.
        final Set<String> declaredByRecipients = recipients.stream()
                .flatMap(recipient -> remoteAcks.streamDeclaredAcksForGroup(
                        recipient.first().path().address(),
                        recipient.second().getGroups().keySet()))
                .collect(Collectors.toSet());

        final List<AcknowledgementLabel> labelsWithoutDeclaringRecipient = AckExtractor
                .getRequestedAndDeclaredCustomAcks(request.ackRequests, remoteAcks::contains)
                .stream()
                .filter(label -> !declaredByRecipients.contains(label.toString()))
                .collect(Collectors.toList());

        if (!labelsWithoutDeclaringRecipient.isEmpty()) {
            getSender().tell(request.toWeakAcks(labelsWithoutDeclaringRecipient), ActorRef.noSender());
        }
    }

    /**
     * Counts the publication, maps each topic through the reader's {@code approximate}, lets the index
     * assign subscribers, and sends each subscriber its {@link PublishSignal} with the original sender
     * preserved.
     *
     * @return the subscribers paired with the signal that was sent to each of them.
     */
    private List<Pair<ActorRef, PublishSignal>> doPublish(final Collection<String> topics, final Signal<?> signal) {
        messageCounter.increment();
        topicCounter.increment(topics.size());

        final ActorRef sender = getSender();
        final List<Pair<ActorRef, PublishSignal>> recipients = publisherIndex.assignGroupsToSubscribers(signal,
                topics.stream().map(ddataReader::approximate).collect(Collectors.toList()));
        recipients.forEach(recipient -> recipient.first().tell(recipient.second(), sender));
        return recipients;
    }

    private void declaredAcksChanged(final RemoteAcksChanged newRemoteAcks) {
        remoteAcks = newRemoteAcks;
    }

    /**
     * Rebuilds the publisher index from the entries stored under the reader's key in the change event.
     */
    private void topicSubscribersChanged(final Replicator.Changed<?> changed) {
        final Map<ActorRef, scala.collection.immutable.Set<String>> serializedEntries =
                CollectionConverters.asJava(changed.get(ddataReader.getKey()).entries());
        final Map<ActorRef, List<Grouped<Long>>> deserializedEntries = serializedEntries.entrySet()
                .stream()
                .map(entry -> Pair.create(entry.getKey(), deserializeGroupedHashes(entry.getValue())))
                .collect(Collectors.toMap(Pair::first, Pair::second));
        publisherIndex = PublisherIndex.fromDeserializedMMap(deserializedEntries);
    }

    private void logUnhandled(final Object message) {
        log.warning("Unhandled: <{}>", message);
    }

    /**
     * Parses each string of the set as a JSON object and converts it into a {@link Grouped} whose
     * values are read as longs.
     */
    private static List<Grouped<Long>> deserializeGroupedHashes(
            final scala.collection.immutable.Set<String> serializedGroups) {

        return CollectionConverters.asJava(serializedGroups)
                .stream()
                .map(json -> Grouped.fromJson(JsonObject.of(json), value -> value.asLong()))
                .collect(Collectors.toList());
    }
}
