package org.eclipse.ditto.services.connectivity.messaging.rabbitmq;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.ExternalMessageBuilder;
import org.eclipse.ditto.services.connectivity.mapping.MessageMappers;
import org.eclipse.ditto.services.connectivity.messaging.MessageMappingProcessorActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressMetric;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQConsumerActor.class */
public final class RabbitMQConsumerActor extends AbstractActor {
    private static final String MESSAGE_ID_HEADER = "messageId";
    private static final Set<String> CONTENT_TYPES_INTERPRETED_AS_TEXT = Collections.unmodifiableSet(new HashSet(Arrays.asList("text/plain", "text/html", "text/yaml", "application/json", "application/xml")));
    private final DiagnosticLoggingAdapter log;
    private final String sourceAddress;
    private final ActorRef messageMappingProcessor;
    private long consumedMessages;
    private Instant lastMessageConsumedAt;

    @Nullable
    private AddressMetric addressMetric;

    private RabbitMQConsumerActor(String str, ActorRef actorRef) {
        this.log = LogUtil.obtain(this);
        this.consumedMessages = 0L;
        this.addressMetric = null;
        this.sourceAddress = (String) ConditionChecker.checkNotNull(str, "source");
        this.messageMappingProcessor = (ActorRef) ConditionChecker.checkNotNull(actorRef, MessageMappingProcessorActor.ACTOR_NAME);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final String str, final ActorRef actorRef) {
        return Props.create(RabbitMQConsumerActor.class, new Creator<RabbitMQConsumerActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.rabbitmq.RabbitMQConsumerActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public RabbitMQConsumerActor m21create() {
                return new RabbitMQConsumerActor(str, actorRef);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Delivery.class, this::handleDelivery).match(AddressMetric.class, this::handleAddressMetric).match(RetrieveAddressMetric.class, retrieveAddressMetric -> {
            getSender().tell(ConnectivityModelFactory.newAddressMetric(this.addressMetric != null ? this.addressMetric.getStatus() : ConnectionStatus.UNKNOWN, this.addressMetric != null ? (String) this.addressMetric.getStatusDetails().orElse(null) : null, this.consumedMessages, this.lastMessageConsumedAt), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void handleAddressMetric(AddressMetric addressMetric) {
        this.addressMetric = addressMetric;
    }

    private void handleDelivery(Delivery delivery) {
        this.consumedMessages++;
        this.lastMessageConsumedAt = Instant.now();
        AMQP.BasicProperties properties = delivery.getProperties();
        Envelope envelope = delivery.getEnvelope();
        byte[] body = delivery.getBody();
        LogUtil.enhanceLogWithCorrelationId(this.log, properties.getCorrelationId());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received message from RabbitMQ ({}//{}): {}", envelope, properties, new String(body, StandardCharsets.UTF_8));
        }
        try {
            Map<String, String> extractHeadersFromMessage = extractHeadersFromMessage(properties, envelope);
            extractHeadersFromMessage.put(DittoHeaderDefinition.SOURCE.getKey(), this.sourceAddress);
            ExternalMessageBuilder newExternalMessageBuilder = ConnectivityModelFactory.newExternalMessageBuilder(extractHeadersFromMessage);
            String contentType = properties.getContentType();
            if (shouldBeInterpretedAsText(contentType)) {
                newExternalMessageBuilder.withText(new String(body, MessageMappers.determineCharset(contentType)));
            } else {
                newExternalMessageBuilder.withBytes(body);
            }
            this.messageMappingProcessor.forward(newExternalMessageBuilder.build(), getContext());
        } catch (Exception e) {
            this.log.warning("Processing delivery {} failed: {}", Long.valueOf(envelope.getDeliveryTag()), e.getMessage(), e);
        }
    }

    private static boolean shouldBeInterpretedAsText(@Nullable String str) {
        if (str != null) {
            Stream<String> stream = CONTENT_TYPES_INTERPRETED_AS_TEXT.stream();
            str.getClass();
            if (stream.anyMatch(str::startsWith)) {
                return true;
            }
        }
        return false;
    }

    private Map<String, String> extractHeadersFromMessage(BasicProperties basicProperties, Envelope envelope) {
        Map<String, String> map = (Map) Optional.ofNullable(basicProperties.getHeaders()).map((v0) -> {
            return v0.entrySet();
        }).map(this::setToStringStringMap).orElseGet(HashMap::new);
        if (basicProperties.getReplyTo() != null) {
            map.put("replyTo", basicProperties.getReplyTo());
        }
        if (basicProperties.getCorrelationId() != null) {
            map.put(DittoHeaderDefinition.CORRELATION_ID.getKey(), basicProperties.getCorrelationId());
        }
        if (basicProperties.getContentType() != null) {
            map.put(ExternalMessage.CONTENT_TYPE_HEADER, basicProperties.getContentType());
        }
        map.put(MESSAGE_ID_HEADER, Long.toString(envelope.getDeliveryTag()));
        return map;
    }

    private Map<String, String> setToStringStringMap(Set<Map.Entry<String, Object>> set) {
        return (Map) set.stream().filter(entry -> {
            return Objects.nonNull(entry.getValue());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return String.valueOf(entry2.getValue());
        }));
    }
}
