package org.eclipse.ditto.services.gateway.proxy.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cluster.ShardedMessageEnvelope;
import org.eclipse.ditto.services.utils.distributedcache.actors.DeleteCacheEntry;
import org.eclipse.ditto.services.utils.distributedcache.actors.ReadConsistency;
import org.eclipse.ditto.services.utils.distributedcache.actors.WriteConsistency;
import org.eclipse.ditto.services.utils.distributedcache.model.CacheEntry;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.devops.RetrieveStatistics;
import org.eclipse.ditto.signals.events.base.Event;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/proxy/actors/AbstractProxyActor.class */
/**
 * Abstract base class of the proxy actors.
 *
 * <p>The receive behaviour assembled in {@link #createReceive()} combines handlers supplied by the
 * subclass (commands, responses, errors) with shared handling of {@code RetrieveStatistics},
 * {@code LookupEnforcerResponse}, {@code Status.Failure}, {@code DittoRuntimeException} and pub/sub
 * subscription acknowledgements. A {@code StatisticsActor} child is created in the constructor.
 */
public abstract class AbstractProxyActor extends AbstractActor {
    /** Actor name constant of the proxy actor. */
    public static final String ACTOR_NAME = "proxy";

    /** Pub/sub group name constant of the proxy actor. */
    static final String PUB_SUB_GROUP_NAME = "proxy";

    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private final ActorRef statisticsActor;

    /**
     * Creates the actor and starts the {@code StatisticsActor} child.
     *
     * <p>Decompiler note: the constructor was package-private before decompilation.
     *
     * @param actorRef the actor reference handed to {@code StatisticsActor.props(ActorRef)}.
     */
    public AbstractProxyActor(ActorRef actorRef) {
        this.statisticsActor = getContext().actorOf(StatisticsActor.props(actorRef), StatisticsActor.ACTOR_NAME);
    }

    /**
     * Hook for subclasses to register command handlers. Invoked by {@link #createReceive()} right after
     * the {@code RetrieveStatistics} handler has been registered.
     *
     * @param receiveBuilder the builder to add the handlers to.
     */
    protected abstract void addCommandBehaviour(ReceiveBuilder receiveBuilder);

    /**
     * Hook for subclasses to register response handlers. Invoked by {@link #createReceive()} directly
     * after {@link #addCommandBehaviour(ReceiveBuilder)}.
     *
     * @param receiveBuilder the builder to add the handlers to.
     */
    protected abstract void addResponseBehaviour(ReceiveBuilder receiveBuilder);

    /**
     * Hook for subclasses to register error handlers. Invoked by {@link #createReceive()} after the
     * handlers for successful and failed {@code LookupEnforcerResponse}s have been registered, and
     * before the fallback handlers.
     *
     * @param receiveBuilder the builder to add the handlers to.
     */
    protected abstract void addErrorBehaviour(ReceiveBuilder receiveBuilder);

    /**
     * Invoked after a {@code LookupEnforcerResponse} carrying an enforcer reference has been forwarded
     * to that enforcer.
     *
     * @param lookupEnforcerResponse the response that has just been dispatched.
     */
    protected abstract void deleteCacheEntry(LookupEnforcerResponse lookupEnforcerResponse);

