package io.elastic.sailor.impl;

import com.google.inject.name.Named;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.elastic.api.Module;
import io.elastic.sailor.Constants;
import io.elastic.sailor.ExecutionContext;
import io.elastic.sailor.ExecutionStats;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import io.elastic.sailor.Utils;
import java.io.IOException;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/elastic/sailor/impl/MessageConsumer.class */
public class MessageConsumer extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    private static final String MDC_TRACE_ID = "traceId";
    private final CryptoServiceImpl cipher;
    private final MessageProcessor processor;
    private final Module module;
    private final Step step;

    public MessageConsumer(Channel channel, CryptoServiceImpl cryptoServiceImpl, MessageProcessor messageProcessor, Module module, @Named("StepJson") Step step) {
        super(channel);
        this.cipher = cryptoServiceImpl;
        this.processor = messageProcessor;
        this.module = module;
        this.step = step;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        Object headerValue = getHeaderValue(basicProperties, Constants.AMQP_HEADER_MESSAGE_ID);
        Object headerValue2 = getHeaderValue(basicProperties, Constants.AMQP_HEADER_PARENT_MESSAGE_ID);
        Object headerValue3 = getHeaderValue(basicProperties, Constants.AMQP_META_HEADER_TRACE_ID);
        if (headerValue3 != null) {
            MDC.put(MDC_TRACE_ID, headerValue3.toString());
        }
        logger.info("Consumer {} received message: deliveryTag={}, messageId={}, parentMessageId={}, traceId={}", new Object[]{str, Long.valueOf(deliveryTag), headerValue, headerValue2, headerValue3});
        try {
            ExecutionStats executionStats = null;
            try {
                try {
                    executionStats = this.processor.processMessage(createExecutionContext(bArr, basicProperties), this.module);
                    try {
                        MDC.remove(MDC_TRACE_ID);
                    } catch (Exception e) {
                        logger.warn("Failed to remove {} from MDC", MDC_TRACE_ID, e);
                    }
                    ackOrReject(executionStats, deliveryTag);
                } catch (Exception e2) {
                    logger.error("Failed to process message for delivery tag:" + deliveryTag, e2);
                    try {
                        MDC.remove(MDC_TRACE_ID);
                    } catch (Exception e3) {
                        logger.warn("Failed to remove {} from MDC", MDC_TRACE_ID, e3);
                    }
                    ackOrReject(executionStats, deliveryTag);
                }
            } catch (Throwable th) {
                try {
                    MDC.remove(MDC_TRACE_ID);
                } catch (Exception e4) {
                    logger.warn("Failed to remove {} from MDC", MDC_TRACE_ID, e4);
                }
                ackOrReject(executionStats, deliveryTag);
                throw th;
            }
        } catch (Exception e5) {
            logger.info("Failed to parse message to process {}", Long.valueOf(deliveryTag), e5);
            getChannel().basicReject(deliveryTag, false);
        }
    }

    private ExecutionContext createExecutionContext(byte[] bArr, AMQP.BasicProperties basicProperties) {
        return new ExecutionContext(this.step, Utils.createMessage(this.cipher.decryptMessageContent(new String(bArr, Charset.forName("UTF-8")))), basicProperties);
    }

    private void ackOrReject(ExecutionStats executionStats, long j) throws IOException {
        logger.info("Execution stats: {}", executionStats);
        if (executionStats == null || executionStats.getErrorCount() > 0) {
            logger.info("Reject received messages {}", Long.valueOf(j));
            getChannel().basicReject(j, false);
        } else {
            logger.info("Acknowledging received messages {}", Long.valueOf(j));
            getChannel().basicAck(j, true);
        }
    }

    private Object getHeaderValue(AMQP.BasicProperties basicProperties, String str) {
        return basicProperties.getHeaders().getOrDefault(str, "unknown");
    }
}
