package org.eclipse.ditto.services.concierge.enforcement;

import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.AclEnforcer;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.model.messages.MessageSendNotAllowedException;
import org.eclipse.ditto.model.policies.PoliciesResourceType;
import org.eclipse.ditto.model.policies.ResourceKey;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.protocoladapter.UnknownCommandException;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityId;
import org.eclipse.ditto.services.utils.cache.entry.Entry;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.SendClaimMessage;
import org.eclipse.ditto.signals.commands.messages.SendMessageAcceptedResponse;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/LiveSignalEnforcement.class */
public final class LiveSignalEnforcement extends AbstractEnforcement<Signal> {
    private final EnforcerRetriever enforcerRetriever;
    private final Cache<String, ActorRef> responseReceivers;

    /* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/LiveSignalEnforcement$Provider.class */
    public static final class Provider implements EnforcementProvider<Signal> {
        private final Cache<EntityId, Entry<EntityId>> thingIdCache;
        private final Cache<EntityId, Entry<Enforcer>> policyEnforcerCache;
        private final Cache<EntityId, Entry<Enforcer>> aclEnforcerCache;

        public Provider(Cache<EntityId, Entry<EntityId>> cache, Cache<EntityId, Entry<Enforcer>> cache2, Cache<EntityId, Entry<Enforcer>> cache3) {
            this.thingIdCache = (Cache) Objects.requireNonNull(cache);
            this.policyEnforcerCache = (Cache) Objects.requireNonNull(cache2);
            this.aclEnforcerCache = (Cache) Objects.requireNonNull(cache3);
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public Class<Signal> getCommandClass() {
            return Signal.class;
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public boolean isApplicable(Signal signal) {
            return LiveSignalEnforcement.isLiveSignal(signal);
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public AbstractEnforcement<Signal> createEnforcement(Contextual<Signal> contextual) {
            return new LiveSignalEnforcement(contextual, this.thingIdCache, this.policyEnforcerCache, this.aclEnforcerCache);
        }
    }

    private LiveSignalEnforcement(Contextual<Signal> contextual, Cache<EntityId, Entry<EntityId>> cache, Cache<EntityId, Entry<Enforcer>> cache2, Cache<EntityId, Entry<Enforcer>> cache3) {
        super(contextual);
        Objects.requireNonNull(cache);
        Objects.requireNonNull(cache2);
        Objects.requireNonNull(cache3);
        this.enforcerRetriever = PolicyOrAclEnforcerRetrieverFactory.create(cache, cache2, cache3);
        this.responseReceivers = contextual.getResponseReceivers();
    }

    @Override // org.eclipse.ditto.services.concierge.enforcement.AbstractEnforcement
    public CompletionStage<Contextual<WithDittoHeaders>> enforce() {
        Signal signal = signal();
        ActorRef sender = sender();
        LogUtil.enhanceLogWithCorrelationIdOrRandom(signal);
        return this.enforcerRetriever.retrieve(entityId(), (entry, entry2) -> {
            try {
                return doEnforce(signal, sender, entry2).exceptionally(this::handleExceptionally);
            } catch (RuntimeException e) {
                return CompletableFuture.completedFuture(handleExceptionally(e));
            }
        });
    }

    private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(Signal signal, ActorRef actorRef, Entry<Enforcer> entry) {
        Optional correlationId = signal.getDittoHeaders().getCorrelationId();
        if (!entry.exists() || !correlationId.isPresent()) {
            log(signal).info("Command of type <{}> with ID <{}> could not be dispatched as no enforcer could be looked up! Answering with ThingNotAccessibleException.", signal.getType(), signal.getId());
            throw ThingNotAccessibleException.newBuilder(entityId().getId()).dittoHeaders(signal.getDittoHeaders()).build();
        }
        Enforcer enforcer = (Enforcer) entry.getValueOrThrow();
        String str = (String) correlationId.get();
        if (signal instanceof SendClaimMessage) {
            if (signal.getDittoHeaders().isResponseRequired()) {
                this.responseReceivers.put(str, actorRef);
            }
            return CompletableFuture.completedFuture(publishMessageCommand((SendClaimMessage) signal, enforcer));
        }
        if (signal instanceof CommandResponse) {
            return enforceLiveCommandResponse(signal, str);
        }
        if (signal instanceof Command) {
            return enforceLiveCommand(signal, actorRef, enforcer, str);
        }
        if (signal instanceof ThingEvent) {
            return enforceLiveEvent(signal, enforcer);
        }
        log().error("Unsupported Signal in LiveSignalEnforcement: <{}>", signal);
        throw GatewayInternalErrorException.newBuilder().dittoHeaders(signal.getDittoHeaders()).build();
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(Signal signal, String str) {
        return this.responseReceivers.get(str).thenApply(optional -> {
            if (!optional.isPresent()) {
                log(signal).warning("No outstanding responses receiver for CommandResponse <{}>", signal.getType());
                return withoutReceiver();
            }
            this.responseReceivers.invalidate(str);
            log().debug("Scheduling CommandResponse <{}> to original sender: <{}>", signal, optional.get());
            return withMessageToReceiver(signal, (ActorRef) optional.get());
        });
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommand(Signal signal, ActorRef actorRef, Enforcer enforcer, String str) {
        if (signal instanceof MessageCommand) {
            Contextual<WithDittoHeaders> enforceMessageCommand = enforceMessageCommand((MessageCommand) signal, enforcer);
            if (signal.getDittoHeaders().isResponseRequired()) {
                this.responseReceivers.put(str, actorRef);
            }
            return CompletableFuture.completedFuture(enforceMessageCommand);
        }
        if (!(signal instanceof ThingCommand)) {
            log(signal).warning("Ignoring unsupported live command: <{}>", signal);
            throw UnknownCommandException.newBuilder(signal.getName()).message("The sent command is not supported as live command").dittoHeaders(signal.getDittoHeaders()).build();
        }
        if (!(enforcer instanceof AclEnforcer ? ThingCommandEnforcement.authorizeByAcl(enforcer, (ThingCommand) signal).isPresent() : ThingCommandEnforcement.authorizeByPolicy(enforcer, (ThingCommand) signal).isPresent())) {
            log(signal).info("Live Command was NOT authorized: <{}>", signal);
            throw ThingCommandEnforcement.errorForThingCommand((ThingCommand) signal);
        }
        Signal<?> signal2 = (Command) addReadSubjectsToThingSignal((Command) signal, enforcer);
        log(signal2).info("Live Command was authorized: <{}>", signal2);
        if (signal.getDittoHeaders().isResponseRequired()) {
            this.responseReceivers.put(str, actorRef);
        }
        return CompletableFuture.completedFuture(publishToMediator(signal2, StreamingType.LIVE_COMMANDS.getDistributedPubSubTopic()));
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveEvent(Signal signal, Enforcer enforcer) {
        if (enforcer.hasUnrestrictedPermissions(PoliciesResourceType.thingResource("/"), signal.getDittoHeaders().getAuthorizationContext(), "WRITE", new String[0])) {
            log(signal).info("Live Event was authorized: <{}>", signal);
            return CompletableFuture.completedFuture(publishToMediator(addReadSubjectsToThingSignal((Event) signal, enforcer), StreamingType.LIVE_EVENTS.getDistributedPubSubTopic()));
        }
        log(signal).info("Live Event was NOT authorized: <{}>", signal);
        throw EventSendNotAllowedException.newBuilder(((ThingEvent) signal).getThingId()).dittoHeaders(signal.getDittoHeaders()).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isLiveSignal(Signal signal) {
        Optional channel = signal.getDittoHeaders().getChannel();
        String name = TopicPath.Channel.LIVE.getName();
        name.getClass();
        return channel.filter((v1) -> {
            return r1.equals(v1);
        }).isPresent();
    }

    private Contextual<WithDittoHeaders> enforceMessageCommand(MessageCommand messageCommand, Enforcer enforcer) {
        if (isAuthorized(messageCommand, enforcer)) {
            return publishMessageCommand(messageCommand, enforcer);
        }
        throw rejectMessageCommand(messageCommand);
    }

    private Contextual<WithDittoHeaders> publishMessageCommand(MessageCommand messageCommand, Enforcer enforcer) {
        MessageCommand dittoHeaders = messageCommand.setDittoHeaders(messageCommand.getDittoHeaders().toBuilder().readSubjects(enforcer.getSubjectIdsWithPermission(ResourceKey.newInstance("message", messageCommand.getResourcePath()), "READ", new String[0]).getGranted()).build());
        getResponseForFireAndForgetMessage(dittoHeaders).ifPresent((v1) -> {
            replyToSender(v1);
        });
        return publishToMediator(dittoHeaders, dittoHeaders.getTypePrefix());
    }

    private void replyToSender(Object obj) {
        sender().tell(obj, self());
    }

    private MessageSendNotAllowedException rejectMessageCommand(MessageCommand messageCommand) {
        MessageSendNotAllowedException build = MessageSendNotAllowedException.newBuilder(messageCommand.getThingId()).dittoHeaders(messageCommand.getDittoHeaders()).build();
        log(messageCommand).info("The command <{}> was not forwarded due to insufficient rights {}: {} - AuthorizationSubjects: {}", messageCommand.getType(), build.getClass().getSimpleName(), build.getMessage(), messageCommand.getDittoHeaders().getAuthorizationSubjects());
        return build;
    }

    private Contextual<WithDittoHeaders> publishToMediator(Signal<?> signal, String str) {
        log(signal).debug("Publish message to pub-sub: <{}>", str);
        return withMessageToReceiver(signal, pubSubMediator(), obj -> {
            return new DistributedPubSubMediator.Publish(str, obj, true);
        });
    }

    private static boolean isAuthorized(MessageCommand messageCommand, Enforcer enforcer) {
        return enforcer.hasUnrestrictedPermissions(PoliciesResourceType.messageResource(messageCommand.getResourcePath()), messageCommand.getDittoHeaders().getAuthorizationContext(), "WRITE", new String[0]);
    }

    private static Optional<SendMessageAcceptedResponse> getResponseForFireAndForgetMessage(MessageCommand<?, ?> messageCommand) {
        return isFireAndForgetMessage(messageCommand) ? Optional.of(SendMessageAcceptedResponse.newInstance(messageCommand.getThingId(), messageCommand.getMessage().getHeaders(), messageCommand.getDittoHeaders())) : Optional.empty();
    }

    private static boolean isFireAndForgetMessage(MessageCommand<?, ?> messageCommand) {
        return ((Boolean) messageCommand.getMessage().getTimeout().map((v0) -> {
            return v0.isZero();
        }).orElseGet(() -> {
            return Boolean.valueOf(!messageCommand.getDittoHeaders().isResponseRequired());
        })).booleanValue();
    }
}
