package org.eclipse.ditto.services.connectivity.messaging;

import akka.actor.ActorSystem;
import akka.event.DiagnosticLoggingAdapter;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import kamon.Kamon;
import kamon.trace.Segment;
import kamon.trace.TraceContext;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.MappingContext;
import org.eclipse.ditto.model.connectivity.MessageMappingFailedException;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.services.connectivity.mapping.DefaultMessageMapperFactory;
import org.eclipse.ditto.services.connectivity.mapping.DittoMessageMapper;
import org.eclipse.ditto.services.connectivity.mapping.MessageMapper;
import org.eclipse.ditto.services.connectivity.mapping.MessageMapperRegistry;
import org.eclipse.ditto.services.connectivity.mapping.MessageMappers;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/MessageMappingProcessor.class */
/**
 * Converts inbound {@link ExternalMessage}s to {@link Signal}s and outbound {@link Signal}s to
 * {@link ExternalMessage}s.
 * <p>
 * Each conversion runs in two steps: a payload mapping step performed by a {@link MessageMapper} taken from the
 * {@link MessageMapperRegistry}, and a protocol step performed by the {@link DittoProtocolAdapter}. Every
 * conversion is wrapped in a Kamon {@link TraceContext}, with one {@link Segment} per step.
 */
public final class MessageMappingProcessor {
    private static final String INBOUND_MAPPING_TRACE_SUFFIX = ".inbound";
    private static final String OUTBOUND_MAPPING_TRACE_SUFFIX = ".outbound";
    private static final String SEGMENT_CATEGORY = "payload-mapping";
    private static final String MAPPING_SEGMENT_NAME = "mapping";
    private static final String PROTOCOL_SEGMENT_NAME = "protocol";
    /** Trace token used when a message carries no correlation id. */
    private static final String NO_CORRELATION_ID = "no-correlation-id";
    private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
    private final String connectionId;
    private final MessageMapperRegistry registry;
    private final DiagnosticLoggingAdapter log;

    private MessageMappingProcessor(String connectionId, MessageMapperRegistry registry,
            DiagnosticLoggingAdapter log) {
        this.connectionId = connectionId;
        this.registry = registry;
        this.log = log;
    }

    /**
     * Creates a processor whose mapper registry is built by a {@link DefaultMessageMapperFactory}.
     *
     * @param str the connection id; used as prefix of the trace context name and as logging MDC value.
     * @param mappingContext the mapping context handed to the factory, may be {@code null}.
     * @param actorSystem the actor system handed to the factory.
     * @param diagnosticLoggingAdapter the logger used by the factory and by the created processor.
     * @return the new processor.
     */
    public static MessageMappingProcessor of(String str, @Nullable MappingContext mappingContext, ActorSystem actorSystem, DiagnosticLoggingAdapter diagnosticLoggingAdapter) {
        final MessageMapperRegistry mapperRegistry =
                DefaultMessageMapperFactory.of(actorSystem, MessageMappers.class, diagnosticLoggingAdapter)
                        .registryOf(DittoMessageMapper.CONTEXT, mappingContext);
        return new MessageMappingProcessor(str, mapperRegistry, diagnosticLoggingAdapter);
    }

    /**
     * @return the registry this processor selects its mappers from.
     */
    public MessageMapperRegistry getRegistry() {
        return this.registry;
    }

    /**
     * Converts an inbound external message to a signal.
     *
     * @param externalMessage the message to convert.
     * @return the resulting signal, or an empty Optional if the mapper produced no result.
     * @throws DittoRuntimeException if the conversion failed; failures that are not already a
     * {@code DittoRuntimeException} are wrapped in a {@link MessageMappingFailedException}.
     */
    public Optional<Signal<?>> process(ExternalMessage externalMessage) {
        final String correlationId = DittoHeaders.of(externalMessage.getHeaders())
                .getCorrelationId()
                .orElse(NO_CORRELATION_ID);
        return doApplyTraced(
                () -> createProcessingContext(this.connectionId + INBOUND_MAPPING_TRACE_SUFFIX, correlationId),
                traceContext -> convertMessage(externalMessage, traceContext));
    }

