package org.eclipse.ditto.services.gateway.proxy.actors;

import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.routing.FromConfig;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.CreateThingHandlerActor;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.ModifyPolicyHandlerActor;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.ModifyThingHandlerActor;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.RetrieveThingHandlerActor;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.ThingHandlerCreator;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoCommand;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingsResponse;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.ThingSearchSudoCommand;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.devops.DevOpsCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.policies.PolicyCommand;
import org.eclipse.ditto.signals.commands.policies.modify.ModifyPolicy;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.commands.things.query.RetrieveThings;
import org.eclipse.ditto.signals.commands.things.query.RetrieveThingsResponse;
import org.eclipse.ditto.signals.commands.thingsearch.ThingSearchCommand;
import org.eclipse.ditto.signals.events.policies.PolicyEvent;
import org.eclipse.ditto.signals.events.things.ThingDeleted;
import org.eclipse.ditto.signals.events.things.ThingEvent;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/proxy/actors/AbstractThingProxyActor.class */
public abstract class AbstractThingProxyActor extends AbstractProxyActor {
    private static final String THINGS_SEARCH_ACTOR_PATH = "/user/thingsSearchRoot/thingsSearch";
    private final ActorRef pubSubMediator;
    private final ActorRef aclEnforcerShardRegion;
    private final ActorRef devOpsCommandsActor;
    private final ActorRef policyEnforcerShardRegion;
    private final ActorRef thingEnforcerLookup;
    private final ActorRef thingCacheFacade;
    private final ActorRef thingsAggregator;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractThingProxyActor(ActorRef actorRef, ActorRef actorRef2, ActorRef actorRef3, ActorRef actorRef4, ActorRef actorRef5, ActorRef actorRef6) {
        super(actorRef);
        this.pubSubMediator = actorRef;
        this.devOpsCommandsActor = actorRef2;
        this.policyEnforcerShardRegion = actorRef4;
        this.aclEnforcerShardRegion = actorRef3;
        this.thingCacheFacade = actorRef6;
        this.thingEnforcerLookup = actorRef5;
        this.thingsAggregator = getContext().actorOf(FromConfig.getInstance().props(ThingsAggregatorActor.props(getContext().self())), ThingsAggregatorActor.ACTOR_NAME);
        actorRef.tell(new DistributedPubSubMediator.Subscribe("policies.events:", AbstractProxyActor.ACTOR_NAME, getSelf()), getSelf());
        actorRef.tell(new DistributedPubSubMediator.Subscribe("things.events:", AbstractProxyActor.ACTOR_NAME, getSelf()), getSelf());
    }

