package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Objects;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.impl.CommandConsumer;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.AbstractMappingAndDelegatingCommandHandler;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/KafkaBasedMappingAndDelegatingCommandHandler.class */
/**
 * Command handler that accepts command messages in the form of Kafka consumer records,
 * wraps each of them in a {@link KafkaBasedCommandContext} and passes valid ones on to
 * {@code mapAndDelegateIncomingCommand} (not defined in this class; presumably inherited
 * from {@link AbstractMappingAndDelegatingCommandHandler}).
 */
public class KafkaBasedMappingAndDelegatingCommandHandler extends AbstractMappingAndDelegatingCommandHandler {
    /** Tracer used to extract the parent span context from a record and to create the handling span. */
    private final Tracer tracer;

    /**
     * Creates a new handler.
     *
     * @param tenantClient The tenant client, passed on to the super class.
     * @param commandTargetMapper The command target mapper, passed on to the super class.
     * @param kafkaBasedInternalCommandSender The internal command sender, passed on to the super class.
     * @param tracer The tracer to use for creating spans.
     * @throws NullPointerException if tracer is {@code null}.
     */
    public KafkaBasedMappingAndDelegatingCommandHandler(TenantClient tenantClient, CommandTargetMapper commandTargetMapper, KafkaBasedInternalCommandSender kafkaBasedInternalCommandSender, Tracer tracer) {
        super(tenantClient, commandTargetMapper, kafkaBasedInternalCommandSender);
        this.tracer = Objects.requireNonNull(tracer);
    }

    /**
     * Processes a command record received from Kafka.
     * <p>
     * A record that cannot be turned into a {@link KafkaBasedCommand} is logged at debug level
     * and dropped. A command that was created but reports itself as not valid is rejected via
     * its command context. A valid command is handed to {@code mapAndDelegateIncomingCommand}.
     *
     * @param kafkaConsumerRecord The consumer record containing the command message.
     * @throws NullPointerException if the record is {@code null}.
     */
    public void mapAndDelegateIncomingCommandMessage(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        Objects.requireNonNull(kafkaConsumerRecord);

        final KafkaBasedCommand command;
        try {
            command = KafkaBasedCommand.from(kafkaConsumerRecord);
        } catch (IllegalArgumentException e) {
            // Only a failure to parse the record means "invalid record". The try block is kept
            // this narrow on purpose: an IllegalArgumentException raised further down must not be
            // swallowed and misreported as an invalid record.
            this.log.debug("command record is invalid", e);
            return;
        }

        final Span currentSpan = CommandConsumer.createSpan(
                "map and delegate command",
                command.getTenant(),
                command.getDeviceId(),
                (String) null,
                this.tracer,
                KafkaTracingHelper.extractSpanContext(this.tracer, kafkaConsumerRecord));
        KafkaTracingHelper.setRecordTags(currentSpan, kafkaConsumerRecord);
        final KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(command, currentSpan);
        command.logToSpan(currentSpan);

        if (!command.isValid()) {
            this.log.debug("received invalid command record [{}]", command);
            commandContext.reject("malformed command message");
            return;
        }
        this.log.trace("received valid command record [{}]", command);
        mapAndDelegateIncomingCommand(commandContext);
    }
}