    /**
     * Converts an outbound signal to an external message.
     *
     * @param signal the signal to convert.
     * @return the resulting external message, or an empty Optional if the mapper produced no result.
     * @throws DittoRuntimeException if the conversion failed; failures that are not already a
     * {@code DittoRuntimeException} are wrapped in a {@link MessageMappingFailedException}.
     */
    public Optional<ExternalMessage> process(Signal<?> signal) {
        final String correlationId = signal.getDittoHeaders()
                .getCorrelationId()
                .orElse(NO_CORRELATION_ID);
        return doApplyTraced(
                () -> createProcessingContext(this.connectionId + OUTBOUND_MAPPING_TRACE_SUFFIX, correlationId),
                traceContext -> convertToExternalMessage(() -> PROTOCOL_ADAPTER.toAdaptable(signal), traceContext));
    }

    /**
     * Inbound conversion: payload mapping first, then protocol adaptation. The headers of the external message
     * are merged with the headers of the adapted signal; on key collisions the signal's headers are put last.
     */
    private Optional<Signal<?>> convertMessage(ExternalMessage externalMessage, TraceContext traceContext) {
        ConditionChecker.checkNotNull(externalMessage);
        ConditionChecker.checkNotNull(traceContext);
        try {
            final Optional<Adaptable> mapped = doApplyTracedSegment(
                    () -> createSegment(traceContext, MAPPING_SEGMENT_NAME),
                    () -> getMapper(externalMessage).map(externalMessage));
            return mapped.map(adaptable -> {
                doUpdateCorrelationId(adaptable);
                return MessageMappingProcessor.<Signal<?>>doApplyTracedSegment(
                        () -> createSegment(traceContext, PROTOCOL_SEGMENT_NAME),
                        () -> {
                            final Signal<?> adapted = PROTOCOL_ADAPTER.fromAdaptable(adaptable);
                            final DittoHeadersBuilder headersBuilder =
                                    DittoHeaders.newBuilder(externalMessage.getHeaders());
                            headersBuilder.putHeaders(adapted.getDittoHeaders());
                            return adapted.setDittoHeaders(headersBuilder.build());
                        });
            });
        } catch (DittoRuntimeException e) {
            // Must precede the generic handler: already well-formed Ditto errors are passed through unwrapped.
            throw e;
        } catch (Exception e) {
            throw MessageMappingFailedException.newBuilder(externalMessage.findContentType().orElse(""))
                    .description("Could not map ExternalMessage due to unknown problem: "
                            + e.getClass().getSimpleName() + " " + e.getMessage())
                    .dittoHeaders(DittoHeaders.of(externalMessage.getHeaders()))
                    .cause(e)
                    .build();
        }
    }

    /**
     * Outbound conversion: protocol adaptation first (via {@code supplier}), then payload mapping.
     * <p>
     * The supplier is invoked exactly once. If it fails, the failure is reported without headers because no
     * Adaptable is available to read them from.
     */
    private Optional<ExternalMessage> convertToExternalMessage(Supplier<Adaptable> supplier, TraceContext traceContext) {
        ConditionChecker.checkNotNull(supplier);
        ConditionChecker.checkNotNull(traceContext);

        final Adaptable adaptable;
        try {
            adaptable = doApplyTracedSegment(
                    () -> createSegment(traceContext, PROTOCOL_SEGMENT_NAME),
                    supplier);
        } catch (DittoRuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw adaptableMappingFailed(null, e);
        }

        try {
            doUpdateCorrelationId(adaptable);
            return doApplyTracedSegment(
                    () -> createSegment(traceContext, MAPPING_SEGMENT_NAME),
                    () -> getMapper(adaptable).map(adaptable));
        } catch (DittoRuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Reuse the Adaptable obtained above instead of asking the supplier again.
            throw adaptableMappingFailed(adaptable, e);
        }
    }

    /**
     * Builds the exception reported for an unexpected failure of the outbound conversion.
     *
     * @param adaptable the Adaptable whose headers supply content-type and Ditto headers of the error, or
     * {@code null} if none could be obtained; in that case an empty content-type and empty headers are used.
     * @param cause the unexpected failure.
     */
    private static MessageMappingFailedException adaptableMappingFailed(@Nullable Adaptable adaptable,
            Exception cause) {
        final Optional<DittoHeaders> headers = Optional.ofNullable(adaptable).flatMap(Adaptable::getHeaders);
        final String contentType = headers
                .map(dittoHeaders -> dittoHeaders.get(ExternalMessage.CONTENT_TYPE_HEADER))
                .orElse("");
        return MessageMappingFailedException.newBuilder(contentType)
                .description("Could not map Adaptable due to unknown problem: " + cause.getMessage())
                .dittoHeaders(headers.orElseGet(DittoHeaders::empty))
                .cause(cause)
                .build();
    }