    @Override // org.eclipse.ditto.services.gateway.proxy.actors.AbstractProxyActor
    protected void addCommandBehaviour(ReceiveBuilder receiveBuilder) {
        receiveBuilder.match(DevOpsCommand.class, devOpsCommand -> {
            LogUtil.enhanceLogWithCorrelationId(getLogger(), devOpsCommand);
            getLogger().debug("Got 'DevOpsCommand' message <{}>, forwarding to local devOpsCommandsActor", devOpsCommand.getType());
            this.devOpsCommandsActor.forward(devOpsCommand, getContext());
        }).match(SudoRetrieveThings.class, sudoRetrieveThings -> {
            getLogger().debug("Got 'SudoRetrieveThings' message, forwarding to the Things Aggregator");
            if (sudoRetrieveThings.getThingIds().isEmpty()) {
                getLogger().debug("Got 'SudoRetrieveThings' message with no ThingIds");
                notifySender(SudoRetrieveThingsResponse.of(JsonArray.newBuilder().build(), sudoRetrieveThings.getDittoHeaders()));
            } else {
                getLogger().debug("Got 'SudoRetrieveThings' message, forwarding to the Things Aggregator");
                this.thingsAggregator.forward(sudoRetrieveThings, getContext());
            }
        }).match(SudoCommand.class, forwardToLocalEnforcerLookup(this.thingEnforcerLookup)).match(org.eclipse.ditto.services.models.policies.commands.sudo.SudoCommand.class, forwardToLocalEnforcerLookup(this.thingEnforcerLookup)).match(Signal.class, AbstractProxyActor::isLiveSignalResponse, signal -> {
            signal.getDittoHeaders().getCorrelationId().map(this::encodeActorName).map(str -> {
                return getContext().getChild(str);
            }).ifPresent(actorRef -> {
                actorRef.forward(signal, getContext());
            });
        }).match(Signal.class, AbstractProxyActor::isLiveSignal, compose(createResponseActor(), forwardToLocalEnforcerLookup(this.thingEnforcerLookup))).match(ModifyPolicy.class, modifyPolicy -> {
            getContext().actorOf(ModifyPolicyHandlerActor.props(this.policyEnforcerShardRegion)).forward(modifyPolicy, getContext());
        }).match(PolicyCommand.class, policyCommand -> {
            this.policyEnforcerShardRegion.forward(policyCommand, getContext());
        }).match(org.eclipse.ditto.services.models.policies.commands.sudo.SudoCommand.class, sudoCommand -> {
            this.policyEnforcerShardRegion.forward(sudoCommand, getContext());
        }).match(PolicyEvent.class, policyEvent -> {
            LogUtil.enhanceLogWithCorrelationId(getLogger(), policyEvent);
            getLogger().debug("Got '{}' message, forwarding to the PolicyEnforcer", policyEvent.getType());
            this.policyEnforcerShardRegion.tell(policyEvent, getSender());
        }).match(RetrieveThings.class, retrieveThings -> {
            if (retrieveThings.getThingIds().isEmpty()) {
                getLogger().debug("Got 'RetrieveThings' message with no ThingIds");
                notifySender(RetrieveThingsResponse.of(JsonFactory.newArray(), (String) retrieveThings.getNamespace().orElse(null), retrieveThings.getDittoHeaders()));
            } else {
                getLogger().debug("Got 'RetrieveThings' message, forwarding to the Things Aggregator");
                this.thingsAggregator.forward(retrieveThings, getContext());
            }
        }).match(ThingCommand.class, forwardToLocalEnforcerLookup(this.thingEnforcerLookup)).match(MessageCommand.class, forwardToLocalEnforcerLookup(this.thingEnforcerLookup)).match(ThingDeleted.class, forwardToMajorityEnforcerLookup(this.thingEnforcerLookup)).match(ThingEvent.class, forwardToLocalEnforcerLookup(this.thingEnforcerLookup)).match(ThingSearchCommand.class, thingSearchCommand -> {
            this.pubSubMediator.tell(new DistributedPubSubMediator.Send(THINGS_SEARCH_ACTOR_PATH, thingSearchCommand), getSender());
        }).match(ThingSearchSudoCommand.class, thingSearchSudoCommand -> {
            this.pubSubMediator.tell(new DistributedPubSubMediator.Send(THINGS_SEARCH_ACTOR_PATH, thingSearchSudoCommand), getSender());
        });
    }

    @Override // org.eclipse.ditto.services.gateway.proxy.actors.AbstractProxyActor
    protected void addResponseBehaviour(ReceiveBuilder receiveBuilder) {
        receiveBuilder.match(LookupEnforcerResponse.class, isOfType("things.commands:createThing"), lookupEnforcerResponse -> {
            getThingHandlerActor(lookupEnforcerResponse, CreateThingHandlerActor::props).tell(getSignal(lookupEnforcerResponse), lookupEnforcerResponse.getContext().getInitialSender());
        }).match(LookupEnforcerResponse.class, isOfType("things.commands:modifyThing"), lookupEnforcerResponse2 -> {
            getThingHandlerActor(lookupEnforcerResponse2, ModifyThingHandlerActor::props).tell(getSignal(lookupEnforcerResponse2), lookupEnforcerResponse2.getContext().getInitialSender());
        }).match(LookupEnforcerResponse.class, isRetrieveThingWithAggregationNeeded(), lookupEnforcerResponse3 -> {
            LookupContext<?> context = lookupEnforcerResponse3.getContext();
            getLogger().debug("Got 'RetrieveThing' message with a '{}' lookup: {}", "_policy", context.getInitialCommandOrEvent());
            getThingHandlerActor(lookupEnforcerResponse3, RetrieveThingHandlerActor::props).tell(getSignal(lookupEnforcerResponse3), context.getInitialSender());
        });
    }

