package tech.corefinance.kafka.common.interceptor;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.corefinance.common.context.ApplicationContextHolder;
import tech.corefinance.common.context.JwtContext;
import tech.corefinance.common.context.TenantContext;
import tech.corefinance.common.context.TraceIdContext;
import tech.corefinance.common.dto.JwtTokenDto;
import tech.corefinance.common.service.JwtService;

/* loaded from: input_file:tech/corefinance/kafka/common/interceptor/CommonKafkaInterceptor.class */
public class CommonKafkaInterceptor implements ProducerInterceptor<String, Object>, ConsumerInterceptor<String, Object> {
    private static final Logger log = LoggerFactory.getLogger(CommonKafkaInterceptor.class);

    public void applyCommonHeader(Headers headers) {
        JwtTokenDto jwt = JwtContext.getInstance().getJwt();
        String tenantId = TenantContext.getInstance().getTenantId();
        log.debug("Tenant ID [{}]", tenantId);
        checkNullAndAddHeader(headers, "x-tenant-id", tenantId);
        String traceId = TraceIdContext.getInstance().getTraceId();
        log.debug("Trace ID [{}]", tenantId);
        checkNullAndAddHeader(headers, "x-trace-id", traceId);
        if (jwt == null) {
            log.debug("Didn't found JWT token in context!");
        } else {
            log.debug("Found JWT token in context!");
            checkNullAndAddHeader(headers, "Authorization", "Bearer " + jwt.getOriginalToken()).checkNullAndAddHeader(headers, "x-device-id", jwt.getDeviceId()).checkNullAndAddHeader(headers, "x-external-ip", jwt.getLoginIpAddr());
        }
    }

    private CommonKafkaInterceptor checkNullAndAddHeader(Headers headers, String str, String str2) {
        if (str2 != null) {
            headers.add(str, str2.getBytes(StandardCharsets.UTF_8));
        }
        return this;
    }

    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> producerRecord) {
        applyCommonHeader(producerRecord.headers());
        return producerRecord;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exc) {
    }

    public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> consumerRecords) {
        JwtService jwtService = (JwtService) ApplicationContextHolder.getInstance().getApplicationContext().getBean(JwtService.class);
        consumerRecords.forEach(consumerRecord -> {
            log.debug("Received Message from topic [{}], key [{}], value [{}]", new Object[]{consumerRecord.topic(), consumerRecord.key(), consumerRecord.value()});
            Headers headers = consumerRecord.headers();
            String extractLastStringHeaderValue = extractLastStringHeaderValue(headers, "x-tenant-id");
            log.debug("Tenant ID from message header [{}]", extractLastStringHeaderValue);
            TenantContext.getInstance().setTenantId(extractLastStringHeaderValue);
            String extractLastStringHeaderValue2 = extractLastStringHeaderValue(headers, "x-tenant-id");
            log.debug("Trace ID from message header [{}]", extractLastStringHeaderValue2);
            TraceIdContext.getInstance().setTraceId(extractLastStringHeaderValue2);
            String extractLastStringHeaderValue3 = extractLastStringHeaderValue(headers, "Authorization");
            String extractLastStringHeaderValue4 = extractLastStringHeaderValue(headers, "x-external-ip");
            String extractLastStringHeaderValue5 = extractLastStringHeaderValue(headers, "x-device-id");
            log.debug("Toekn from message header [{}]", extractLastStringHeaderValue3);
            if (extractLastStringHeaderValue3 != null) {
                try {
                    JwtContext.getInstance().setJwt(jwtService.decodeToken(extractLastStringHeaderValue3.substring("Bearer ".length()).trim(), extractLastStringHeaderValue5, extractLastStringHeaderValue4));
                } catch (JsonProcessingException e) {
                    log.error("Error when extract JWT from kafka message. {}", e.getMessage(), e);
                }
            }
        });
        return consumerRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void close() {
    }

    public void configure(Map<String, ?> map) {
    }

    private String extractLastStringHeaderValue(Headers headers, String str) {
        byte[] value;
        Header lastHeader = headers.lastHeader(str);
        if (lastHeader == null || (value = lastHeader.value()) == null) {
            return null;
        }
        return new String(value, StandardCharsets.UTF_8);
    }
}