    /**
     * Supervision of child actors: restart on {@link NullPointerException}, stop on
     * {@link ActorKilledException}, escalate everything else.
     *
     * @return the one-for-one supervisor strategy of this actor.
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(true, DeciderBuilder.match(NullPointerException.class, nullPointerException -> {
            // The template has no placeholder, so no extra argument is passed; the exception itself is
            // handed over as the cause of the log entry.
            this.log.error(nullPointerException, "NullPointer in child actor - restarting it...");
            this.log.info("Restarting child...");
            return SupervisorStrategy.restart();
        }).match(ActorKilledException.class, actorKilledException -> {
            this.log.error(actorKilledException.getCause(), "ActorKilledException in child actor - stopping it...");
            return SupervisorStrategy.stop();
        }).matchAny(th -> SupervisorStrategy.escalate()).build());
    }

    /**
     * Builds the receive behaviour. Handlers are registered in this order, which is significant because
     * the first matching handler wins:
     * <ol>
     * <li>{@code RetrieveStatistics} is forwarded to the statistics child actor,</li>
     * <li>subclass command handlers, then subclass response handlers,</li>
     * <li>{@code LookupEnforcerResponse} with an enforcer reference: the (possibly sharded) message is
     * sent to the enforcer on behalf of the initial sender, then {@link #deleteCacheEntry} is called,</li>
     * <li>{@code LookupEnforcerResponse} with an error: the error is sent to the initial sender,</li>
     * <li>subclass error handlers,</li>
     * <li>fallbacks for undispatchable events, unhandled lookup responses, {@code Status.Failure},
     * {@code DittoRuntimeException}, {@code SubscribeAck} and finally any other message.</li>
     * </ol>
     *
     * @return the receive behaviour of this actor.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        ReceiveBuilder create = ReceiveBuilder.create();
        create.match(RetrieveStatistics.class, retrieveStatistics -> {
            this.log.debug("Got 'RetrieveStatistics' message");
            this.statisticsActor.forward(retrieveStatistics, getContext());
        });
        addCommandBehaviour(create);
        addResponseBehaviour(create);
        create.match(LookupEnforcerResponse.class, response -> response.getEnforcerRef().isPresent(), response -> {
            // get() is safe here: the predicate above guarantees that the enforcer reference is present.
            response.getEnforcerRef().get().tell(createShardedMessage(response), response.getContext().getInitialSender());
            deleteCacheEntry(response);
        }).match(LookupEnforcerResponse.class, response -> response.getError().isPresent(), response -> {
            Throwable th = response.getError().get();
            // Exactly one argument for the single placeholder of the template.
            getLogger().info("Received Error during lookup of enforcer: {}", th.getMessage());
            response.getContext().getInitialSender().tell(th, ActorRef.noSender());
        });
        addErrorBehaviour(create);
        create.match(LookupEnforcerResponse.class, isOfType(Event.class), response -> {
            Event<?> event = getEvent(response);
            getLogger().warning("Event of type <{}> with ID <{}> could not be dispatched as no enforcer could be looked up! This should not happen and it most likely a bug.", event.getType(), event.getId());
        }).match(LookupEnforcerResponse.class, response -> {
            getLogger().warning("EnforcerLookupActor.LookupEnforcerResponse could not be handled: {}", response);
        }).match(Status.Failure.class, failure -> {
            Throwable cause = failure.cause();
            if (cause instanceof JsonRuntimeException) {
                // Wrap JSON errors so that the sender receives a DittoJsonException instead.
                cause = new DittoJsonException((RuntimeException) cause);
            }
            getSender().tell(cause, getSelf());
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            getSender().tell(dittoRuntimeException, getSelf());
        }).match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
            getLogger().debug("Successfully subscribed to distributed pub/sub on topic '{}'", subscribeAck.subscribe().topic());
        }).matchAny(obj -> {
            getLogger().warning("Got unknown message, expected a 'Command': {}", obj);
        });
        return create.build();
    }

    /**
     * Returns the logger of this actor.
     *
     * <p>Decompiler note: this method was protected before decompilation.
     *
     * @return the diagnostic logging adapter.
     */
    public DiagnosticLoggingAdapter getLogger() {
        return this.log;
    }

    /**
     * Creates a predicate that accepts a {@code LookupEnforcerResponse} whose initial signal is not a
     * live signal (see {@link #isLiveSignal(Signal)}) and whose type equals the given one.
     *
     * <p>Decompiler note: this method was protected before decompilation.
     *
     * @param charSequence the expected signal type; compared via its {@code toString()} value.
     * @return the predicate.
     */
    public static FI.TypedPredicate<LookupEnforcerResponse> isOfType(CharSequence charSequence) {
        return lookupEnforcerResponse -> {
            Signal<?> initialCommandOrEvent = lookupEnforcerResponse.getContext().getInitialCommandOrEvent();
            return !isLiveSignal(initialCommandOrEvent) && Objects.equals(charSequence.toString(), initialCommandOrEvent.getType());
        };
    }

    /**
     * Creates a predicate that accepts a {@code LookupEnforcerResponse} whose initial signal is an
     * instance of the given class. Unlike {@link #isOfType(CharSequence)} this variant does not look
     * at the channel of the signal.
     *
     * <p>Decompiler note: this method was protected before decompilation.
     *
     * @param cls the class (or super type) the initial signal has to be assignable to.
     * @return the predicate.
     */
    public static FI.TypedPredicate<LookupEnforcerResponse> isOfType(Class<?> cls) {
        return lookupEnforcerResponse ->
                cls.isAssignableFrom(lookupEnforcerResponse.getContext().getInitialCommandOrEvent().getClass());
    }

    /**
     * Determines the message to be sent to the enforcer for the given response.
     *
     * @param lookupEnforcerResponse the response holding the initial signal and an optional shard ID.
     * @return a {@code ShardedMessageEnvelope} wrapping the initial signal if the response contains a
     * shard ID, otherwise the initial signal itself.
     */
    protected static Object createShardedMessage(LookupEnforcerResponse lookupEnforcerResponse) {
        Signal<?> signal = lookupEnforcerResponse.getContext().getInitialCommandOrEvent();
        Optional<String> shardId = lookupEnforcerResponse.getShardId();
        if (shardId.isPresent()) {
            return ShardedMessageEnvelope.of(shardId.get(), signal.getType(), signal.toJson(signal.getImplementedSchemaVersion(), FieldType.regularOrSpecial()), signal.getDittoHeaders());
        }
        return signal;
    }

