package org.eclipse.ditto.edge.service.dispatching;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.cluster.pubsub.DistributedPubSubMessage;
import akka.pattern.AskTimeoutException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.edge.service.EdgeServiceTimeoutException;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.config.DefaultAskWithRetryConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

/* loaded from: input_file:org/eclipse/ditto/edge/service/dispatching/AskWithRetryCommandForwarder.class */
public final class AskWithRetryCommandForwarder implements Extension {
    private static final ExtensionId EXTENSION_ID = new ExtensionId();
    private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(AskWithRetryCommandForwarder.class);
    private final ActorSystem system;
    private final AskWithRetryConfig askWithRetryConfig;

    /* loaded from: input_file:org/eclipse/ditto/edge/service/dispatching/AskWithRetryCommandForwarder$ExtensionId.class */
    private static final class ExtensionId extends AbstractExtensionId<AskWithRetryCommandForwarder> {
        private ExtensionId() {
        }

        /* renamed from: createExtension, reason: merged with bridge method [inline-methods] */
        public AskWithRetryCommandForwarder m13createExtension(ExtendedActorSystem extendedActorSystem) {
            return (AskWithRetryCommandForwarder) AkkaClassLoader.instantiate(extendedActorSystem, AskWithRetryCommandForwarder.class, AskWithRetryCommandForwarder.class.getName(), List.of(ActorSystem.class), List.of(extendedActorSystem));
        }
    }

    public AskWithRetryCommandForwarder(ActorSystem actorSystem) {
        this.system = actorSystem;
        this.askWithRetryConfig = DefaultAskWithRetryConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()), "ask-with-retry");
    }

    public static AskWithRetryCommandForwarder get(ActorSystem actorSystem) {
        return (AskWithRetryCommandForwarder) EXTENSION_ID.get(actorSystem);
    }

    public void forwardCommand(Command<?> command, ActorRef actorRef, ActorRef actorRef2) {
        if (shouldSendResponse(command.getDittoHeaders())) {
            AskWithRetry.askWithRetry(actorRef, command, this.askWithRetryConfig, this.system, getResponseCaster(command)).exceptionally(th -> {
                return handleException(th, actorRef2);
            }).thenAccept(commandResponse -> {
                handleResponse(commandResponse, actorRef2);
            });
        } else {
            actorRef.tell(command, actorRef2);
        }
    }

    public void forwardCommandViaPubSub(Command<?> command, DistributedPubSubMessage distributedPubSubMessage, ActorRef actorRef, ActorRef actorRef2) {
        if (shouldSendResponse(command.getDittoHeaders())) {
            AskWithRetry.askWithRetry(actorRef, distributedPubSubMessage, this.askWithRetryConfig, this.system, getResponseCaster(command)).exceptionally(th -> {
                return handleException(th, actorRef2);
            }).thenAccept(commandResponse -> {
                handleResponse(commandResponse, actorRef2);
            });
        } else {
            actorRef.tell(distributedPubSubMessage, actorRef2);
        }
    }

    private boolean shouldSendResponse(DittoHeaders dittoHeaders) {
        return dittoHeaders.isResponseRequired() || needsTwinPersistedAcknowledgement(dittoHeaders) || needsLiveResponseAcknowledgement(dittoHeaders);
    }

    private boolean needsTwinPersistedAcknowledgement(DittoHeaders dittoHeaders) {
        return dittoHeaders.getAcknowledgementRequests().stream().anyMatch(acknowledgementRequest -> {
            return DittoAcknowledgementLabel.TWIN_PERSISTED.equals(acknowledgementRequest.getLabel());
        });
    }

    private boolean needsLiveResponseAcknowledgement(DittoHeaders dittoHeaders) {
        return dittoHeaders.getAcknowledgementRequests().stream().anyMatch(acknowledgementRequest -> {
            return DittoAcknowledgementLabel.LIVE_RESPONSE.equals(acknowledgementRequest.getLabel());
        });
    }

    @Nullable
    private <T extends Signal<?>> T handleException(Throwable th, ActorRef actorRef) {
        if (!(th instanceof CompletionException) || !(th.getCause() instanceof DittoRuntimeException)) {
            throw ((RuntimeException) th);
        }
        actorRef.tell(th.getCause(), ActorRef.noSender());
        return null;
    }

    private <T extends Signal<?>> void handleResponse(@Nullable T t, ActorRef actorRef) {
        if (null != t) {
            LOGGER.withCorrelationId(t.getDittoHeaders()).debug("Forwarding response: {}", t);
            actorRef.tell(t, ActorRef.noSender());
        }
    }

    private <R extends CommandResponse<?>> Function<Object, R> getResponseCaster(Command<?> command) {
        return obj -> {
            if (CommandResponse.class.isAssignableFrom(obj.getClass())) {
                return (CommandResponse) obj;
            }
            if (!(obj instanceof AskException) && !(obj instanceof AskTimeoutException)) {
                throw reportErrorOrResponse(command, obj);
            }
            Optional<DittoRuntimeException> handleAskTimeoutForCommand = handleAskTimeoutForCommand(command, (Throwable) obj);
            if (handleAskTimeoutForCommand.isPresent()) {
                throw handleAskTimeoutForCommand.get();
            }
            return null;
        };
    }

    private DittoRuntimeException reportErrorOrResponse(Command<?> command, @Nullable Object obj) {
        return obj instanceof Throwable ? reportError(command, (Throwable) obj) : obj != null ? reportUnknownResponse(command, obj) : reportError(command, new NullPointerException("Response and error were null."));
    }

    private DittoRuntimeException reportError(Command<?> command, @Nullable Throwable th) {
        DittoRuntimeException asDittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(th == null ? new NullPointerException("Result and error are both null") : th, th2 -> {
            return reportUnexpectedError(command, th2);
        });
        LOGGER.withCorrelationId(command).info("{}: {}", asDittoRuntimeException.getClass().getSimpleName(), asDittoRuntimeException.getMessage());
        return asDittoRuntimeException;
    }

    private DittoRuntimeException reportUnexpectedError(Command<?> command, Throwable th) {
        LOGGER.error("Unexpected error", th);
        return DittoInternalErrorException.newBuilder().cause(th).dittoHeaders(command.getDittoHeaders()).build();
    }

    private DittoInternalErrorException reportUnknownResponse(Command<?> command, Object obj) {
        LOGGER.error("Unexpected response: <{}>", obj);
        return DittoInternalErrorException.newBuilder().dittoHeaders(command.getDittoHeaders()).build();
    }

    private Optional<DittoRuntimeException> handleAskTimeoutForCommand(Command<?> command, Throwable th) {
        LOGGER.withCorrelationId(command.getDittoHeaders()).error("Encountered timeout in edge forwarding", th);
        return Optional.of(EdgeServiceTimeoutException.newBuilder().dittoHeaders(command.getDittoHeaders()).build());
    }
}