    @Override // org.eclipse.ditto.services.gateway.proxy.actors.AbstractProxyActor
    protected void addErrorBehaviour(ReceiveBuilder receiveBuilder) {
        receiveBuilder.match(LookupEnforcerResponse.class, isOfType((Class<?>) ThingCommand.class), lookupEnforcerResponse -> {
            LookupContext<?> context = lookupEnforcerResponse.getContext();
            ThingCommand initialCommandOrEvent = context.getInitialCommandOrEvent();
            getLogger().info("Command of type <{}> with ID <{}> could not be dispatched as no enforcer could be looked up! Answering with ThingNotAccessibleException.", initialCommandOrEvent.getType(), initialCommandOrEvent.getId());
            context.getInitialSender().tell(ThingNotAccessibleException.newBuilder(initialCommandOrEvent.getId()).dittoHeaders(initialCommandOrEvent.getDittoHeaders()).build(), ActorRef.noSender());
        }).match(LookupEnforcerResponse.class, isOfType((Class<?>) MessageCommand.class), lookupEnforcerResponse2 -> {
            LookupContext<?> context = lookupEnforcerResponse2.getContext();
            MessageCommand initialCommandOrEvent = context.getInitialCommandOrEvent();
            getLogger().info("Command of type <{}> with ID <{}> could not be dispatched as no enforcer could be looked up! Answering with ThingNotAccessibleException.", initialCommandOrEvent.getType(), initialCommandOrEvent.getId());
            context.getInitialSender().tell(ThingNotAccessibleException.newBuilder(initialCommandOrEvent.getId()).dittoHeaders(initialCommandOrEvent.getDittoHeaders()).build(), ActorRef.noSender());
        });
    }

    @Override // org.eclipse.ditto.services.gateway.proxy.actors.AbstractProxyActor
    protected void deleteCacheEntry(LookupEnforcerResponse lookupEnforcerResponse) {
        if (isOfType("things.events:thingDeleted").defined(lookupEnforcerResponse)) {
            deleteEntryFromCache(lookupEnforcerResponse, this.thingCacheFacade);
        }
    }

    @SafeVarargs
    private final FI.UnitApply<Signal> compose(FI.UnitApply<Signal>... unitApplyArr) {
        return signal -> {
            for (FI.UnitApply unitApply : unitApplyArr) {
                unitApply.apply(signal);
            }
        };
    }

    private <T extends Signal<T>> FI.UnitApply<T> createResponseActor() {
        return signal -> {
            signal.getDittoHeaders().getCorrelationId().map(this::encodeActorName).ifPresent(this::startResponseChildActor);
        };
    }

    private String encodeActorName(String str) {
        try {
            return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("Unsupported encoding", e);
        }
    }

    private ActorRef startResponseChildActor(String str) {
        return getContext().actorOf(CommandResponseActor.props(str, getSender()), str);
    }

    private static FI.TypedPredicate<LookupEnforcerResponse> isRetrieveThingWithAggregationNeeded() {
        return lookupEnforcerResponse -> {
            if (isOfType("things.commands:retrieveThing").defined(lookupEnforcerResponse)) {
                return RetrieveThingHandlerActor.checkIfAggregationIsNeeded(lookupEnforcerResponse.getContext().getInitialCommandOrEvent());
            }
            return false;
        };
    }

    private ActorRef getThingHandlerActor(LookupEnforcerResponse lookupEnforcerResponse, ThingHandlerCreator thingHandlerCreator) {
        return getContext().actorOf(thingHandlerCreator.props(lookupEnforcerResponse.getEnforcerRef().orElse(null), lookupEnforcerResponse.getShardId().orElse(null), this.aclEnforcerShardRegion, this.policyEnforcerShardRegion));
    }
}
