package org.eclipse.hono.client.telemetry.kafka;

import io.opentracing.Tracer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerHelper;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/client/telemetry/kafka/AbstractKafkaBasedDownstreamSender.class */
/**
 * Base class for Kafka based senders of downstream messages.
 * <p>
 * On construction, a consumer for {@link TenantChangeNotification}s is registered via
 * {@link NotificationEventBusSupport}. Whenever a notification reports a tenant with
 * {@link LifecycleChange#DELETE}, the metrics kept for that tenant's topic (of the type
 * returned by {@link #getTopicType()}) are removed from the producer, provided the
 * producer factory currently holds a producer under this sender's producer name.
 */
public abstract class AbstractKafkaBasedDownstreamSender extends AbstractKafkaBasedMessageSender {

    /** Whether {@link #addDefaults} passes tenant and device level defaults on to the message properties. */
    private final boolean isDefaultsEnabled;

    /**
     * Creates a new sender and registers the tenant change notification consumer.
     *
     * @param vertx The Vert.x instance used for registering the notification consumer.
     * @param kafkaProducerFactory The factory that is passed to the superclass and that is also
     *                             queried for the producer whose topic metrics need to be removed.
     * @param str The producer name; passed to the superclass and used for looking up the producer
     *            in the factory.
     * @param messagingKafkaProducerConfigProperties The producer configuration, passed to the superclass.
     * @param z {@code true} if {@link #addDefaults} should include the defaults defined for the
     *          tenant and the device.
     * @param tracer The tracer, passed to the superclass.
     * @throws NullPointerException if vertx is {@code null} (the superclass constructor may reject
     *                              further {@code null} arguments; not visible here).
     */
    public AbstractKafkaBasedDownstreamSender(Vertx vertx, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, String str, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, boolean z, Tracer tracer) {
        super(kafkaProducerFactory, str, messagingKafkaProducerConfigProperties, tracer);
        Objects.requireNonNull(vertx);
        this.isDefaultsEnabled = z;

        NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE, notification -> {
            // only the deletion of a tenant requires cleaning up metrics
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                // the producer is looked up lazily: nothing to clean up if none is available
                kafkaProducerFactory.getProducer(str).ifPresent(
                        producer -> removeTenantTopicBasedProducerMetrics(producer, notification.getTenantId()));
            }
        });
    }

    /**
     * Removes the metrics that the given producer keeps for the topic of the given tenant.
     * <p>
     * The topic name is the string representation of a {@link HonoTopic} built from
     * {@link #getTopicType()} and the tenant identifier.
     *
     * @param producer The producer to remove the topic metrics from.
     * @param tenantId The identifier of the tenant whose topic metrics are to be removed.
     */
    private void removeTenantTopicBasedProducerMetrics(KafkaProducer<String, Buffer> producer, String tenantId) {
        final String topicName = new HonoTopic(getTopicType(), tenantId).toString();
        KafkaProducerHelper.removeTopicMetrics(producer, Stream.of(topicName));
    }

    /**
     * Gets the type of topic that this sender is concerned with.
     * <p>
     * The type is used for determining the name of the tenant specific topic whose
     * metrics are removed when a tenant gets deleted.
     *
     * @return The topic type.
     */
    protected abstract HonoTopic.Type getTopicType();

    /**
     * Creates the properties of a downstream message, including defaults where applicable.
     * <p>
     * A copy of the given properties is extended with the device identifier ({@code device_id})
     * and the ordinal of the QoS level ({@code qos}). The result is handed to
     * {@link DownstreamMessageProperties} together with the tenant's resource limits and, if
     * defaults are enabled for this sender, the defaults of the tenant and the device.
     * Finally, the content type is set: an explicitly given content type always wins; if none
     * is given but a payload is present, {@code application/octet-stream} is used unless the
     * properties already contain a content type.
     * <p>
     * The given properties map is never modified.
     * <p>
     * NOTE: the decompiler reports that this method was originally declared {@code protected}.
     *
     * @param str The first argument passed to {@link DownstreamMessageProperties}
     *            (presumably the endpoint name - TODO confirm against that class).
     * @param tenantObject The tenant that the device belongs to.
     * @param registrationAssertion The registration assertion of the device.
     * @param qoS The quality of service level; its ordinal is stored in the properties.
     * @param str2 The content type to set or {@code null} if none has been specified.
     * @param buffer The payload or {@code null}; only checked for presence in order to decide
     *               whether a fallback content type is needed.
     * @param map Additional properties to include or {@code null}.
     * @return The message properties.
     * @throws NullPointerException if str, tenantObject, registrationAssertion or qoS is {@code null}.
     */
    public final Map<String, Object> addDefaults(String str, TenantObject tenantObject, RegistrationAssertion registrationAssertion, QoS qoS, String str2, Buffer buffer, Map<String, Object> map) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(registrationAssertion);
        Objects.requireNonNull(qoS);

        // work on a typed copy so that the caller's map is left untouched
        final Map<String, Object> messageProperties = new HashMap<>();
        if (map != null) {
            messageProperties.putAll(map);
        }
        messageProperties.put("device_id", registrationAssertion.getDeviceId());
        // the value depends on the declaration order of the QoS constants
        messageProperties.put("qos", Integer.valueOf(qoS.ordinal()));

        final Map<String, Object> propsWithDefaults = new DownstreamMessageProperties(
                str,
                this.isDefaultsEnabled ? tenantObject.getDefaults().getMap() : null,
                this.isDefaultsEnabled ? registrationAssertion.getDefaults() : null,
                messageProperties,
                tenantObject.getResourceLimits())
            .asMap();

        if (str2 != null) {
            // an explicitly provided content type overrides any default
            propsWithDefaults.put("content-type", str2);
        } else if (buffer != null) {
            // a payload without any content type is declared to be opaque binary data
            propsWithDefaults.putIfAbsent("content-type", "application/octet-stream");
        }
        return propsWithDefaults;
    }
}
