package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;

/* loaded from: input_file:org/eclipse/hono/client/command/kafka/KafkaBasedInternalCommandSender.class */
public class KafkaBasedInternalCommandSender extends AbstractKafkaBasedMessageSender<Buffer> implements InternalCommandSender {
    public KafkaBasedInternalCommandSender(KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, Tracer tracer) {
        super(kafkaProducerFactory, "internal-cmd-sender", messagingKafkaProducerConfigProperties, tracer);
    }

    public Future<Void> sendCommand(CommandContext commandContext, String str) {
        Objects.requireNonNull(commandContext);
        Objects.requireNonNull(str);
        Command command = commandContext.getCommand();
        if (command instanceof KafkaBasedCommand) {
            String internalCommandTopic = getInternalCommandTopic(str);
            Span startChildSpan = startChildSpan("delegate Command request", internalCommandTopic, command.getTenant(), command.getDeviceId(), commandContext.getTracingContext());
            return sendAndWaitForOutcome(internalCommandTopic, command.getTenant(), command.getDeviceId(), command.getPayload(), getHeaders((KafkaBasedCommand) command), startChildSpan).onSuccess(r3 -> {
                commandContext.accept();
            }).onFailure(th -> {
                commandContext.release(new ServerErrorException(command.getTenant(), 503, "failed to publish command message on internal command topic", th));
            }).onComplete(asyncResult -> {
                startChildSpan.finish();
            });
        }
        commandContext.release();
        this.log.error("command is not an instance of KafkaBasedCommand");
        throw new IllegalArgumentException("command is not an instance of KafkaBasedCommand");
    }

    private static String getInternalCommandTopic(String str) {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, str).toString();
    }

    private static List<KafkaHeader> getHeaders(KafkaBasedCommand kafkaBasedCommand) {
        ArrayList arrayList = new ArrayList(kafkaBasedCommand.getRecord().headers());
        arrayList.add(KafkaRecordHelper.createTenantIdHeader(kafkaBasedCommand.getTenant()));
        Optional.ofNullable(kafkaBasedCommand.getGatewayId()).ifPresent(str -> {
            arrayList.add(KafkaRecordHelper.createViaHeader(str));
        });
        arrayList.add(KafkaRecordHelper.createOriginalPartitionHeader(kafkaBasedCommand.getRecord().partition()));
        arrayList.add(KafkaRecordHelper.createOriginalOffsetHeader(kafkaBasedCommand.getRecord().offset()));
        return arrayList;
    }
}
