package org.eclipse.ditto.services.utils.pubsub.ddata;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMultiMap;
import akka.cluster.ddata.ORMultiMapKey;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.SelfUniqueAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.ditto.services.utils.ddata.DistributedData;
import org.eclipse.ditto.services.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.services.utils.pubsub.ddata.IndelUpdate;
import scala.collection.JavaConverters;
import scala.collection.immutable.Set;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/ddata/AbstractDDataHandler.class */
public abstract class AbstractDDataHandler<S, T extends IndelUpdate<S, T>> extends DistributedData<ORMultiMap<ActorRef, S>> implements DDataReader<S>, DDataWriter<T> {
    private final String topicType;
    private final SelfUniqueAddress selfUniqueAddress;
    private final Gauge ddataMetrics;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractDDataHandler(DistributedDataConfig distributedDataConfig, ActorRefFactory actorRefFactory, ActorSystem actorSystem, Executor executor, String str) {
        super(distributedDataConfig, actorRefFactory, executor);
        this.topicType = str;
        this.selfUniqueAddress = SelfUniqueAddress.apply(Cluster.get(actorSystem).selfUniqueAddress());
        this.ddataMetrics = DittoMetrics.gauge("pubsub-ddata-entries").tag("topic", str);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader
    public abstract S approximate(String str);

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader
    public CompletionStage<Collection<ActorRef>> getSubscribers(Collection<S> collection) {
        return read().thenApply(map -> {
            return (Collection) map.entrySet().stream().filter(entry -> {
                Stream stream = collection.stream();
                Set set = (Set) entry.getValue();
                Objects.requireNonNull(set);
                return stream.anyMatch(set::contains);
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList());
        });
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader
    public CompletionStage<Map<ActorRef, Set<S>>> read(Replicator.ReadConsistency readConsistency) {
        return get(readConsistency).thenApply(optional -> {
            if (!optional.isPresent()) {
                this.ddataMetrics.set(0L);
                return Map.of();
            }
            ORMultiMap oRMultiMap = (ORMultiMap) optional.get();
            this.ddataMetrics.set(Long.valueOf(oRMultiMap.size()));
            return JavaConverters.mapAsJavaMap(oRMultiMap.entries());
        });
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter
    public CompletionStage<Void> removeAddress(Address address, Replicator.WriteConsistency writeConsistency) {
        return update(writeConsistency, oRMultiMap -> {
            ORMultiMap oRMultiMap = oRMultiMap;
            for (ActorRef actorRef : oRMultiMap.getEntries().keySet()) {
                if (actorRef.path().address().equals(address)) {
                    oRMultiMap = oRMultiMap.remove(this.selfUniqueAddress, actorRef);
                }
            }
            return oRMultiMap;
        });
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter
    public CompletionStage<Void> put(ActorRef actorRef, T t, Replicator.WriteConsistency writeConsistency) {
        return t.shouldReplaceAll() ? update(writeConsistency, oRMultiMap -> {
            return oRMultiMap.put(this.selfUniqueAddress, actorRef, t.getInserts());
        }) : update(writeConsistency, oRMultiMap2 -> {
            ORMultiMap oRMultiMap2 = oRMultiMap2;
            Iterator<S> it = t.getInserts().iterator();
            while (it.hasNext()) {
                oRMultiMap2 = oRMultiMap2.addBinding(this.selfUniqueAddress, actorRef, it.next());
            }
            Iterator<S> it2 = t.getDeletes().iterator();
            while (it2.hasNext()) {
                oRMultiMap2 = oRMultiMap2.removeBinding(this.selfUniqueAddress, actorRef, it2.next());
            }
            return oRMultiMap2;
        });
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter
    public CompletionStage<Void> removeSubscriber(ActorRef actorRef, Replicator.WriteConsistency writeConsistency) {
        return update(writeConsistency, oRMultiMap -> {
            return oRMultiMap.remove(this.selfUniqueAddress, actorRef);
        });
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader
    public void receiveChanges(ActorRef actorRef) {
        this.replicator.tell(new Replicator.Subscribe(getKey(), actorRef), ActorRef.noSender());
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader
    public Key<ORMultiMap<ActorRef, S>> getKey() {
        return ORMultiMapKey.create(this.topicType);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getInitialValue, reason: merged with bridge method [inline-methods] */
    public ORMultiMap<ActorRef, S> m8getInitialValue() {
        return ORMultiMap.emptyWithValueDeltas();
    }
}
