package org.eclipse.ditto.services.connectivity.messaging.amqp;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressMetric;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpPublisherActor.class */
public final class AmqpPublisherActor extends BasePublisherActor<AmqpTarget> {
    static final String ACTOR_NAME = "amqpPublisherActor";
    private final DiagnosticLoggingAdapter log;
    private final Session session;
    private final Map<Destination, MessageProducer> producerMap;
    private AddressMetric addressMetric;
    private long publishedMessages;
    private Instant lastMessagePublishedAt;

    private AmqpPublisherActor(Session session, Set<Target> set) {
        super(set);
        this.log = LogUtil.obtain(this);
        this.publishedMessages = 0L;
        this.session = (Session) ConditionChecker.checkNotNull(session, "session");
        this.producerMap = new HashMap();
        this.addressMetric = ConnectivityModelFactory.newAddressMetric(ConnectionStatus.OPEN, "Started at " + Instant.now(), 0L, (Instant) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final Session session, final Set<Target> set) {
        return Props.create(AmqpPublisherActor.class, new Creator<AmqpPublisherActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.amqp.AmqpPublisherActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public AmqpPublisherActor m12create() {
                return new AmqpPublisherActor(session, set);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ExternalMessage.class, this::isResponseOrError, externalMessage -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, (String) externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
            this.log.debug("Received response or error {} ", externalMessage);
            String str = (String) externalMessage.getHeaders().get("replyTo");
            if (str != null) {
                sendMessage(AmqpTarget.fromTargetAddress(str), externalMessage);
            } else {
                this.log.info("Response dropped, missing replyTo address: {}", externalMessage);
            }
        }).match(ExternalMessage.class, externalMessage2 -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, (String) externalMessage2.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
            this.log.debug("Received mapped message {} ", externalMessage2);
            getDestinationForMessage(externalMessage2).forEach(amqpTarget -> {
                sendMessage(amqpTarget, externalMessage2);
            });
        }).match(AddressMetric.class, this::handleAddressMetric).match(RetrieveAddressMetric.class, retrieveAddressMetric -> {
            getSender().tell(ConnectivityModelFactory.newAddressMetric(this.addressMetric.getStatus(), (String) this.addressMetric.getStatusDetails().orElse(null), this.publishedMessages, this.lastMessagePublishedAt), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor
    public AmqpTarget toPublishTarget(String str) {
        return AmqpTarget.fromTargetAddress(str);
    }

    private void handleAddressMetric(AddressMetric addressMetric) {
        this.addressMetric = addressMetric;
    }

    private void sendMessage(AmqpTarget amqpTarget, ExternalMessage externalMessage) {
        try {
            MessageProducer producer = getProducer(amqpTarget.getJmsDestination());
            if (producer != null) {
                producer.send(toJmsMessage(externalMessage));
                this.publishedMessages++;
                this.lastMessagePublishedAt = Instant.now();
            } else {
                this.log.warning("No producer for destination {} available.", amqpTarget);
            }
        } catch (JMSException e) {
            this.log.info("Failed to send JMS response: {}", e.getMessage());
        }
    }

    private Message toJmsMessage(ExternalMessage externalMessage) throws JMSException {
        TextMessage createMessage;
        Optional textPayload = externalMessage.getTextPayload();
        if (textPayload.isPresent()) {
            createMessage = this.session.createTextMessage((String) textPayload.get());
        } else if (externalMessage.getBytePayload().isPresent()) {
            TextMessage createBytesMessage = this.session.createBytesMessage();
            createBytesMessage.writeBytes((byte[]) externalMessage.getBytePayload().map((v0) -> {
                return v0.array();
            }).orElse(new byte[0]));
            createMessage = createBytesMessage;
        } else {
            createMessage = this.session.createMessage();
        }
        if (createMessage instanceof JmsMessage) {
            AmqpJmsMessageFacade facade = ((JmsMessage) createMessage).getFacade();
            if (facade instanceof AmqpJmsMessageFacade) {
                AmqpJmsMessageFacade amqpJmsMessageFacade = facade;
                externalMessage.getHeaders().forEach((str, str2) -> {
                    try {
                        amqpJmsMessageFacade.setApplicationProperty(str, str2);
                    } catch (JMSException e) {
                        this.log.warning("Could not set application-property <{}>", str);
                    }
                });
            }
        }
        createMessage.setJMSCorrelationID((String) externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
        return createMessage;
    }

    @Nullable
    private MessageProducer getProducer(Destination destination) {
        return (MessageProducer) Optional.of(destination).map(destination2 -> {
            return this.producerMap.computeIfAbsent(destination, this::createMessageProducer);
        }).orElse(null);
    }

    @Nullable
    private MessageProducer createMessageProducer(Destination destination) {
        this.log.debug("Creating AMQP Producer for '{}'", destination);
        try {
            return this.session.createProducer(destination);
        } catch (JMSException e) {
            this.log.warning("Could not create producer for {}.", destination);
            return null;
        }
    }

    public void postStop() throws Exception {
        super.postStop();
        this.producerMap.forEach((destination, messageProducer) -> {
            try {
                this.log.debug("Closing AMQP Producer for '{}'", destination);
                messageProducer.close();
            } catch (JMSException e) {
                this.log.debug("Closing consumer failed (can be ignored if connection was closed already): {}", e.getMessage());
            }
        });
    }
}
