package org.eclipse.ditto.policies.enforcement;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.ReplicatedData;
import akka.cluster.ddata.Replicator;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.dispatch.MessageDispatcher;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.slf4j.Logger;

/* loaded from: input_file:org/eclipse/ditto/policies/enforcement/CachingPolicyEnforcerProvider.class */
final class CachingPolicyEnforcerProvider extends AbstractPolicyEnforcerProvider {
    private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(CachingPolicyEnforcerProvider.class);
    private static final Duration LOCAL_POLICY_RETRIEVAL_TIMEOUT = Duration.ofSeconds(60);
    private final ActorRef cachingPolicyEnforcerProviderActor;

    /* loaded from: input_file:org/eclipse/ditto/policies/enforcement/CachingPolicyEnforcerProvider$CachingPolicyEnforcerProviderActor.class */
    private static final class CachingPolicyEnforcerProviderActor extends AbstractActor {
        private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
        private final PolicyEnforcerCache policyEnforcerCache;

        CachingPolicyEnforcerProviderActor(PolicyEnforcerCache policyEnforcerCache, @Nullable BlockedNamespaces blockedNamespaces, ActorRef actorRef) {
            this.policyEnforcerCache = policyEnforcerCache;
            if (blockedNamespaces != null) {
                blockedNamespaces.subscribeForChanges(getSelf());
            }
            actorRef.tell(DistPubSubAccess.subscribe("policy-invalidate-enforcers", getSelf()), getSelf());
        }

        private static Props props(PolicyEnforcerCache policyEnforcerCache, @Nullable BlockedNamespaces blockedNamespaces, ActorRef actorRef) {
            return Props.create(CachingPolicyEnforcerProviderActor.class, new Object[]{policyEnforcerCache, blockedNamespaces, actorRef});
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(PolicyId.class, this::doGetPolicyEnforcer).match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
                this.log.debug("Got subscribeAck <{}>.", subscribeAck);
            }).match(PolicyTag.class, policyTag -> {
                this.policyEnforcerCache.invalidate((PolicyId) policyTag.getEntityId());
            }).match(Replicator.Changed.class, this::handleChangedBlockedNamespaces).build();
        }

        private void doGetPolicyEnforcer(PolicyId policyId) {
            Patterns.pipe(this.policyEnforcerCache.get(policyId).thenApply(optional -> {
                return optional.flatMap((v0) -> {
                    return v0.get();
                });
            }), getContext().dispatcher()).to(getSender());
        }

        private void handleChangedBlockedNamespaces(Replicator.Changed<?> changed) {
            ReplicatedData dataValue = changed.dataValue();
            if (!(dataValue instanceof ORSet)) {
                this.log.warning("Unhandled: <{}>", changed);
                return;
            }
            ORSet<String> oRSet = (ORSet) dataValue;
            logNamespaces("Received", oRSet);
            Stream<PolicyId> filter = this.policyEnforcerCache.asMap().keySet().stream().filter(policyId -> {
                return oRSet.contains(policyId.getNamespace());
            });
            PolicyEnforcerCache policyEnforcerCache = this.policyEnforcerCache;
            Objects.requireNonNull(policyEnforcerCache);
            filter.forEach(policyEnforcerCache::invalidate);
        }

        private void logNamespaces(String str, ORSet<String> oRSet) {
            if (oRSet.size() > 25) {
                this.log.info("{} <{}> namespaces", str, Integer.valueOf(oRSet.size()));
            } else {
                this.log.info("{} namespaces: <{}>", str, oRSet);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingPolicyEnforcerProvider(ActorSystem actorSystem) {
        this(actorSystem, policyEnforcerCacheLoader(actorSystem), enforcementCacheDispatcher(actorSystem), (CacheConfig) DefaultCacheConfig.of(actorSystem.settings().config(), PolicyEnforcerProvider.ENFORCER_CACHE_CONFIG_KEY));
    }

    private CachingPolicyEnforcerProvider(ActorSystem actorSystem, AsyncCacheLoader<PolicyId, Entry<PolicyEnforcer>> asyncCacheLoader, MessageDispatcher messageDispatcher, CacheConfig cacheConfig) {
        this(actorSystem, new PolicyEnforcerCache(asyncCacheLoader, messageDispatcher, cacheConfig), BlockedNamespaces.of(actorSystem), DistributedPubSub.get(actorSystem).mediator());
    }

    CachingPolicyEnforcerProvider(ActorSystem actorSystem, PolicyEnforcerCache policyEnforcerCache, BlockedNamespaces blockedNamespaces, ActorRef actorRef) {
        this.cachingPolicyEnforcerProviderActor = actorSystem.actorOf(CachingPolicyEnforcerProviderActor.props(policyEnforcerCache, blockedNamespaces, actorRef));
    }

    @Override // org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider
    public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable PolicyId policyId) {
        return policyId == null ? CompletableFuture.completedStage(Optional.empty()) : Patterns.ask(this.cachingPolicyEnforcerProviderActor, policyId, LOCAL_POLICY_RETRIEVAL_TIMEOUT).thenApply(obj -> {
            return obj instanceof Optional ? ((Optional) obj).map(obj -> {
                if (obj instanceof PolicyEnforcer) {
                    return (PolicyEnforcer) obj;
                }
                LOGGER.warn("Did receive Optional holding an unexpected type. Did expect a PolicyEnforcer but got <{}>.", obj.getClass());
                return null;
            }) : Optional.empty();
        });
    }
}