    /**
     * Creates a handler that asks the given lookup actor for the enforcer of a received signal using
     * {@code ReadConsistency.LOCAL}.
     *
     * <p>Decompiler note: this method was protected before decompilation.
     *
     * @param actorRef the actor the {@code LookupEnforcer} message is sent to.
     * @param <T> the type of the handled signal.
     * @return the handler.
     */
    public <T extends Signal<T>> FI.UnitApply<T> forwardToLocalEnforcerLookup(ActorRef actorRef) {
        return forwardToEnforcerLookup(actorRef, ReadConsistency.LOCAL);
    }

    /**
     * Creates a handler that asks the given lookup actor for the enforcer of a received signal using
     * {@code ReadConsistency.MAJORITY}.
     *
     * <p>Decompiler note: this method was package-private before decompilation.
     *
     * @param actorRef the actor the {@code LookupEnforcer} message is sent to.
     * @param <T> the type of the handled signal.
     * @return the handler.
     */
    public <T extends Signal<T>> FI.UnitApply<T> forwardToMajorityEnforcerLookup(ActorRef actorRef) {
        return forwardToEnforcerLookup(actorRef, ReadConsistency.MAJORITY);
    }

    /**
     * Tells whether the channel header of the given signal is present and equal to the name of
     * {@code TopicPath.Channel.LIVE}.
     *
     * <p>Decompiler note: this method was package-private before decompilation.
     *
     * @param signal the signal to inspect.
     * @return {@code true} if the signal is sent on the live channel, {@code false} otherwise.
     */
    public static boolean isLiveSignal(Signal<?> signal) {
        String liveChannelName = TopicPath.Channel.LIVE.getName();
        return signal.getDittoHeaders().getChannel().filter(liveChannelName::equals).isPresent();
    }

    /**
     * Returns the initial command or event stored in the context of the given response.
     *
     * <p>Decompiler note: this method was package-private before decompilation.
     *
     * @param lookupEnforcerResponse the response to read the signal from.
     * @return the initial command or event.
     */
    public static Object getSignal(LookupEnforcerResponse lookupEnforcerResponse) {
        return lookupEnforcerResponse.getContext().getInitialCommandOrEvent();
    }

    /**
     * Returns the initial signal of the given response as an event.
     *
     * @param lookupEnforcerResponse the response to read the event from.
     * @return the initial signal cast to {@code Event}.
     * @throws ClassCastException if the initial signal is not an {@code Event}.
     */
    private static Event<?> getEvent(LookupEnforcerResponse lookupEnforcerResponse) {
        return (Event<?>) lookupEnforcerResponse.getContext().getInitialCommandOrEvent();
    }

    /**
     * Sends a {@code DeleteCacheEntry} message (write consistency {@code LOCAL}) for the cache entry
     * of the given response to the given actor. If the response holds no cache entry an error is
     * logged instead.
     *
     * <p>Decompiler note: this method was protected before decompilation.
     *
     * @param lookupEnforcerResponse the response whose initial signal must be an {@code Event}; its ID
     * and revision are put into the delete message.
     * @param actorRef the actor the delete message is sent to.
     */
    public void deleteEntryFromCache(LookupEnforcerResponse lookupEnforcerResponse, ActorRef actorRef) {
        Event<?> event = getEvent(lookupEnforcerResponse);
        Optional<CacheEntry> cacheEntry = lookupEnforcerResponse.getCacheEntry();
        if (cacheEntry.isPresent()) {
            actorRef.tell(new DeleteCacheEntry(event.getId(), cacheEntry.get(), event.getRevision(), WriteConsistency.LOCAL), ActorRef.noSender());
        } else {
            this.log.error("Attempting to delete nonexistent cache entry <{}>!", lookupEnforcerResponse);
        }
    }

    /**
     * Sends the given message to the sender of the currently processed message, with this actor as
     * the sender.
     *
     * <p>Decompiler note: this method was package-private before decompilation.
     *
     * @param obj the message to send.
     */
    public void notifySender(Object obj) {
        getSender().tell(obj, getSelf());
    }

    /**
     * Creates a handler that logs the received signal and sends a {@code LookupEnforcer} request for
     * the signal's ID to the given actor. The request carries a {@code LookupContext} built from the
     * signal, the current sender and this actor.
     *
     * @param actorRef the actor the {@code LookupEnforcer} message is sent to.
     * @param readConsistency the read consistency put into the lookup request.
     * @param <T> the type of the handled signal.
     * @return the handler.
     */
    private <T extends Signal<T>> FI.UnitApply<T> forwardToEnforcerLookup(ActorRef actorRef, ReadConsistency readConsistency) {
        return signal -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, signal);
            this.log.debug("Got <{}>. Forwarding to <{}>.", signal.getName(), actorRef.path().name());
            actorRef.tell(new LookupEnforcer(signal.getId(), LookupContext.getInstance(signal, getSender(), getSelf()), readConsistency), getSelf());
        };
    }
}
