package org.eclipse.hono.client.telemetry.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/client/telemetry/kafka/KafkaBasedTelemetrySender.class */
/**
 * A {@link TelemetrySender} that publishes telemetry messages to a tenant
 * specific Kafka topic of type {@link HonoTopic.Type#TELEMETRY}.
 */
public class KafkaBasedTelemetrySender extends AbstractKafkaBasedDownstreamSender implements TelemetrySender {

    /**
     * Creates a new sender; all arguments are forwarded to the superclass
     * constructor together with the fixed name {@code "telemetry"}.
     *
     * @param vertx The vert.x instance passed on to the superclass.
     * @param kafkaProducerFactory The factory passed on to the superclass.
     * @param messagingKafkaProducerConfigProperties The producer configuration passed on to the superclass.
     * @param z Flag passed on to the superclass.
     *          NOTE(review): presumably controls whether default properties are added to messages - confirm.
     * @param tracer The OpenTracing tracer passed on to the superclass.
     */
    public KafkaBasedTelemetrySender(Vertx vertx, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, boolean z, Tracer tracer) {
        super(vertx, kafkaProducerFactory, "telemetry", messagingKafkaProducerConfigProperties, z, tracer);
    }

    /**
     * {@inheritDoc}
     *
     * @return Always {@link HonoTopic.Type#TELEMETRY}.
     */
    @Override // org.eclipse.hono.client.telemetry.kafka.AbstractKafkaBasedDownstreamSender
    protected HonoTopic.Type getTopicType() {
        return HonoTopic.Type.TELEMETRY;
    }

    /**
     * Sends a telemetry message to the telemetry topic of the given tenant.
     * <p>
     * The message is handed to {@code sendAndWaitForOutcome} for every QoS level and the
     * tracing span created for the operation is finished once that operation completes.
     * For {@link QoS#AT_MOST_ONCE} the returned future is already succeeded and does not
     * reflect the outcome of the send operation; for any other QoS level the returned
     * future is the outcome of the send operation.
     *
     * @param tenantObject The tenant that the sending device belongs to.
     * @param registrationAssertion The registration assertion of the sending device.
     * @param qoS The delivery semantics to use.
     * @param str The content type, passed on to {@code addDefaults}.
     * @param buffer The payload of the message.
     * @param map Additional properties, passed on to {@code addDefaults}.
     * @param spanContext The span context that the new span refers to
     *                    ({@code follows_from} for {@code AT_MOST_ONCE}, {@code child_of} otherwise).
     * @return A future as described above.
     * @throws NullPointerException if tenantObject, registrationAssertion or qoS is {@code null}.
     */
    @Override
    public Future<Void> sendTelemetry(TenantObject tenantObject, RegistrationAssertion registrationAssertion, QoS qoS, String str, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(registrationAssertion);
        Objects.requireNonNull(qoS);

        final String tenantId = tenantObject.getTenantId();
        final String deviceId = registrationAssertion.getDeviceId();

        if (this.log.isTraceEnabled()) {
            this.log.trace("send telemetry data [tenantId: {}, deviceId: {}, qos: {}, contentType: {}, properties: {}]",
                    tenantId, deviceId, qoS, str, map);
        }

        final HonoTopic topic = new HonoTopic(HonoTopic.Type.TELEMETRY, tenantId);
        final String topicName = topic.toString();
        final Map<String, Object> propsWithDefaults = addDefaults(topic.getType().endpoint, tenantObject, registrationAssertion, qoS, str, buffer, map);

        final boolean fireAndForget = qoS == QoS.AT_MOST_ONCE;
        final Span currentSpan = startSpan("forward Telemetry", topicName, tenantId, deviceId, fireAndForget ? "follows_from" : "child_of", spanContext);

        // Always trigger the send and always finish the span. Previously the send was
        // skipped entirely for AT_MOST_ONCE, which dropped the message and leaked the span.
        final Future<Void> outcome = sendAndWaitForOutcome(topicName, tenantId, deviceId, buffer, propsWithDefaults, currentSpan)
                .onComplete(ar -> currentSpan.finish());

        if (fireAndForget) {
            // the caller does not wait for the outcome of the send operation
            return Future.succeededFuture();
        }
        return outcome;
    }

    /**
     * Returns the fully qualified class name followed by {@code " via Kafka"}.
     */
    @Override
    public String toString() {
        return KafkaBasedTelemetrySender.class.getName() + " via Kafka";
    }
}
