package org.eclipse.ditto.services.connectivity.messaging;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import kamon.Kamon;
import kamon.trace.TraceContext;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.services.connectivity.messaging.DittoHeadersFilter;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.CaffeineCache;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/MessageMappingProcessorActor.class */
public final class MessageMappingProcessorActor extends AbstractActor {
    public static final String ACTOR_NAME = "messageMappingProcessor";
    private final DiagnosticLoggingAdapter log;
    private final ActorRef publisherActor;
    private final AuthorizationContext authorizationContext;
    private final Cache<String, TraceContext> traces;
    private final DittoHeadersFilter headerFilter;
    private final MessageMappingProcessor processor;
    private final String connectionId;
    private final ActorRef conciergeForwarder;

    private MessageMappingProcessorActor(ActorRef actorRef, ActorRef actorRef2, AuthorizationContext authorizationContext, DittoHeadersFilter dittoHeadersFilter, MessageMappingProcessor messageMappingProcessor, String str) {
        this.log = LogUtil.obtain(this);
        this.publisherActor = actorRef;
        this.conciergeForwarder = actorRef2;
        this.authorizationContext = authorizationContext;
        this.processor = messageMappingProcessor;
        this.headerFilter = dittoHeadersFilter;
        this.connectionId = str;
        this.traces = CaffeineCache.of(Caffeine.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES));
    }

    public static Props props(final ActorRef actorRef, final ActorRef actorRef2, final AuthorizationContext authorizationContext, final DittoHeadersFilter dittoHeadersFilter, final MessageMappingProcessor messageMappingProcessor, final String str) {
        return Props.create(MessageMappingProcessorActor.class, new Creator<MessageMappingProcessorActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.MessageMappingProcessorActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public MessageMappingProcessorActor m8create() {
                return new MessageMappingProcessorActor(actorRef, actorRef2, authorizationContext, dittoHeadersFilter, messageMappingProcessor, str);
            }
        });
    }

    public static Props props(ActorRef actorRef, ActorRef actorRef2, AuthorizationContext authorizationContext, MessageMappingProcessor messageMappingProcessor, String str) {
        return props(actorRef, actorRef2, authorizationContext, new DittoHeadersFilter(DittoHeadersFilter.Mode.EXCLUDE, Collections.emptyList()), messageMappingProcessor, str);
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ExternalMessage.class, this::handle).match(CommandResponse.class, this::handleCommandResponse).match(Signal.class, this::handleSignal).match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(Status.Failure.class, failure -> {
            this.log.warning("Got failure with cause {}: {}", failure.cause().getClass().getSimpleName(), failure.cause().getMessage());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void handle(ExternalMessage externalMessage) {
        ConditionChecker.checkNotNull(externalMessage);
        LogUtil.enhanceLogWithCorrelationId(this.log, (String) externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, this.connectionId);
        this.log.debug("Handling ExternalMessage: {}", externalMessage);
        try {
            this.processor.process(externalMessage.withHeader(DittoHeaderDefinition.AUTHORIZATION_SUBJECTS.getKey(), ((JsonArray) this.authorizationContext.stream().map((v0) -> {
                return v0.getId();
            }).map(JsonFactory::newValue).collect(JsonCollectors.valuesToArray())).toString())).ifPresent(signal -> {
                enhanceLogUtil(signal);
                DittoHeadersBuilder authorizationContext = signal.getDittoHeaders().toBuilder().authorizationContext(this.authorizationContext);
                if (!signal.getDittoHeaders().getOrigin().isPresent()) {
                    authorizationContext.origin(this.connectionId);
                }
                Signal<?> signal = (Signal) signal.setDittoHeaders(authorizationContext.build());
                startTrace(signal);
                this.log.info("Sending '{}' using conciergeForwarder", signal.getType());
                this.conciergeForwarder.tell(signal, getSelf());
            });
        } catch (DittoRuntimeException e) {
            handleDittoRuntimeException(e, DittoHeaders.of(externalMessage.getHeaders()));
        } catch (Exception e2) {
            this.log.warning("Got <{}> when message was processed: <{}>", e2.getClass().getSimpleName(), e2.getMessage());
        }
    }

    private void enhanceLogUtil(WithDittoHeaders<?> withDittoHeaders) {
        LogUtil.enhanceLogWithCorrelationId(this.log, withDittoHeaders);
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, this.connectionId);
    }

    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        handleDittoRuntimeException(dittoRuntimeException, DittoHeaders.empty());
    }

    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException, DittoHeaders dittoHeaders) {
        ThingErrorResponse of = ThingErrorResponse.of(dittoRuntimeException, DittoHeaders.newBuilder(dittoRuntimeException.getDittoHeaders()).putHeaders(dittoHeaders).build());
        enhanceLogUtil(dittoRuntimeException);
        this.log.info("Got DittoRuntimeException '{}' when ExternalMessage was processed: {} - {}", dittoRuntimeException.getErrorCode(), dittoRuntimeException.getMessage(), dittoRuntimeException.getDescription().orElse(""));
        handleCommandResponse(of);
    }

    private void handleCommandResponse(CommandResponse<?> commandResponse) {
        enhanceLogUtil(commandResponse);
        finishTrace(commandResponse);
        if (!commandResponse.getDittoHeaders().isResponseRequired()) {
            this.log.debug("Requester did not require response (via DittoHeader '{}') - not mapping back to ExternalMessage", DittoHeaderDefinition.RESPONSE_REQUIRED);
            return;
        }
        if (commandResponse.getStatusCodeValue() < HttpStatusCode.BAD_REQUEST.toInt()) {
            this.log.debug("Received response: {}", commandResponse);
        } else {
            this.log.debug("Received error response: {}", commandResponse.toJsonString());
        }
        handleSignal(commandResponse);
    }

    private void handleSignal(Signal<?> signal) {
        enhanceLogUtil(signal);
        this.log.debug("Handling signal: {}", signal);
        try {
            this.processor.process(signal.setDittoHeaders(this.headerFilter.apply(signal.getDittoHeaders()))).ifPresent(externalMessage -> {
                this.publisherActor.forward(externalMessage, getContext());
            });
        } catch (Exception e) {
            this.log.warning("Got unexpected exception during processing Signal: {}", e.getMessage());
        } catch (DittoRuntimeException e2) {
            this.log.info("Got DittoRuntimeException during processing Signal: {} - {}", e2.getMessage(), e2.getDescription().orElse(""));
        }
    }

    private void startTrace(Signal<?> signal) {
        signal.getDittoHeaders().getCorrelationId().ifPresent(str -> {
            this.traces.put(str, createRoundtripContext(str, this.connectionId, signal.getType()));
        });
    }

    private void finishTrace(Signal<?> signal) {
        if (ThingErrorResponse.class.isAssignableFrom(signal.getClass())) {
            finishTrace(signal, (Throwable) ((ThingErrorResponse) signal).getDittoRuntimeException());
        } else {
            finishTrace(signal, (Throwable) null);
        }
    }

    private void finishTrace(Signal<?> signal, @Nullable Throwable th) {
        signal.getDittoHeaders().getCorrelationId().ifPresent(str -> {
            try {
                finishTrace(str, th);
            } catch (IllegalArgumentException e) {
                this.log.debug("Trace missing for response: '{}'", signal);
            }
        });
    }

    private void finishTrace(String str, @Nullable Throwable th) {
        Optional blocking = this.traces.getBlocking(str);
        if (!blocking.isPresent()) {
            throw new IllegalArgumentException("No trace found for correlationId: " + str);
        }
        TraceContext traceContext = (TraceContext) blocking.get();
        this.traces.invalidate(str);
        if (Objects.isNull(th)) {
            traceContext.finish();
        } else {
            traceContext.finishWithError(th);
        }
    }

    private static TraceContext createRoundtripContext(String str, String str2, String str3) {
        TraceContext newContext = Kamon.tracer().newContext("roundtrip." + str2 + "." + str3, Option.apply(str));
        newContext.addMetadata("command", str3);
        return newContext;
    }
}
