package org.eclipse.hono.client.telemetry.pubsub;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.pubsub.AbstractPubSubBasedMessageSender;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.telemetry.EventSender;
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSender.class */
/**
 * A sender for telemetry messages and events that hands messages over to the
 * Pub/Sub based sending logic inherited from {@link AbstractPubSubBasedMessageSender}.
 * <p>
 * Before a message is sent, its properties are enriched with device, tenant and
 * project related entries (see {@code addDefaults}). The sender also registers a
 * consumer for tenant change notifications and closes the publisher belonging to
 * a tenant once that tenant has been deleted.
 */
public final class PubSubBasedDownstreamSender extends AbstractPubSubBasedMessageSender implements TelemetrySender, EventSender {
    private static final String PUBSUB_DOWNSTREAM_PROPERTY_TENANT_ID = "tenantId";
    private static final String PUBSUB_DOWNSTREAM_PROPERTY_DEVICE_ID = "deviceId";
    private static final String PUBSUB_DOWNSTREAM_PROPERTY_DEVICE_REGISTRY_ID = "deviceRegistryId";
    private static final String PUBSUB_PROPERTY_SUBFOLDER = "subFolder";
    private static final String PUBSUB_DOWNSTREAM_PROPERTY_PROJECT_ID = "projectId";
    private static final String PROPERTY_ORIG_ADDRESS = "orig_address";
    private static final String PROPERTY_QOS = "qos";
    private static final String PROPERTY_CONTENT_TYPE = "content-type";
    private static final String ENDPOINT_EVENT = "event";
    private static final String ENDPOINT_TELEMETRY = "telemetry";
    private static final String REFERENCE_CHILD_OF = "child_of";
    private static final String REFERENCE_FOLLOWS_FROM = "follows_from";

    private final boolean isDefaultsEnabled;

    /**
     * Creates a new sender and registers a consumer for tenant change notifications.
     * <p>
     * When a notification reports the deletion of a tenant, the publisher that the
     * factory holds for {@code str} and that tenant (if any) is closed.
     *
     * @param vertx The vert.x instance used for registering the notification consumer.
     * @param pubSubPublisherFactory The factory used for looking up and closing publishers;
     *                               also passed on to the super class.
     * @param str The first string argument of the super constructor; it is also the key (together
     *            with the tenant ID) under which publishers are looked up and closed
     *            (presumably the topic name - confirm against the super class).
     * @param str2 The second string argument of the super constructor
     *             (presumably the project ID - confirm against the super class).
     * @param z {@code true} if defaults defined at the tenant and device level should be
     *          included in the properties of the messages being sent.
     * @param tracer The tracer passed on to the super class.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public PubSubBasedDownstreamSender(Vertx vertx, PubSubPublisherFactory pubSubPublisherFactory, String str, String str2, boolean z, Tracer tracer) {
        super(pubSubPublisherFactory, str, str2, tracer);
        Objects.requireNonNull(vertx);
        this.isDefaultsEnabled = z;
        NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                final String deletedTenantId = notification.getTenantId();
                // only close a publisher that actually exists for the deleted tenant
                pubSubPublisherFactory.getPublisher(str, deletedTenantId)
                        .ifPresent(publisher -> pubSubPublisherFactory.closePublisher(str, deletedTenantId));
            }
        });
    }

    /**
     * Sends an event for a device.
     * <p>
     * The message is always sent with {@link QoS#AT_LEAST_ONCE} and the returned future
     * reflects the outcome of the send operation.
     *
     * @param tenantObject The tenant that the device belongs to.
     * @param registrationAssertion The registration assertion of the device.
     * @param str The content type of the payload, may be {@code null}.
     * @param buffer The payload to send.
     * @param map Additional message properties, may be {@code null}. A string value stored
     *            under {@code orig_address} is used for determining the sub-topics.
     * @param spanContext The span context that the span tracking the send operation refers to.
     * @return A future reporting the outcome of sending the event.
     * @throws NullPointerException if tenantObject or registrationAssertion are {@code null}.
     */
    public Future<Void> sendEvent(TenantObject tenantObject, RegistrationAssertion registrationAssertion, String str, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(registrationAssertion);

        final String tenantId = tenantObject.getTenantId();
        final String deviceId = registrationAssertion.getDeviceId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending event data [tenantId: {}, deviceId: {}, contentType: {}, properties: {}]", tenantId, deviceId, str, map);
        }

