package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerHelper;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/client/command/kafka/KafkaBasedCommandResponseSender.class */
/**
 * A {@link CommandResponseSender} built on top of {@link AbstractKafkaBasedMessageSender}.
 * <p>
 * Every command response is sent to the {@link HonoTopic} of type
 * {@link HonoTopic.Type#COMMAND_RESPONSE} that is derived from the tenant identifier
 * carried by the response.
 */
public class KafkaBasedCommandResponseSender extends AbstractKafkaBasedMessageSender<Buffer> implements CommandResponseSender {

    /**
     * The identifier passed to the superclass constructor, used to look up the producer
     * from the factory and handed to {@link DownstreamMessageProperties} when building headers.
     */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";

    /**
     * Creates a new sender and registers a consumer for tenant change notifications.
     * <p>
     * When a notification with {@link LifecycleChange#DELETE} arrives and the factory
     * currently holds a producer for this sender, the producer's metrics for the
     * deleted tenant's command response topic are removed.
     *
     * @param vertx The vert.x instance on which the notification consumer is registered.
     * @param kafkaProducerFactory The factory used to obtain the Kafka producer.
     * @param messagingKafkaProducerConfigProperties The producer configuration passed to the superclass.
     * @param tracer The tracer passed to the superclass.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public KafkaBasedCommandResponseSender(Vertx vertx, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, Tracer tracer) {
        super(kafkaProducerFactory, COMMAND_RESPONSE_ENDPOINT, messagingKafkaProducerConfigProperties, tracer);
        Objects.requireNonNull(vertx, "vertx");
        NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                kafkaProducerFactory.getProducer(COMMAND_RESPONSE_ENDPOINT)
                        .ifPresent(producer -> removeTenantTopicBasedProducerMetrics(producer, notification.getTenantId()));
            }
        });
    }

    /**
     * Removes the given producer's metrics for the command response topic of a tenant.
     *
     * @param kafkaProducer The producer whose topic metrics should be removed.
     * @param str The identifier of the tenant that the topic name is derived from.
     */
    private void removeTenantTopicBasedProducerMetrics(KafkaProducer<String, Buffer> kafkaProducer, String str) {
        final String topicName = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, str).toString();
        KafkaProducerHelper.removeTopicMetrics(kafkaProducer, Stream.of(topicName));
    }

    /**
     * Sends a command response to the command response topic of the response's tenant.
     * <p>
     * A child span is started for the operation and finished once the returned
     * future completes, regardless of the outcome.
     *
     * @param tenantObject The tenant whose defaults and resource limits are used for the message headers.
     * @param registrationAssertion The registration assertion whose defaults are used for the message headers.
     * @param commandResponse The response to send.
     * @param spanContext The span context used as parent of the span created for this operation.
     * @return The future returned by the superclass' send operation.
     * @throws NullPointerException if tenantObject, registrationAssertion or commandResponse is {@code null}.
     */
    public Future<Void> sendCommandResponse(TenantObject tenantObject, RegistrationAssertion registrationAssertion, CommandResponse commandResponse, SpanContext spanContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(registrationAssertion);
        Objects.requireNonNull(commandResponse);
        if (this.log.isTraceEnabled()) {
            this.log.trace("publish command response [{}]", commandResponse);
        }
        // Build the headers BEFORE starting the span: if header creation throws,
        // no span has been started yet that would otherwise never be finished.
        final Map<String, Object> headers = getHeaders(commandResponse, tenantObject, registrationAssertion);
        final String topicName = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, commandResponse.getTenantId()).toString();
        final Span span = startChildSpan("forward Command response", topicName, commandResponse.getTenantId(), commandResponse.getDeviceId(), spanContext);
        if (commandResponse.getMessagingType() != getMessagingType()) {
            // record that the response travels via a different messaging type than the command did
            span.log(String.format("using messaging type %s instead of type %s used for the original command", getMessagingType(), commandResponse.getMessagingType()));
        }
        return sendAndWaitForOutcome(
                topicName,
                commandResponse.getTenantId(),
                commandResponse.getDeviceId(),
                commandResponse.getPayload(),
                headers,
                span)
            .onComplete(outcome -> span.finish());
    }

    /**
     * Assembles the headers for a command response message.
     * <p>
     * The headers start out as the map produced by {@link DownstreamMessageProperties}
     * and are then extended with the correlation id, tenant id, device id and status
     * of the response. The content type of the response is used if set; otherwise
     * {@code application/octet-stream} is used when the response has a payload.
     *
     * @param commandResponse The response to create the headers for.
     * @param tenantObject The tenant providing defaults and resource limits.
     * @param registrationAssertion The registration assertion providing device level defaults.
     * @return The headers.
     */
    private Map<String, Object> getHeaders(CommandResponse commandResponse, TenantObject tenantObject, RegistrationAssertion registrationAssertion) {
        final DownstreamMessageProperties properties = new DownstreamMessageProperties(
                COMMAND_RESPONSE_ENDPOINT,
                tenantObject.getDefaults().getMap(),
                registrationAssertion.getDefaults(),
                commandResponse.getAdditionalProperties(),
                tenantObject.getResourceLimits());
        final Map<String, Object> headers = properties.asMap();
        headers.put("correlation-id", commandResponse.getCorrelationId());
        headers.put("tenant_id", commandResponse.getTenantId());
        headers.put("device_id", commandResponse.getDeviceId());
        headers.put("status", Integer.valueOf(commandResponse.getStatus()));
        if (commandResponse.getContentType() != null) {
            headers.put("content-type", commandResponse.getContentType());
        } else if (commandResponse.getPayload() != null) {
            // a payload without an explicit content type is declared as opaque binary data
            headers.put("content-type", "application/octet-stream");
        }
        return headers;
    }
}
