package org.eclipse.ditto.internal.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.ddata.Replicator;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.internal.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveLocalAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataUpdate;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedRelation;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/AckUpdater.class */
public final class AckUpdater extends AbstractActorWithTimers implements ClusterMemberRemovedAware {
    public static final String ACTOR_NAME_PREFIX = "ackUpdater";
    private final Address ownAddress;
    private final DData<Address, String, LiteralUpdate> ackDData;
    protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
    private Map<String, Set<String>> remoteAckLabels = Map.of();
    private Map<String, Set<String>> remoteGroups = Map.of();
    private LiteralUpdate previousUpdate = LiteralUpdate.empty();
    private final GroupedRelation<ActorRef, String> localAckLabels = GroupedRelation.create();
    private final Set<ActorRef> ddataChangeRecipients = new HashSet();
    private final Set<ActorRef> localChangeRecipients = new HashSet();
    private final Gauge ackSizeMetric = DittoMetrics.gauge("pubsub-ack-size-bytes");

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/AckUpdater$Clock.class */
    private enum Clock {
        TICK
    }

    protected AckUpdater(PubSubConfig pubSubConfig, Address address, DData<Address, String, LiteralUpdate> dData) {
        this.ownAddress = address;
        this.ackDData = dData;
        subscribeForClusterMemberRemovedAware();
        dData.getReader().receiveChanges(getSelf());
        getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, pubSubConfig.getUpdateInterval());
    }

    public static Props props(PubSubConfig pubSubConfig, Address address, DData<Address, String, LiteralUpdate> dData) {
        return Props.create(AckUpdater.class, new Object[]{pubSubConfig, address, dData});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(DeclareAcks.class, this::declare).match(Terminated.class, this::terminated).match(RemoveSubscriberAcks.class, this::removeSubscriber).match(ReceiveRemoteAcks.class, this::onReceiveDDataChanges).match(ReceiveLocalAcks.class, this::onReceiveLocalChanges).matchEquals(Clock.TICK, this::tick).match(Replicator.Changed.class, this::onChanged).build().orElse(receiveClusterMemberRemoved()).orElse(ReceiveBuilder.create().matchAny(this::logUnhandled).build());
    }

    @Override // org.eclipse.ditto.internal.utils.pubsub.actors.ClusterMemberRemovedAware
    public LoggingAdapter log() {
        return this.log;
    }

    @Override // org.eclipse.ditto.internal.utils.pubsub.actors.ClusterMemberRemovedAware
    public DDataWriter<?, ?> getDDataWriter() {
        return this.ackDData.getWriter();
    }

    private void declare(DeclareAcks declareAcks) {
        ActorRef sender = getSender();
        ActorRef subscriber = declareAcks.getSubscriber();
        String orElse = declareAcks.getGroup().orElse(null);
        Set<String> ackLabels = declareAcks.getAckLabels();
        if (!isAllowedLocally(orElse, ackLabels) || !isAllowedRemotely(orElse, ackLabels)) {
            failSubscribe(sender);
            return;
        }
        this.localAckLabels.put(subscriber, orElse, ackLabels);
        getContext().watch(subscriber);
        getSender().tell(AcksDeclared.of(declareAcks, sender), getSelf());
    }

    private boolean isAllowedLocally(@Nullable String str, Set<String> set) {
        if (str != null) {
            Optional<Set<String>> valuesOfGroup = this.localAckLabels.getValuesOfGroup(str);
            if (valuesOfGroup.isPresent()) {
                return valuesOfGroup.get().equals(set);
            }
        }
        GroupedRelation<ActorRef, String> groupedRelation = this.localAckLabels;
        Objects.requireNonNull(groupedRelation);
        return noDeclaredLabelMatches(set, (v1) -> {
            return r2.containsValue(v1);
        });
    }

    private boolean isAllowedRemotely(@Nullable String str, Set<String> set) {
        return isAllowedRemotelyBy(str, set, this.remoteGroups, conflictWithOtherGroups(str, this.remoteAckLabels));
    }

    private boolean isAllowedRemotelyBy(Grouped<String> grouped, Map<String, Set<String>> map, Predicate<String> predicate) {
        return isAllowedRemotelyBy(grouped.getGroup().orElse(null), grouped.getValues(), map, predicate);
    }

    private boolean isAllowedRemotelyBy(@Nullable String str, Set<String> set, Map<String, Set<String>> map, Predicate<String> predicate) {
        Set<String> set2;
        boolean noDeclaredLabelMatches = noDeclaredLabelMatches(set, predicate);
        return (!noDeclaredLabelMatches || str == null || (set2 = map.get(str)) == null) ? noDeclaredLabelMatches : set2.equals(set);
    }

    private boolean noDeclaredLabelMatches(Set<String> set, Predicate<String> predicate) {
        return set.stream().noneMatch(predicate);
    }

    private void tick(Clock clock) {
        writeLocalDData();
        LocalAcksChanged of = LocalAcksChanged.of(this.localAckLabels.export());
        this.localChangeRecipients.forEach(actorRef -> {
            actorRef.tell(of, getSelf());
        });
        this.ackSizeMetric.set(Long.valueOf(of.getSnapshot().estimateSize()));
    }

    private void onChanged(Replicator.Changed<?> changed) {
        Map<Address, List<Grouped<String>>> deserializeORMultiMap = Grouped.deserializeORMultiMap(changed.dataValue(), (v0) -> {
            return v0.asString();
        });
        List<Grouped<String>> remoteGroupedAckLabelsOrderByAddress = getRemoteGroupedAckLabelsOrderByAddress(deserializeORMultiMap);
        this.remoteGroups = getRemoteGroups(remoteGroupedAckLabelsOrderByAddress);
        this.remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabelsOrderByAddress);
        for (ActorRef actorRef : getLocalLosers(deserializeORMultiMap)) {
            doRemoveSubscriber(actorRef);
            failSubscribe(actorRef);
        }
        RemoteAcksChanged of = RemoteAcksChanged.of(deserializeORMultiMap);
        this.ddataChangeRecipients.forEach(actorRef2 -> {
            actorRef2.tell(of, getSelf());
        });
    }

    private List<Grouped<String>> getRemoteGroupedAckLabelsOrderByAddress(Map<Address, List<Grouped<String>>> map) {
        return (List) map.entrySet().stream().filter(this::isNotOwnAddress).sorted(entryKeyAddressComparator()).map((v0) -> {
            return v0.getValue();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    private boolean isNotOwnAddress(Map.Entry<Address, ?> entry) {
        return !this.ownAddress.equals(entry.getKey());
    }

    private Map<String, Set<String>> getRemoteGroups(List<Grouped<String>> list) {
        HashMap hashMap = new HashMap();
        list.stream().flatMap((v0) -> {
            return v0.streamAsGroupedPair();
        }).forEach(pair -> {
            hashMap.computeIfAbsent((String) pair.first(), str -> {
                return (Set) pair.second();
            });
        });
        return Collections.unmodifiableMap(hashMap);
    }

    private Map<String, Set<String>> getRemoteAckLabels(List<Grouped<String>> list) {
        HashMap hashMap = new HashMap();
        for (Grouped<String> grouped : list) {
            String orElse = grouped.getGroup().orElse("");
            Iterator<String> it = grouped.getValues().iterator();
            while (it.hasNext()) {
                hashMap.compute(it.next(), (str, set) -> {
                    Set hashSet = set == null ? new HashSet() : set;
                    hashSet.add(orElse);
                    return hashSet;
                });
            }
        }
        return hashMap;
    }

    private void logUnhandled(Object obj) {
        this.log.warning("Unhandled: <{}>", obj);
    }

    private void terminated(Terminated terminated) {
        ActorRef actor = terminated.actor();
        doRemoveSubscriber(actor);
        this.ddataChangeRecipients.remove(actor);
        if (this.localChangeRecipients.remove(actor)) {
            reportLocalDataLoss();
        }
    }

    private void reportLocalDataLoss() {
        this.localAckLabels.entrySet().forEach(entry -> {
            ((ActorRef) entry.getKey()).tell(PubSubTerminatedException.getInstance(), getSelf());
        });
        this.localAckLabels.clear();
    }

    private void removeSubscriber(RemoveSubscriberAcks removeSubscriberAcks) {
        doRemoveSubscriber(removeSubscriberAcks.getSubscriber());
    }

    private void doRemoveSubscriber(ActorRef actorRef) {
        this.localAckLabels.removeKey(actorRef);
        getContext().unwatch(actorRef);
    }

    private void writeLocalDData() {
        this.ackDData.getWriter().put(this.ownAddress, createAndSetDDataUpdate(), Replicator.writeLocal()).whenComplete((r5, th) -> {
            if (th != null) {
                this.log.error(th, "Failed to update local DData");
            }
        });
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate] */
    private LiteralUpdate createAndSetDDataUpdate() {
        LiteralUpdate withInserts = LiteralUpdate.withInserts((Set) this.localAckLabels.exportValuesByGroup().stream().map((v0) -> {
            return v0.toJsonString();
        }).collect(Collectors.toSet()));
        ?? diff2 = withInserts.diff2((DDataUpdate<String>) this.previousUpdate);
        this.previousUpdate = withInserts;
        return diff2;
    }

    private void failSubscribe(ActorRef actorRef) {
        actorRef.tell(AcknowledgementLabelNotUniqueException.getInstance(), getSelf());
    }

    private List<ActorRef> getLocalLosers(Map<Address, List<Grouped<String>>> map) {
        List<Grouped<String>> remoteGroupedAckLabelsOrderByAddress = getRemoteGroupedAckLabelsOrderByAddress((Map) map.entrySet().stream().filter(entry -> {
            return Address.addressOrdering().compare((Address) entry.getKey(), this.ownAddress) < 0;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
        Map<String, Set<String>> remoteGroups = getRemoteGroups(remoteGroupedAckLabelsOrderByAddress);
        Map<String, Set<String>> remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabelsOrderByAddress);
        return (List) this.localAckLabels.entrySet().stream().filter(entry2 -> {
            return !isAllowedRemotelyBy((Grouped) entry2.getValue(), remoteGroups, conflictWithOtherGroups(((Grouped) entry2.getValue()).getGroup().orElse(null), remoteAckLabels));
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private void onReceiveDDataChanges(ReceiveRemoteAcks receiveRemoteAcks) {
        this.ddataChangeRecipients.add(receiveRemoteAcks.getReceiver());
        getContext().watch(receiveRemoteAcks.getReceiver());
    }

    private void onReceiveLocalChanges(ReceiveLocalAcks receiveLocalAcks) {
        this.localChangeRecipients.add(receiveLocalAcks.getReceiver());
        getContext().watch(receiveLocalAcks.getReceiver());
    }

    private static <T> Comparator<Map.Entry<Address, T>> entryKeyAddressComparator() {
        return (entry, entry2) -> {
            return Address.addressOrdering().compare((Address) entry.getKey(), (Address) entry2.getKey());
        };
    }

    private static Predicate<String> conflictWithOtherGroups(@Nullable String str, Map<String, Set<String>> map) {
        if (str != null) {
            return str2 -> {
                Set set = (Set) map.getOrDefault(str2, Set.of());
                return (set.isEmpty() || (set.size() == 1 && set.contains(str))) ? false : true;
            };
        }
        Objects.requireNonNull(map);
        return (v1) -> {
            return r0.containsKey(v1);
        };
    }
}