    /**
     * Selects the mapper for an inbound message: the default mapper if the message's content-type equals the
     * default mapper's content-type, otherwise the registry's mapper, falling back to the default mapper if the
     * registry has none. Also enriches the logger with correlation id and connection id.
     */
    private MessageMapper getMapper(ExternalMessage externalMessage) {
        LogUtil.enhanceLogWithCorrelationId(this.log, externalMessage.getHeaders().get("correlation-id"));
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, this.connectionId);

        final Optional<String> contentTypeOpt = externalMessage.findContentType();
        if (contentTypeOpt.isPresent()) {
            final String contentType = contentTypeOpt.get();
            final MessageMapper defaultMapper = this.registry.getDefaultMapper();
            if (defaultMapper.getContentType().filter(contentType::equals).isPresent()) {
                this.log.info("Selected Default MessageMapper for mapping ExternalMessage as content-type matched <{}>", contentType);
                return defaultMapper;
            }
        }
        return this.registry.getMapper().orElseGet(() -> {
            this.log.debug("Falling back to Default MessageMapper for mapping ExternalMessage as no MessageMapper was present: {}", externalMessage);
            return this.registry.getDefaultMapper();
        });
    }

    /**
     * Selects the mapper for an outbound Adaptable: the registry's mapper, falling back to the default mapper
     * if the registry has none.
     */
    private MessageMapper getMapper(Adaptable adaptable) {
        doUpdateCorrelationId(adaptable);
        return this.registry.getMapper().orElseGet(() -> {
            this.log.debug("Falling back to Default MessageMapper for mapping Adaptable as no MessageMapper was present: {}", adaptable);
            return this.registry.getDefaultMapper();
        });
    }

    /**
     * Enriches the logger with the Adaptable's correlation id (if it has one) and with the connection id.
     */
    private void doUpdateCorrelationId(Adaptable adaptable) {
        adaptable.getHeaders()
                .flatMap(DittoHeaders::getCorrelationId)
                .ifPresent(correlationId -> LogUtil.enhanceLogWithCorrelationId(this.log, correlationId));
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, this.connectionId);
    }

    /** Starts a segment of the given name in the given trace context. */
    private Segment createSegment(TraceContext traceContext, String segmentName) {
        return traceContext.startSegment(segmentName, SEGMENT_CATEGORY, SEGMENT_CATEGORY);
    }

    /**
     * Creates a new Kamon trace context.
     *
     * @param name the name of the context, must not be empty.
     * @param correlationId passed to Kamon as token of the context; if {@code null} or empty the context is
     * created without an explicit token.
     */
    private static TraceContext createProcessingContext(String name, @Nullable String correlationId) {
        ConditionChecker.checkNotEmpty(name, "name");
        if (correlationId == null || correlationId.isEmpty()) {
            return Kamon.tracer().newContext(name);
        }
        return Kamon.tracer().newContext(name, Option.apply(correlationId));
    }

    /**
     * Applies {@code function} to a freshly supplied trace context and finishes the context afterwards:
     * normally on success, with error if an exception was thrown. The exception is rethrown unchanged.
     */
    private static <T> T doApplyTraced(Supplier<TraceContext> contextSupplier, Function<TraceContext, T> function) {
        final TraceContext traceContext = contextSupplier.get();
        try {
            final T result = function.apply(traceContext);
            traceContext.finish();
            return result;
        } catch (Exception e) {
            traceContext.finishWithError(e);
            throw e;
        }
    }

    /**
     * Evaluates {@code resultSupplier} inside a freshly supplied segment and finishes the segment afterwards:
     * normally on success, with error if an exception was thrown. The exception is rethrown unchanged.
     */
    private static <T> T doApplyTracedSegment(Supplier<Segment> segmentSupplier, Supplier<T> resultSupplier) {
        final Segment segment = segmentSupplier.get();
        try {
            final T result = resultSupplier.get();
            segment.finish();
            return result;
        } catch (Exception e) {
            segment.finishWithError(e);
            throw e;
        }
    }
}
