package org.eclipse.ditto.services.connectivity.messaging.amqp;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.ExternalMessageBuilder;
import org.eclipse.ditto.services.connectivity.messaging.MessageMappingProcessorActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressMetric;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpConsumerActor.class */
public final class AmqpConsumerActor extends AbstractActor implements MessageListener {
    static final String ACTOR_NAME_PREFIX = "amqpConsumerActor-";
    private final DiagnosticLoggingAdapter log;
    private final String sourceAddress;
    private final MessageConsumer messageConsumer;
    private final ActorRef messageMappingProcessor;
    private AddressMetric addressMetric;
    private long consumedMessages;
    private Instant lastMessageConsumedAt;

    private AmqpConsumerActor(String str, MessageConsumer messageConsumer, ActorRef actorRef) {
        this.log = LogUtil.obtain(this);
        this.consumedMessages = 0L;
        this.sourceAddress = (String) ConditionChecker.checkNotNull(str, "source");
        this.messageConsumer = (MessageConsumer) ConditionChecker.checkNotNull(messageConsumer);
        this.messageMappingProcessor = (ActorRef) ConditionChecker.checkNotNull(actorRef, MessageMappingProcessorActor.ACTOR_NAME);
        this.addressMetric = ConnectivityModelFactory.newAddressMetric(ConnectionStatus.OPEN, "Started at " + Instant.now(), 0L, (Instant) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final String str, final MessageConsumer messageConsumer, final ActorRef actorRef) {
        return Props.create(AmqpConsumerActor.class, new Creator<AmqpConsumerActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.amqp.AmqpConsumerActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public AmqpConsumerActor m11create() {
                return new AmqpConsumerActor(str, messageConsumer, actorRef);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(JmsMessage.class, this::handleJmsMessage).match(AddressMetric.class, this::handleAddressMetric).match(RetrieveAddressMetric.class, retrieveAddressMetric -> {
            getSender().tell(ConnectivityModelFactory.newAddressMetric(this.addressMetric.getStatus(), (String) this.addressMetric.getStatusDetails().orElse(null), this.consumedMessages, this.lastMessageConsumedAt), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    public void preStart() throws JMSException {
        this.messageConsumer.setMessageListener(this);
    }

    public void postStop() throws Exception {
        super.postStop();
        try {
            this.log.debug("Closing AMQP Consumer for '{}'", this.sourceAddress);
            this.messageConsumer.close();
        } catch (JMSException e) {
            this.log.debug("Closing consumer failed (can be ignored if connection was closed already): {}", e.getMessage());
        }
    }

    public void onMessage(Message message) {
        getSelf().tell(message, ActorRef.noSender());
    }

    private void handleAddressMetric(AddressMetric addressMetric) {
        this.addressMetric = addressMetric;
    }

    private void handleJmsMessage(JmsMessage jmsMessage) {
        this.consumedMessages++;
        this.lastMessageConsumedAt = Instant.now();
        try {
            try {
                try {
                    Map<String, String> extractHeadersMapFromJmsMessage = extractHeadersMapFromJmsMessage(jmsMessage);
                    extractHeadersMapFromJmsMessage.put(DittoHeaderDefinition.SOURCE.getKey(), this.sourceAddress);
                    ExternalMessageBuilder newExternalMessageBuilder = ConnectivityModelFactory.newExternalMessageBuilder(extractHeadersMapFromJmsMessage);
                    extractPayloadFromMessage(jmsMessage, newExternalMessageBuilder);
                    ExternalMessage build = newExternalMessageBuilder.build();
                    LogUtil.enhanceLogWithCorrelationId(this.log, build.findHeader(DittoHeaderDefinition.CORRELATION_ID.getKey()));
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Received message from AMQP 1.0 ({}): {}", build.getHeaders(), build.getTextPayload().orElse("binary"));
                    }
                    this.messageMappingProcessor.forward(build, getContext());
                } catch (Exception e) {
                    this.log.info("Unexpected {}: {}", e.getClass().getName(), e.getMessage());
                    try {
                        jmsMessage.acknowledge();
                    } catch (JMSException e2) {
                        this.log.error(e2, "Failed to ack an AMQP message");
                    }
                }
            } catch (DittoRuntimeException e3) {
                this.log.info("Got DittoRuntimeException '{}' when command was parsed: {}", e3.getErrorCode(), e3.getMessage());
                try {
                    jmsMessage.acknowledge();
                } catch (JMSException e4) {
                    this.log.error(e4, "Failed to ack an AMQP message");
                }
            }
        } finally {
            try {
                jmsMessage.acknowledge();
            } catch (JMSException e5) {
                this.log.error(e5, "Failed to ack an AMQP message");
            }
        }
    }

    private void extractPayloadFromMessage(Message message, ExternalMessageBuilder externalMessageBuilder) throws JMSException {
        if (message instanceof TextMessage) {
            externalMessageBuilder.withText(((TextMessage) message).getText());
            return;
        }
        if (!(message instanceof BytesMessage)) {
            throw new IllegalArgumentException("Only messages of type TEXT or BYTE are supported.");
        }
        BytesMessage bytesMessage = (BytesMessage) message;
        long bodyLength = bytesMessage.getBodyLength();
        if (bodyLength < -2147483648L || bodyLength > 2147483647L) {
            throw new IllegalArgumentException("Message too large...");
        }
        ByteBuffer allocate = ByteBuffer.allocate((int) bodyLength);
        bytesMessage.readBytes(allocate.array());
        externalMessageBuilder.withBytes(allocate);
    }

    private Map<String, String> extractHeadersMapFromJmsMessage(JmsMessage jmsMessage) throws JMSException {
        AmqpJmsMessageFacade facade = jmsMessage.getFacade();
        if (!(facade instanceof AmqpJmsMessageFacade)) {
            throw new JMSException("Message facade was not of type AmqpJmsMessageFacade");
        }
        AmqpJmsMessageFacade amqpJmsMessageFacade = facade;
        HashMap hashMap = new HashMap((Map) amqpJmsMessageFacade.getApplicationPropertyNames(amqpJmsMessageFacade.getPropertyNames()).stream().map(str -> {
            return getPropertyAsEntry(amqpJmsMessageFacade, str);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
        hashMap.put(ExternalMessage.CONTENT_TYPE_HEADER, amqpJmsMessageFacade.getContentType());
        String valueOf = jmsMessage.getJMSReplyTo() != null ? String.valueOf(jmsMessage.getJMSReplyTo()) : null;
        if (valueOf != null) {
            hashMap.put("replyTo", valueOf);
        }
        String jMSCorrelationID = jmsMessage.getJMSCorrelationID() != null ? jmsMessage.getJMSCorrelationID() : jmsMessage.getJMSMessageID();
        if (jMSCorrelationID != null) {
            hashMap.put(DittoHeaderDefinition.CORRELATION_ID.getKey(), jMSCorrelationID);
        }
        return hashMap;
    }

    @Nullable
    private Map.Entry<String, String> getPropertyAsEntry(AmqpJmsMessageFacade amqpJmsMessageFacade, String str) {
        try {
            return new AbstractMap.SimpleImmutableEntry(str, amqpJmsMessageFacade.getApplicationProperty(str).toString());
        } catch (JMSException e) {
            this.log.debug("Property '{}' could not be read, dropping...", str);
            return null;
        }
    }
}
