package org.eclipse.ditto.services.connectivity.messaging.rabbitmq;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import com.newmotion.akka.rabbitmq.ChannelCreated;
import com.newmotion.akka.rabbitmq.ChannelMessage;
import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.mapping.MessageMappers;
import org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressMetric;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQPublisherActor.class */
public final class RabbitMQPublisherActor extends BasePublisherActor<RabbitMQTarget> {
    static final String ACTOR_NAME = "rmqPublisherActor";
    private static final String DEFAULT_EXCHANGE = "";
    private final DiagnosticLoggingAdapter log;

    @Nullable
    private ActorRef channelActor;
    private long publishedMessages;
    private Instant lastMessagePublishedAt;

    @Nullable
    private AddressMetric addressMetric;

    private RabbitMQPublisherActor(Set<Target> set) {
        super(set);
        this.log = LogUtil.obtain(this);
        this.publishedMessages = 0L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final Set<Target> set) {
        return Props.create(RabbitMQPublisherActor.class, new Creator<RabbitMQPublisherActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.rabbitmq.RabbitMQPublisherActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public RabbitMQPublisherActor m22create() {
                return new RabbitMQPublisherActor(set);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ChannelCreated.class, channelCreated -> {
            this.channelActor = channelCreated.channel();
            this.addressMetric = ConnectivityModelFactory.newAddressMetric(ConnectionStatus.OPEN, "Started at " + Instant.now(), 0L, (Instant) null);
            Set set = (Set) getDestinations().values().stream().flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0.getExchange();
            }).collect(Collectors.toSet());
            channelCreated.channel().tell(ChannelMessage.apply(channel -> {
                set.forEach(str -> {
                    this.log.debug("Checking for existence of exchange <{}>", str);
                    try {
                        channel.exchangeDeclarePassive(str);
                    } catch (IOException e) {
                        this.log.warning("Failed to declare exchange <{}> passively", str);
                        this.addressMetric = ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "Exchange '" + str + "' was missing at " + Instant.now(), 0L, (Instant) null);
                    }
                });
                return null;
            }, false), getSelf());
        }).match(ExternalMessage.class, this::isResponseOrError, externalMessage -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, (String) externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
            this.log.debug("Received mapped response {} ", externalMessage);
            String str = (String) externalMessage.getHeaders().get("replyTo");
            if (str != null) {
                publishMessage(RabbitMQTarget.of(DEFAULT_EXCHANGE, str), externalMessage);
            } else {
                this.log.info("Response dropped, missing replyTo address: {}", externalMessage);
            }
        }).match(ExternalMessage.class, externalMessage2 -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, (String) externalMessage2.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
            this.log.debug("Received mapped message {} ", externalMessage2);
            Set<RabbitMQTarget> destinationForMessage = getDestinationForMessage(externalMessage2);
            this.log.debug("Publishing message to targets <{}>: {} ", destinationForMessage, externalMessage2);
            destinationForMessage.forEach(rabbitMQTarget -> {
                publishMessage(rabbitMQTarget, externalMessage2);
            });
        }).match(AddressMetric.class, this::handleAddressMetric).match(RetrieveAddressMetric.class, retrieveAddressMetric -> {
            getSender().tell(ConnectivityModelFactory.newAddressMetric(this.addressMetric != null ? this.addressMetric.getStatus() : ConnectionStatus.UNKNOWN, this.addressMetric != null ? (String) this.addressMetric.getStatusDetails().orElse(null) : null, this.publishedMessages, this.lastMessagePublishedAt), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor
    public RabbitMQTarget toPublishTarget(String str) {
        return RabbitMQTarget.fromTargetAddress(str);
    }

    private void handleAddressMetric(AddressMetric addressMetric) {
        this.addressMetric = addressMetric;
    }

    private void publishMessage(RabbitMQTarget rabbitMQTarget, ExternalMessage externalMessage) {
        if (this.channelActor == null) {
            this.log.info("No channel available, dropping response.");
            return;
        }
        if (rabbitMQTarget.getRoutingKey() == null) {
            this.log.warning("No routing key, dropping message.");
            return;
        }
        this.publishedMessages++;
        this.lastMessagePublishedAt = Instant.now();
        String str = (String) externalMessage.getHeaders().get(ExternalMessage.CONTENT_TYPE_HEADER);
        AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().contentType(str).correlationId((String) externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey())).headers((Map) externalMessage.getHeaders().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return entry.getValue();
        }))).build();
        byte[] bArr = externalMessage.isTextMessage() ? (byte[]) externalMessage.getTextPayload().map(str2 -> {
            return str2.getBytes(MessageMappers.determineCharset(str));
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Failed to convert text to bytes.");
        }) : (byte[]) externalMessage.getBytePayload().map((v0) -> {
            return v0.array();
        }).orElse(new byte[0]);
        this.channelActor.tell(ChannelMessage.apply(channel -> {
            try {
                this.log.debug("Publishing to exchange <{}> and routing key <{}>: {}", rabbitMQTarget.getExchange(), rabbitMQTarget.getRoutingKey(), build);
                channel.basicPublish(rabbitMQTarget.getExchange(), rabbitMQTarget.getRoutingKey(), build, bArr);
                return null;
            } catch (Exception e) {
                this.log.warning("Failed to publish message to RabbitMQ: {}", e.getMessage());
                return null;
            }
        }, false), getSelf());
    }
}