        final List<String> subtopics = extractSubtopics(map);
        final Map<String, Object> messageProperties = addDefaults(ENDPOINT_EVENT, tenantObject, registrationAssertion, QoS.AT_LEAST_ONCE, str, map, PubSubMessageHelper.getSubFolder(subtopics));
        final String topicName = PubSubMessageHelper.getTopicName(ENDPOINT_EVENT, tenantId, subtopics);
        final Span span = startSpan("forward event", tenantId, deviceId, REFERENCE_CHILD_OF, spanContext);
        return sendAndWaitForOutcome(topicName, tenantId, deviceId, buffer, messageProperties, span)
                .onComplete(outcome -> span.finish());
    }

    /**
     * Sends a telemetry message for a device.
     * <p>
     * The message is handed over for sending regardless of the requested QoS level.
     * For {@link QoS#AT_MOST_ONCE} the returned future is already succeeded and does not
     * reflect the outcome of the send operation; for any other QoS level the returned
     * future reports that outcome. In both cases the span tracking the operation is
     * finished once the send operation has completed.
     *
     * @param tenantObject The tenant that the device belongs to.
     * @param registrationAssertion The registration assertion of the device.
     * @param qoS The delivery semantics to use for sending the message.
     * @param str The content type of the payload, may be {@code null}.
     * @param buffer The payload to send.
     * @param map Additional message properties, may be {@code null}. A string value stored
     *            under {@code orig_address} is used for determining the sub-topics.
     * @param spanContext The span context that the span tracking the send operation refers to.
     * @return A future as described above.
     * @throws NullPointerException if tenantObject, registrationAssertion or qoS are {@code null}.
     */
    public Future<Void> sendTelemetry(TenantObject tenantObject, RegistrationAssertion registrationAssertion, QoS qoS, String str, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(registrationAssertion);
        Objects.requireNonNull(qoS);

        final String tenantId = tenantObject.getTenantId();
        final String deviceId = registrationAssertion.getDeviceId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending telemetry data [tenantId: {}, deviceId: {}, qos: {}, contentType: {}, properties: {}]", tenantId, deviceId, qoS, str, map);
        }

        final boolean fireAndForget = qoS == QoS.AT_MOST_ONCE;
        final List<String> subtopics = extractSubtopics(map);
        final Map<String, Object> messageProperties = addDefaults(ENDPOINT_TELEMETRY, tenantObject, registrationAssertion, qoS, str, map, PubSubMessageHelper.getSubFolder(subtopics));
        final String topicName = PubSubMessageHelper.getTopicName(ENDPOINT_TELEMETRY, tenantId, subtopics);
        final Span span = startSpan("forward telemetry", tenantId, deviceId, fireAndForget ? REFERENCE_FOLLOWS_FROM : REFERENCE_CHILD_OF, spanContext);

        // The message must be sent (and the span finished) for every QoS level,
        // the QoS level only determines whether the caller waits for the outcome.
        final Future<Void> outcome = sendAndWaitForOutcome(topicName, tenantId, deviceId, buffer, messageProperties, span)
                .onComplete(result -> span.finish());
        if (fireAndForget) {
            return Future.succeededFuture();
        }
        return outcome;
    }

    /**
     * Determines the sub-topics from the {@code orig_address} entry of the given properties.
     * <p>
     * An empty address is used if the properties are {@code null} or do not contain
     * a string value under that key.
     *
     * @param properties The message properties, may be {@code null}.
     * @return The sub-topics as computed by {@link PubSubMessageHelper#getSubtopics(String)}.
     */
    private static List<String> extractSubtopics(final Map<String, Object> properties) {
        final String origAddress = Optional.ofNullable(properties)
                .map(props -> props.get(PROPERTY_ORIG_ADDRESS))
                .filter(String.class::isInstance)
                .map(String.class::cast)
                .orElse("");
        return PubSubMessageHelper.getSubtopics(origAddress);
    }

    /**
     * Creates the properties of a downstream message.
     * <p>
     * The given properties are copied (never modified) and complemented with the device ID,
     * the QoS level, the tenant ID, the device registry ID, the project ID, the sub-folder and,
     * if available, the content type. The result is then passed through
     * {@link DownstreamMessageProperties} together with the tenant's resource limits and,
     * if inclusion of defaults is enabled, the tenant and device level defaults.
     *
     * @param endpointName The name of the endpoint that the message is sent to.
     * @param tenant The tenant that the device belongs to.
     * @param device The registration assertion of the device.
     * @param qos The delivery semantics used for sending the message.
     * @param contentType The content type of the payload, may be {@code null}.
     * @param properties The properties provided by the caller, may be {@code null}.
     * @param subFolder The sub-folder to add to the properties.
     * @return The properties of the message to send.
     * @throws NullPointerException if endpointName, tenant, device or qos are {@code null}.
     */
    private Map<String, Object> addDefaults(String endpointName, TenantObject tenant, RegistrationAssertion device, QoS qos, String contentType, Map<String, Object> properties, String subFolder) {
        Objects.requireNonNull(endpointName);
        Objects.requireNonNull(tenant);
        Objects.requireNonNull(device);
        Objects.requireNonNull(qos);

        final Map<String, Object> messageProperties = new HashMap<>();
        if (properties != null) {
            messageProperties.putAll(properties);
        }
        messageProperties.put(PUBSUB_DOWNSTREAM_PROPERTY_DEVICE_ID, device.getDeviceId());
        // the numeric value is what consumers receive, it must remain the enum's ordinal
        messageProperties.put(PROPERTY_QOS, qos.ordinal());
        messageProperties.put(PUBSUB_DOWNSTREAM_PROPERTY_TENANT_ID, tenant.getTenantId());
        // the tenant ID is also used as the device registry ID
        messageProperties.put(PUBSUB_DOWNSTREAM_PROPERTY_DEVICE_REGISTRY_ID, tenant.getTenantId());
        messageProperties.put(PUBSUB_DOWNSTREAM_PROPERTY_PROJECT_ID, this.projectId);
        messageProperties.put(PUBSUB_PROPERTY_SUBFOLDER, subFolder);
        if (contentType != null) {
            messageProperties.put(PROPERTY_CONTENT_TYPE, contentType);
        }

        return new DownstreamMessageProperties(
                endpointName,
                this.isDefaultsEnabled ? tenant.getDefaults().getMap() : null,
                this.isDefaultsEnabled ? device.getDefaults() : null,
                messageProperties,
                tenant.getResourceLimits()).asMap();
    }
}
