package org.eclipse.ditto.services.amqpbridge.messaging;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import kamon.Kamon;
import kamon.trace.TraceContext;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.model.base.auth.AuthorizationSubject;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocoladapter.JsonifiableAdaptable;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import scala.Option;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/amqpbridge/messaging/CommandProcessorActor.class */
public final class CommandProcessorActor extends AbstractActor {
    static final String ACTOR_NAME_PREFIX = "amqpCommandProcessor-";
    private static final String REPLY_TO_HEADER = "replyTo";
    private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
    private final DiagnosticLoggingAdapter log;
    private final ActorRef pubSubMediator;
    private final String pubSubTargetActorPath;
    private final AuthorizationSubject authorizationSubject;
    private final Map<String, TraceContext> traces;

    private CommandProcessorActor(ActorRef actorRef, String str, AuthorizationSubject authorizationSubject) {
        this.log = LogUtil.obtain(this);
        this.pubSubMediator = actorRef;
        this.pubSubTargetActorPath = str;
        this.authorizationSubject = authorizationSubject;
        this.traces = new HashMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final ActorRef actorRef, final String str, final AuthorizationSubject authorizationSubject) {
        return Props.create(CommandProcessorActor.class, new Creator<CommandProcessorActor>() { // from class: org.eclipse.ditto.services.amqpbridge.messaging.CommandProcessorActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public CommandProcessorActor m2create() {
                return new CommandProcessorActor(actorRef, str, authorizationSubject);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Message.class, this::handleMessage).match(CommandResponse.class, this::handleCommandResponse).match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(Status.Failure.class, failure -> {
            this.log.error(failure.cause(), "Got an unexpected failure.");
        }).matchAny(obj -> {
            this.log.debug("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        ThingErrorResponse of = ThingErrorResponse.of(dittoRuntimeException);
        logDittoRuntimeException(dittoRuntimeException);
        handleCommandResponse(of);
    }

    private void logDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        LogUtil.enhanceLogWithCorrelationId(this.log, dittoRuntimeException);
        this.log.info("Got DittoRuntimeException '{}' when command via AMQP was processed: {}", dittoRuntimeException.getErrorCode(), dittoRuntimeException.getMessage());
    }

    private void handleCommandResponse(CommandResponse commandResponse) {
        Optional correlationId = commandResponse.getDittoHeaders().getCorrelationId();
        LogUtil.enhanceLogWithCorrelationId(this.log, correlationId);
        if (commandResponse.getStatusCodeValue() < HttpStatusCode.BAD_REQUEST.toInt()) {
            this.log.debug("Received response: {}", commandResponse);
        } else {
            this.log.debug("Received error response: {}", commandResponse);
        }
        Map<String, TraceContext> map = this.traces;
        map.getClass();
        Optional map2 = correlationId.map((v1) -> {
            return r1.remove(v1);
        });
        if (map2.isPresent()) {
            ((TraceContext) map2.get()).finish();
        } else {
            this.log.warning("Trace missing for response: '{}'", commandResponse);
        }
    }

    private void handleMessage(Message message) throws JMSException {
        this.log.debug("Received Message: {}", message);
        Command<?> buildCommandFromPublicProtocol = buildCommandFromPublicProtocol(message);
        if (buildCommandFromPublicProtocol != null) {
            traceCommand(buildCommandFromPublicProtocol);
            this.log.info("Publishing '{}' from AMQP Message '{}'", buildCommandFromPublicProtocol.getType(), message.getJMSMessageID());
            this.pubSubMediator.tell(new DistributedPubSubMediator.Send(this.pubSubTargetActorPath, buildCommandFromPublicProtocol, true), getSelf());
        }
    }

    private void traceCommand(Command<?> command) {
        command.getDittoHeaders().getCorrelationId().ifPresent(str -> {
            TraceContext newContext = Kamon.tracer().newContext("roundtrip.amqp_" + command.getType(), Option.apply(str));
            newContext.addMetadata("command", command.getType());
            this.traces.put(str, newContext);
        });
    }

    private Command<?> buildCommandFromPublicProtocol(Message message) throws JMSException {
        String extractCommandStringFromMessage = extractCommandStringFromMessage(message);
        DittoHeadersBuilder newBuilder = DittoHeaders.newBuilder(extractHeadersMapFromJmsMessage(message));
        String valueOf = message.getJMSReplyTo() != null ? String.valueOf(message.getJMSReplyTo()) : null;
        if (valueOf != null) {
            newBuilder.putHeader(REPLY_TO_HEADER, valueOf);
        }
        String jMSCorrelationID = message.getJMSCorrelationID() != null ? message.getJMSCorrelationID() : message.getJMSMessageID();
        newBuilder.authorizationSubjects(this.authorizationSubject.getId(), new CharSequence[0]);
        try {
            JsonifiableAdaptable jsonifiableAdaptableFromJson = ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(extractCommandStringFromMessage));
            String str = (String) jsonifiableAdaptableFromJson.getHeaders().flatMap((v0) -> {
                return v0.getCorrelationId();
            }).orElse(jMSCorrelationID);
            newBuilder.correlationId(str);
            LogUtil.enhanceLogWithCorrelationId(this.log, str);
            this.log.debug("received public command: {}", jsonifiableAdaptableFromJson.toJsonString());
            return PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptableFromJson).setDittoHeaders(newBuilder.build());
        } catch (DittoRuntimeException e) {
            this.log.info("Got DittoRuntimeException '{}' when command was parsed: {}", e.getErrorCode(), e.getMessage());
            return null;
        } catch (Exception e2) {
            this.log.info("Unexpected Exception: {}", e2.getMessage(), e2);
            return null;
        }
    }

    private String extractCommandStringFromMessage(Message message) throws JMSException {
        if (message instanceof TextMessage) {
            return ((TextMessage) message).getText();
        }
        if (!(message instanceof BytesMessage)) {
            throw new IllegalArgumentException("Only messages of type TEXT or BYTE are supported.");
        }
        BytesMessage bytesMessage = (BytesMessage) message;
        long bodyLength = bytesMessage.getBodyLength();
        if (bodyLength < -2147483648L || bodyLength > 2147483647L) {
            throw new IllegalArgumentException("Message too large...");
        }
        ByteBuffer allocate = ByteBuffer.allocate((int) bodyLength);
        bytesMessage.readBytes(allocate.array());
        return new String(allocate.array(), StandardCharsets.UTF_8);
    }

    private Map<String, String> extractHeadersMapFromJmsMessage(Message message) throws JMSException {
        return (Map) Collections.list(message.getPropertyNames()).stream().map(str -> {
            return getPropertyAsEntry(message, str);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private Map.Entry<String, String> getPropertyAsEntry(Message message, String str) {
        try {
            return new AbstractMap.SimpleImmutableEntry(str, message.getObjectProperty(str).toString());
        } catch (JMSException e) {
            this.log.debug("Property '{}' could not be read, dropping...", str);
            return null;
        }
    }
}
