package org.eclipse.hono.adapter.mqtt;

import io.micrometer.core.instrument.Timer;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.AdapterConnectionsExceededException;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.AuthorizationException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.auth.device.AuthHandler;
import org.eclipse.hono.service.auth.device.ChainAuthHandler;
import org.eclipse.hono.service.auth.device.TenantServiceBasedX509Authentication;
import org.eclipse.hono.service.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.service.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.service.auth.device.X509AuthProvider;
import org.eclipse.hono.service.limiting.ConnectionLimitManager;
import org.eclipse.hono.service.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.service.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ExecutionContextTenantAndAuthIdProvider;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.class */
public abstract class AbstractVertxBasedMqttProtocolAdapter<T extends MqttProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    /**
     * Amount (100,000,000) that {@code createConnectionLimitManager} hands to the
     * memory based connection limit strategy as its first argument.
     * Presumably a number of bytes - confirm against MemoryBasedConnectionLimitStrategy.
     */
    protected static final int MINIMAL_MEMORY = 100000000;
    /**
     * Amount (20,000) that {@code createConnectionLimitManager} hands to the
     * memory based connection limit strategy as its second argument.
     * Presumably a number of bytes per connection - confirm against MemoryBasedConnectionLimitStrategy.
     */
    protected static final int MEMORY_PER_CONNECTION = 20000;
    /** Value returned by {@code getInsecurePortDefaultValue()}. */
    private static final int IANA_MQTT_PORT = 1883;
    /** Value returned by {@code getPortDefaultValue()}. */
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    /** Key under which a topic filter is put into the maps that get logged to tracing spans. */
    private static final String KEY_TOPIC_FILTER = "filter";
    /** Metrics sink; stays the no-op implementation until {@code setMetrics} is invoked. */
    private MqttAdapterMetrics metrics = MqttAdapterMetrics.NOOP;
    /** Server for the secure port; injected via {@code setMqttSecureServer} or assigned when the secure port gets bound. */
    private MqttServer server;
    /** Server for the insecure port; injected via {@code setMqttInsecureServer} or assigned when the insecure port gets bound. */
    private MqttServer insecureServer;
    /** Handler used by {@code authenticate}; {@code doStart} creates a default one if none has been set. */
    private AuthHandler<MqttContext> authHandler;
    /** Provider used by {@code applyTenantTraceSamplingPriority}; {@code doStart} creates a default one if none has been set. */
    private ExecutionContextTenantAndAuthIdProvider<MqttContext> tenantObjectWithAuthIdProvider;

    /* renamed from: org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter$1.class */
    /*
     * Lookup table for ordinal based switch statements over MetricsTags.EndpointType.
     * The array is indexed by the ordinal of an endpoint type and holds the case label
     * (1 = TELEMETRY, 2 = EVENT, 3 = COMMAND); slots that are never written keep the value 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType = new int[MetricsTags.EndpointType.values().length];

        static {
            final int[] caseLabels = $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType;
            try {
                caseLabels[MetricsTags.EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // the enum constant cannot be resolved: leave the slot untouched
            }
            try {
                caseLabels[MetricsTags.EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // the enum constant cannot be resolved: leave the slot untouched
            }
            try {
                caseLabels[MetricsTags.EndpointType.COMMAND.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // the enum constant cannot be resolved: leave the slot untouched
            }
        }
    }

    /**
     * Sets the handler that {@code authenticate} delegates to.
     * <p>
     * If no handler is set, {@code doStart} falls back to the one produced by {@code createAuthHandler()}.
     *
     * @param authHandler The handler to use.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setAuthHandler(AuthHandler<MqttContext> authHandler) {
        Objects.requireNonNull(authHandler);
        this.authHandler = authHandler;
    }

    /**
     * Sets the provider that {@code applyTenantTraceSamplingPriority} queries.
     * <p>
     * If no provider is set, {@code doStart} falls back to the one produced by
     * {@code createTenantAndAuthIdProvider()}.
     *
     * @param executionContextTenantAndAuthIdProvider The provider to use.
     * @throws NullPointerException if the provider is {@code null}.
     */
    public void setTenantObjectWithAuthIdProvider(ExecutionContextTenantAndAuthIdProvider<MqttContext> executionContextTenantAndAuthIdProvider) {
        Objects.requireNonNull(executionContextTenantAndAuthIdProvider);
        this.tenantObjectWithAuthIdProvider = executionContextTenantAndAuthIdProvider;
    }

    /**
     * Returns the constant {@code IANA_SECURE_MQTT_PORT} (8883).
     * <p>
     * NOTE(review): presumably consulted by the base class when no secure port has been
     * configured explicitly - confirm against AbstractProtocolAdapterBase.
     *
     * @return 8883
     */
    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    /**
     * Returns the constant {@code IANA_MQTT_PORT} (1883).
     * <p>
     * NOTE(review): presumably consulted by the base class when no insecure port has been
     * configured explicitly - confirm against AbstractProtocolAdapterBase.
     *
     * @return 1883
     */
    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    /**
     * Gets the port reported by the secure MQTT server.
     *
     * @return The value of the server's {@code actualPort()} or -1 if no secure server is set.
     */
    protected final int getActualPort() {
        return this.server == null ? -1 : this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure MQTT server.
     *
     * @return The value of the server's {@code actualPort()} or -1 if no insecure server is set.
     */
    protected final int getActualInsecurePort() {
        return this.insecureServer == null ? -1 : this.insecureServer.actualPort();
    }

    /**
     * Sets the metrics that this adapter reports connection attempts and command processing to.
     * <p>
     * The metrics object is dereferenced without a null check when connections are handled,
     * so {@code null} is rejected here already, in line with the other setters of this class.
     *
     * @param mqttAdapterMetrics The metrics to use.
     * @throws NullPointerException if the metrics object is {@code null}.
     */
    @Autowired
    public final void setMetrics(MqttAdapterMetrics mqttAdapterMetrics) {
        this.metrics = Objects.requireNonNull(mqttAdapterMetrics);
    }

    /**
     * Gets the metrics object currently used by this adapter.
     *
     * @return The metrics; this is {@code MqttAdapterMetrics.NOOP} unless {@code setMetrics} has been invoked.
     */
    protected final MqttAdapterMetrics getMetrics() {
        return this.metrics;
    }

    /**
     * Creates the authentication handler that {@code doStart} installs if none has been set explicitly.
     * <p>
     * The result is a chain consisting of an X.509 based handler followed by a handler
     * working on the credentials of the CONNECT packet.
     *
     * @return The handler chain.
     */
    protected AuthHandler<MqttContext> createAuthHandler() {
        final X509AuthHandler certificateHandler = new X509AuthHandler(
                new TenantServiceBasedX509Authentication(getTenantClientFactory(), this.tracer),
                new X509AuthProvider(getCredentialsClientFactory(), (ServiceConfigProperties) getConfig(), this.tracer));
        final ConnectPacketAuthHandler connectPacketHandler = new ConnectPacketAuthHandler(
                new UsernamePasswordAuthProvider(getCredentialsClientFactory(), (ServiceConfigProperties) getConfig(), this.tracer));
        return new ChainAuthHandler()
                .append(certificateHandler)
                .append(connectPacketHandler);
    }

    /**
     * Creates the provider that {@code doStart} installs if none has been set explicitly.
     *
     * @return A new {@code MqttContextTenantAndAuthIdProvider} built from this adapter's
     *         configuration and tenant client factory.
     */
    protected ExecutionContextTenantAndAuthIdProvider<MqttContext> createTenantAndAuthIdProvider() {
        final ProtocolAdapterProperties adapterConfig = (ProtocolAdapterProperties) getConfig();
        return new MqttContextTenantAndAuthIdProvider(adapterConfig, getTenantClientFactory());
    }

    /**
     * Sets the server to use for the secure port.
     *
     * @param mqttServer The server; it must not report a positive {@code actualPort()}.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports a port greater than zero,
     *                                  which is taken as a sign that it has been started already.
     */
    public void setMqttSecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        final boolean alreadyListening = mqttServer.actualPort() > 0;
        if (alreadyListening) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.server = mqttServer;
    }

    /**
     * Sets the server to use for the insecure port.
     *
     * @param mqttServer The server; it must not report a positive {@code actualPort()}.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports a port greater than zero,
     *                                  which is taken as a sign that it has been started already.
     */
    public void setMqttInsecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        final boolean alreadyListening = mqttServer.actualPort() > 0;
        if (alreadyListening) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.insecureServer = mqttServer;
    }

    /**
     * Binds the secure MQTT server if the secure port is enabled.
     * <p>
     * The server options take host, port and maximum message size from the adapter
     * configuration and get the TLS key/cert and trust options added.
     *
     * @return A future that succeeds once the server is listening (or immediately if the
     *         secure port is disabled) and fails with the cause reported by the server otherwise.
     */
    private Future<Void> bindSecureMqttServer() {
        if (isSecurePortEnabled()) {
            final MqttProtocolAdapterProperties adapterConfig = (MqttProtocolAdapterProperties) getConfig();
            final MqttServerOptions options = new MqttServerOptions();
            options.setHost(adapterConfig.getBindAddress());
            options.setPort(determineSecurePort());
            options.setMaxMessageSize(adapterConfig.getMaxPayloadSize());
            addTlsKeyCertOptions(options);
            addTlsTrustOptions(options);
            return bindMqttServer(options, this.server)
                    .map(boundServer -> {
                        // remember the server that is actually listening
                        this.server = boundServer;
                        return (Void) null;
                    })
                    .recover(cause -> Future.failedFuture(cause));
        }
        return Future.succeededFuture();
    }

    /**
     * Binds the insecure MQTT server if the insecure port is enabled.
     * <p>
     * The server options take host, port and maximum message size from the adapter configuration.
     *
     * @return A future that succeeds once the server is listening (or immediately if the
     *         insecure port is disabled) and fails with the cause reported by the server otherwise.
     */
    private Future<Void> bindInsecureMqttServer() {
        if (isInsecurePortEnabled()) {
            final MqttProtocolAdapterProperties adapterConfig = (MqttProtocolAdapterProperties) getConfig();
            final MqttServerOptions options = new MqttServerOptions();
            options.setHost(adapterConfig.getInsecurePortBindAddress());
            options.setPort(determineInsecurePort());
            options.setMaxMessageSize(adapterConfig.getMaxPayloadSize());
            return bindMqttServer(options, this.insecureServer)
                    .map(boundServer -> {
                        // remember the server that is actually listening
                        this.insecureServer = boundServer;
                        return (Void) null;
                    })
                    .recover(cause -> Future.failedFuture(cause));
        }
        return Future.succeededFuture();
    }

    /**
     * Starts an MQTT server and registers {@code handleEndpointConnection} as its endpoint handler.
     *
     * @param mqttServerOptions The options to create a new server with; only used to create the
     *                          server if no server is passed in.
     * @param mqttServer The server to start or {@code null} if a new server should be created.
     * @return A future that completes with the listening server or fails with the cause
     *         reported by the server's {@code listen} operation.
     */
    private Future<MqttServer> bindMqttServer(MqttServerOptions mqttServerOptions, MqttServer mqttServer) {
        final Promise<MqttServer> result = Promise.promise();
        final MqttServer serverToStart = mqttServer == null ? MqttServer.create(this.vertx, mqttServerOptions) : mqttServer;
        serverToStart.endpointHandler(this::handleEndpointConnection).listen(startup -> {
            if (startup.succeeded()) {
                // log the host from the options of the server being bound; the adapter's secure
                // bind address would be misleading when the insecure server is started
                this.log.info("MQTT server running on {}:{}", mqttServerOptions.getHost(), serverToStart.actualPort());
                result.complete(serverToStart);
            } else {
                this.log.error("error while starting up MQTT server", startup.cause());
                result.fail(startup.cause());
            }
        });
        return result.future();
    }

    /**
     * Starts the adapter.
     * <p>
     * Makes sure a connection limit manager is set, checks the port configuration, binds the
     * secure and the insecure server and finally installs the default authentication handler
     * and tenant/auth-id provider if none have been set explicitly.
     *
     * @param promise The promise to complete with the outcome of the startup.
     */
    protected final void doStart(Promise<Void> promise) {
        final MqttProtocolAdapterProperties adapterConfig = (MqttProtocolAdapterProperties) getConfig();
        this.log.info("limiting size of inbound message payload to {} bytes", adapterConfig.getMaxPayloadSize());
        if (!adapterConfig.isAuthenticationRequired()) {
            this.log.warn("authentication of devices turned off");
        }
        // orElseGet: only build the default manager if none has been set
        setConnectionLimitManager(Optional.ofNullable(getConnectionLimitManager())
                .orElseGet(this::createConnectionLimitManager));
        checkPortConfiguration()
                .compose(portsOk -> CompositeFuture.all(bindSecureMqttServer(), bindInsecureMqttServer()))
                .compose(serversBound -> {
                    if (this.authHandler == null) {
                        this.authHandler = createAuthHandler();
                    }
                    if (this.tenantObjectWithAuthIdProvider == null) {
                        this.tenantObjectWithAuthIdProvider = createTenantAndAuthIdProvider();
                    }
                    return Future.succeededFuture((Void) null);
                })
                .onComplete(promise);
    }

    /**
     * Creates the connection limit manager that {@code doStart} installs if none has been set.
     * <p>
     * The manager uses a memory based strategy parameterized with {@link #MINIMAL_MEMORY} and
     * {@link #MEMORY_PER_CONNECTION} and obtains the current number of connections from the
     * adapter's metrics.
     *
     * @return The new manager.
     */
    private ConnectionLimitManager createConnectionLimitManager() {
        return new DefaultConnectionLimitManager(
                new MemoryBasedConnectionLimitStrategy(MINIMAL_MEMORY, MEMORY_PER_CONNECTION),
                // read the field lazily so that metrics set later on are picked up
                () -> this.metrics.getNumberOfConnections(),
                (ProtocolAdapterProperties) getConfig());
    }

    /**
     * Stops the adapter by closing the secure and the insecure server, if set.
     *
     * @param promise The promise to complete once both servers have been closed; it fails
     *                if closing either server fails.
     */
    protected final void doStop(Promise<Void> promise) {
        final Promise<Void> secureServerClosed = Promise.promise();
        final Promise<Void> insecureServerClosed = Promise.promise();

        if (this.server == null) {
            secureServerClosed.complete();
        } else {
            this.server.close(secureServerClosed);
        }

        if (this.insecureServer == null) {
            insecureServerClosed.complete();
        } else {
            this.insecureServer.close(insecureServerClosed);
        }

        CompositeFuture.all(secureServerClosed.future(), insecureServerClosed.future())
                .map(bothClosed -> (Void) null)
                .onComplete(promise);
    }

    /**
     * Handles a connection request from a client.
     * <p>
     * A "CONNECT" span is started for the request. A close handler is registered on the endpoint
     * which records that the client went away before the request has been processed. The request
     * is then processed and either accepted (without a session being present) or rejected with
     * the return code derived from the failure. The outcome is reported to the metrics and the
     * span is finished in either case.
     *
     * @param mqttEndpoint The endpoint representing the client.
     */
    final void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        this.log.debug("connection request from client [client-id: {}]", mqttEndpoint.clientIdentifier());
        final Span span = this.tracer.buildSpan("CONNECT")
                .ignoreActiveSpan()
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttEndpoint.clientIdentifier())
                .start();
        if (!mqttEndpoint.isCleanSession()) {
            span.log("ignoring client's intent to resume existing session");
        }
        if (mqttEndpoint.will() != null) {
            span.log("ignoring client's last will");
        }
        // flipped by the close handler if the client disconnects while the request is being processed
        final AtomicBoolean connectionClosedPrematurely = new AtomicBoolean(false);
        mqttEndpoint.closeHandler(closed -> {
            this.log.debug("client closed connection before CONNACK got sent to client [client-id: {}]", mqttEndpoint.clientIdentifier());
            TracingHelper.logError(span, "client closed connection");
            connectionClosedPrematurely.set(true);
        });
        isConnected()
                .compose(connected -> handleConnectionRequest(mqttEndpoint, span))
                .compose(device -> handleConnectionRequestResult(mqttEndpoint, device, connectionClosedPrematurely, span))
                .onSuccess(authenticatedDevice -> {
                    mqttEndpoint.accept(false);
                    span.log("connection accepted");
                    // the device is null for unauthenticated connections
                    final String tenantId = Optional.ofNullable(authenticatedDevice)
                            .map(Device::getTenantId)
                            .orElse(null);
                    this.metrics.reportConnectionAttempt(MetricsTags.ConnectionAttemptOutcome.SUCCEEDED, tenantId);
                })
                .onFailure(cause -> {
                    this.log.debug("rejecting connection request from client [clientId: {}], cause:", mqttEndpoint.clientIdentifier(), cause);
                    rejectConnectionRequest(mqttEndpoint, getConnectReturnCode(cause), span);
                    TracingHelper.logError(span, cause);
                    reportFailedConnectionAttempt(cause);
                })
                .onComplete(outcome -> span.finish());
    }

    /**
     * Reports a failed connection attempt to the metrics.
     *
     * @param th The reason for the failure. The tenant is taken from the error if it is a
     *           {@code ServiceInvocationException}, otherwise no tenant is reported.
     */
    private void reportFailedConnectionAttempt(Throwable th) {
        String tenantId = null;
        if (th instanceof ServiceInvocationException) {
            tenantId = ((ServiceInvocationException) th).getTenant();
        }
        this.metrics.reportConnectionAttempt(AbstractProtocolAdapterBase.getOutcome(th), tenantId);
    }

    /**
     * Processes a connection request unless the adapter's connection limit is exceeded.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param span The span to log to.
     * @return A future failed with an {@code AdapterConnectionsExceededException} if a connection
     *         limit manager is set and reports that the limit is exceeded, otherwise the outcome
     *         of handling the connection with or without authentication, depending on the
     *         adapter configuration.
     */
    private Future<Device> handleConnectionRequest(MqttEndpoint mqttEndpoint, Span span) {
        final ConnectionLimitManager limitManager = getConnectionLimitManager();
        if (limitManager != null && limitManager.isLimitExceeded()) {
            span.log("adapter's connection limit exceeded");
            return Future.failedFuture(new AdapterConnectionsExceededException((String) null, "adapter's connection limit is exceeded", (Throwable) null));
        }
        if (((MqttProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            return handleEndpointConnectionWithAuthentication(mqttEndpoint, span);
        }
        return handleEndpointConnectionWithoutAuthentication(mqttEndpoint);
    }

    /**
     * Completes the processing of a connection request that has passed all checks.
     * <p>
     * Tags the span with the authentication status (and the device, if any) and sends a
     * connection event, unless the client has closed the connection in the meantime.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param device The authenticated device or {@code null} if the client is not authenticated.
     * @param atomicBoolean Flag indicating whether the connection has been closed already.
     * @param span The span to log to.
     * @return A future completed with the device, or failed with an {@code IllegalStateException}
     *         if the connection is closed already, or with a {@code ServerErrorException} (503)
     *         if the connection event could not be sent.
     */
    private Future<Device> handleConnectionRequestResult(MqttEndpoint mqttEndpoint, Device device, AtomicBoolean atomicBoolean, Span span) {
        final boolean authenticated = device != null;
        TracingHelper.TAG_AUTHENTICATED.set(span, authenticated);
        if (authenticated) {
            TracingHelper.setDeviceTags(span, device.getTenantId(), device.getDeviceId());
        }

        if (atomicBoolean.get()) {
            this.log.debug("abort handling connection request, connection already closed [clientId: {}]", mqttEndpoint.clientIdentifier());
            span.log("abort connection request processing, connection already closed");
            return Future.failedFuture(new IllegalStateException("connection already closed"));
        }

        return sendConnectedEvent(mqttEndpoint.clientIdentifier(), device)
                .map(device)
                .recover(cause -> {
                    this.log.warn("failed to send connection event for client [clientId: {}]", mqttEndpoint.clientIdentifier(), cause);
                    return Future.failedFuture(new ServerErrorException(503, "failed to send connection event", cause));
                });
    }

    /**
     * Rejects a connection request with the given return code.
     * <p>
     * An {@code IllegalStateException} thrown by the endpoint is not propagated; it is only
     * logged, with a dedicated message if it indicates that the endpoint is closed already.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param mqttConnectReturnCode The code to send to the client.
     * @param span The span to log to.
     */
    private void rejectConnectionRequest(MqttEndpoint mqttEndpoint, MqttConnectReturnCode mqttConnectReturnCode, Span span) {
        try {
            mqttEndpoint.reject(mqttConnectReturnCode);
            span.log("connection request rejected");
        } catch (IllegalStateException e) {
            final boolean endpointClosed = "MQTT endpoint is closed".equals(e.getMessage());
            if (!endpointClosed) {
                this.log.debug("could not reject connection request from client [clientId: {}]: {}", mqttEndpoint.clientIdentifier(), e.toString());
                TracingHelper.logError(span, "could not reject connection request from client", e);
                return;
            }
            this.log.debug("skipped rejecting connection request, connection already closed [clientId: {}]", mqttEndpoint.clientIdentifier());
            span.log("skipped rejecting connection request, connection already closed");
        }
    }

    /**
     * Handles a connection request from a client without authenticating it.
     * <p>
     * Registers the handlers on the endpoint with no device and no trace sampling priority.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @return A succeeded future containing {@code null} because there is no authenticated device.
     */
    private Future<Device> handleEndpointConnectionWithoutAuthentication(MqttEndpoint mqttEndpoint) {
        registerHandlers(mqttEndpoint, null, OptionalInt.empty());
        this.log.debug("unauthenticated device [clientId: {}] connected", mqttEndpoint.clientIdentifier());
        // typed null: a null of type Object cannot be used for a Future<Device>
        return Future.succeededFuture((Device) null);
    }

    /**
     * Handles a connection request from a client that needs to be authenticated.
     * <p>
     * The steps are: apply the tenant's trace sampling priority, authenticate the device,
     * verify that the adapter is enabled for the tenant, that the connection limit is not
     * exceeded and that the device is registered, create the links and register the handlers
     * on the endpoint. Any failure is logged and passed on unchanged.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param span The span to log to and to take the tracing context from.
     * @return A future indicating the outcome; if successful it contains the result of
     *         registering the handlers for the authenticated device.
     */
    private Future<Device> handleEndpointConnectionWithAuthentication(MqttEndpoint mqttEndpoint, Span span) {
        final MqttContext connectContext = MqttContext.fromConnectPacket(mqttEndpoint);
        final Future<OptionalInt> samplingPriority = applyTenantTraceSamplingPriority(connectContext, span);
        final Future<DeviceUser> authentication = samplingPriority.compose(priority -> {
            connectContext.setTracingContext(span.context());
            return authenticate(connectContext);
        });
        return authentication
                .compose(authenticatedDevice -> CompositeFuture.all(
                        getTenantConfiguration(authenticatedDevice.getTenantId(), span.context())
                                .compose(tenantObject -> CompositeFuture.all(
                                        isAdapterEnabled(tenantObject).recover(cause -> Future.failedFuture(
                                                new AdapterDisabledException(authenticatedDevice.getTenantId(), "adapter is disabled for tenant", cause))),
                                        checkConnectionLimit(tenantObject, span.context()))),
                        checkDeviceRegistration(authenticatedDevice, span.context()))
                        .map(authenticatedDevice))
                .compose(authenticatedDevice -> createLinks(authenticatedDevice, span))
                .compose(device -> registerHandlers(mqttEndpoint, device, samplingPriority.result()))
                .recover(cause -> {
                    if (authentication.failed()) {
                        this.log.debug("could not authenticate device", cause);
                    } else {
                        // authentication succeeded, so one of the subsequent steps has failed
                        final DeviceUser authenticatedDevice = authentication.result();
                        this.log.debug("cannot establish connection with device [tenant-id: {}, device-id: {}]", new Object[]{authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId(), cause});
                    }
                    return Future.failedFuture(cause);
                });
    }

    /**
     * Applies the trace sampling priority configured for the tenant of the given context to a span.
     * <p>
     * The tenant is looked up using the tenant/auth-id provider. A failed lookup is not
     * propagated but results in an empty value.
     *
     * @param mqttContext The context to determine the tenant from.
     * @param span The span to apply the priority to.
     * @return A succeeded future containing the value returned by
     *         {@code TenantTraceSamplingHelper.applyTraceSamplingPriority} or an empty
     *         {@code OptionalInt} if the lookup failed.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected final Future<OptionalInt> applyTenantTraceSamplingPriority(MqttContext mqttContext, Span span) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(span);
        return this.tenantObjectWithAuthIdProvider.get(mqttContext, span.context())
                .map(tenantAndAuthId -> TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantAndAuthId, span))
                .recover(lookupFailure -> Future.succeededFuture(OptionalInt.empty()));
    }

    /**
     * Authenticates a device by delegating to the configured authentication handler.
     *
     * @param mqttContext The context passed on to the handler's {@code authenticateDevice} method.
     * @return The future returned by the handler.
     */
    private Future<DeviceUser> authenticate(MqttContext mqttContext) {
        return this.authHandler.authenticateDevice(mqttContext);
    }

    /**
     * Processes a SUBSCRIBE packet received from a client.
     * <p>
     * For each topic subscription a command consumer is created, unless the topic filter is
     * not supported or QoS 2 has been requested, in which case the subscription fails. Once
     * all subscriptions have been processed, a SUBACK is sent (if the endpoint is still
     * connected) which contains the requested QoS for each successful subscription and
     * {@code FAILURE} for the others. For each distinct topic filter that has been subscribed
     * successfully, {@code sendConnectedTtdEvent} is invoked.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param device The authenticated device or {@code null} (not checked for null here).
     * @param mqttSubscribeMessage The subscribe request.
     * @param commandSubscriptionsManager The manager to add successful subscriptions to.
     * @param optionalInt The trace sampling priority to apply to the "SUBSCRIBE" span, if present.
     * @throws NullPointerException if any parameter other than the device is {@code null}.
     */
    protected final void onSubscribe(MqttEndpoint mqttEndpoint, Device device, MqttSubscribeMessage mqttSubscribeMessage, CommandSubscriptionsManager<T> commandSubscriptionsManager, OptionalInt optionalInt) {
        Objects.requireNonNull(mqttEndpoint);
        Objects.requireNonNull(mqttSubscribeMessage);
        Objects.requireNonNull(commandSubscriptionsManager);
        Objects.requireNonNull(optionalInt);
        // topic name -> outcome; makes sure that a topic filter occurring more than once is processed only once
        HashMap hashMap = new HashMap();
        // subscription -> outcome, in the order of the request; used to build the SUBACK
        LinkedHashMap linkedHashMap = new LinkedHashMap(mqttSubscribeMessage.topicSubscriptions().size());
        Span newSpan = newSpan("SUBSCRIBE", mqttEndpoint, device, optionalInt);
        mqttSubscribeMessage.topicSubscriptions().forEach(mqttTopicSubscription -> {
            Future failedFuture;
            Future future = (Future) hashMap.get(mqttTopicSubscription.topicName());
            if (future != null) {
                // same topic filter seen before in this request: reuse its outcome
                linkedHashMap.put(mqttTopicSubscription, future);
                return;
            }
            CommandSubscription fromTopic = CommandSubscription.fromTopic(mqttTopicSubscription, device, mqttEndpoint.clientIdentifier());
            if (fromTopic == null) {
                newSpan.log(String.format("ignoring unsupported topic filter [%s]", mqttTopicSubscription.topicName()));
                this.log.debug("cannot create subscription [filter: {}, requested QoS: {}]: unsupported topic filter", mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService());
                failedFuture = Future.failedFuture(new IllegalArgumentException("unsupported topic filter"));
            } else {
                // despite its name, this variable holds the (possibly successful) outcome of creating the consumer
                failedFuture = MqttQoS.EXACTLY_ONCE.equals(mqttTopicSubscription.qualityOfService()) ? Future.failedFuture(new IllegalArgumentException("QoS 2 not supported for command subscription")) : createCommandConsumer(mqttEndpoint, fromTopic, commandSubscriptionsManager, newSpan).map(protocolAdapterCommandConsumer -> {
                    HashMap hashMap2 = new HashMap(4);
                    hashMap2.put("event", "accepting subscription");
                    hashMap2.put(KEY_TOPIC_FILTER, mqttTopicSubscription.topicName());
                    hashMap2.put("requested QoS", mqttTopicSubscription.qualityOfService());
                    hashMap2.put("granted QoS", mqttTopicSubscription.qualityOfService());
                    newSpan.log(hashMap2);
                    this.log.debug("created subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}, granted QoS: {}]", new Object[]{fromTopic.getTenant(), fromTopic.getDeviceId(), mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttTopicSubscription.qualityOfService()});
                    commandSubscriptionsManager.addSubscription(fromTopic, protocolAdapterCommandConsumer);
                    return fromTopic;
                }).recover(th -> {
                    HashMap hashMap2 = new HashMap(4);
                    hashMap2.put("event", Tags.ERROR.getKey());
                    hashMap2.put(KEY_TOPIC_FILTER, mqttTopicSubscription.topicName());
                    hashMap2.put("requested QoS", mqttTopicSubscription.qualityOfService());
                    hashMap2.put("message", "rejecting subscription: " + th.getMessage());
                    TracingHelper.logError(newSpan, hashMap2);
                    this.log.debug("cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]", new Object[]{fromTopic.getTenant(), fromTopic.getDeviceId(), mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), th});
                    return Future.failedFuture(th);
                });
            }
            hashMap.put(mqttTopicSubscription.topicName(), failedFuture);
            linkedHashMap.put(mqttTopicSubscription, failedFuture);
        });
        // join (not all): wait for every subscription to be processed, even if some of them fail
        CompositeFuture.join(new ArrayList(linkedHashMap.values())).onComplete(asyncResult -> {
            // one entry per requested subscription, in request order
            List list = (List) linkedHashMap.entrySet().stream().map(entry -> {
                return ((Future) entry.getValue()).failed() ? MqttQoS.FAILURE : ((MqttTopicSubscription) entry.getKey()).qualityOfService();
            }).collect(Collectors.toList());
            if (mqttEndpoint.isConnected()) {
                mqttEndpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), list);
            }
            // iterate the per-topic map so that the event is sent once per distinct topic filter
            hashMap.values().forEach(future -> {
                if (!future.succeeded() || future.result() == null) {
                    return;
                }
                CommandSubscription commandSubscription = (CommandSubscription) future.result();
                sendConnectedTtdEvent(commandSubscription.getTenant(), commandSubscription.getDeviceId(), device, newSpan.context());
            });
            newSpan.finish();
        });
    }

    /**
     * Creates a new span without a parent and applies the given trace sampling priority, if present.
     *
     * @param str The operation name of the span.
     * @param mqttEndpoint The endpoint representing the client.
     * @param device The authenticated device or {@code null}.
     * @param optionalInt The trace sampling priority to set on the span.
     * @return The started span.
     */
    private Span newSpan(String str, MqttEndpoint mqttEndpoint, Device device, OptionalInt optionalInt) {
        final Span span = newChildSpan(null, str, mqttEndpoint, device);
        if (optionalInt.isPresent()) {
            TracingHelper.setTraceSamplingPriority(span, optionalInt.getAsInt());
        }
        return span;
    }

    /**
     * Creates and starts a span as a child of the given context.
     * <p>
     * The span is tagged with the "server" span kind, the client identifier, the
     * authentication status and, if a device is given, the device's tenant and identifier.
     *
     * @param spanContext The parent context, may be {@code null}.
     * @param str The operation name of the span.
     * @param mqttEndpoint The endpoint representing the client.
     * @param device The authenticated device or {@code null}.
     * @return The started span.
     */
    private Span newChildSpan(SpanContext spanContext, String str, MqttEndpoint mqttEndpoint, Device device) {
        final boolean authenticated = device != null;
        final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, str, getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttEndpoint.clientIdentifier())
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticated)
                .start();
        if (authenticated) {
            TracingHelper.setDeviceTags(span, device.getTenantId(), device.getDeviceId());
        }
        return span;
    }

    /**
     * Processes an UNSUBSCRIBE packet received from a client.
     * <p>
     * Each topic that maps to a command subscription is removed from the subscriptions manager,
     * which is given a callback that invokes {@code sendDisconnectedTtdEvent}. Unsupported topic
     * filters are logged and ignored. The UNSUBACK is sent (if the endpoint is still connected)
     * without waiting for the removals to complete; the "UNSUBSCRIBE" span is finished once
     * all removals have completed.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param device The authenticated device or {@code null} (not checked for null here).
     * @param mqttUnsubscribeMessage The unsubscribe request.
     * @param commandSubscriptionsManager The manager to remove the subscriptions from.
     * @param optionalInt The trace sampling priority to apply to the span, if present.
     * @throws NullPointerException if any parameter other than the device is {@code null}.
     */
    protected final void onUnsubscribe(MqttEndpoint mqttEndpoint, Device device, MqttUnsubscribeMessage mqttUnsubscribeMessage, CommandSubscriptionsManager<T> commandSubscriptionsManager, OptionalInt optionalInt) {
        Objects.requireNonNull(mqttEndpoint);
        Objects.requireNonNull(mqttUnsubscribeMessage);
        Objects.requireNonNull(commandSubscriptionsManager);
        Objects.requireNonNull(optionalInt);
        final Span span = newSpan("UNSUBSCRIBE", mqttEndpoint, device, optionalInt);
        // raw element type because that is what CompositeFuture.join expects
        final List<Future> pendingRemovals = new ArrayList<>(mqttUnsubscribeMessage.topics().size());
        mqttUnsubscribeMessage.topics().forEach(topic -> {
            final CommandSubscription subscription = CommandSubscription.fromTopic(topic, device);
            if (subscription == null) {
                final HashMap<String, Object> items = new HashMap<>(2);
                items.put("event", "ignoring unsupported topic filter");
                items.put(KEY_TOPIC_FILTER, topic);
                span.log(items);
                this.log.debug("ignoring unsubscribe request for unsupported topic filter [{}]", topic);
                return;
            }
            final HashMap<String, Object> items = new HashMap<>(2);
            items.put("event", "unsubscribing device from topic");
            items.put(KEY_TOPIC_FILTER, topic);
            span.log(items);
            this.log.debug("unsubscribing device [tenant-id: {}, device-id: {}] from topic [{}]", new Object[]{subscription.getTenant(), subscription.getDeviceId(), topic});
            // the callback's parameters must not reuse the name of the enclosing lambda's parameter
            pendingRemovals.add(commandSubscriptionsManager.removeSubscription(
                    topic,
                    (removedTenantId, removedDeviceId) -> sendDisconnectedTtdEvent(removedTenantId, removedDeviceId, device, mqttEndpoint, span),
                    span.context()));
        });
        if (mqttEndpoint.isConnected()) {
            mqttEndpoint.unsubscribeAcknowledge(mqttUnsubscribeMessage.messageId());
        }
        CompositeFuture.join(pendingRemovals).onComplete(removalsDone -> span.finish());
    }

    /**
     * Creates a command consumer for a command subscription.
     * <p>
     * The handler registered with the consumer tags the command's span, retrieves the tenant
     * configuration, verifies that the command is valid and that the message limit is not
     * exceeded and then passes the command on to {@code onCommandReceived}. If any of these
     * steps fails, the command is rejected (client errors) or released (all other errors) and
     * the failure is reported to the metrics.
     *
     * @param mqttEndpoint The endpoint representing the client.
     * @param commandSubscription The subscription to create the consumer for.
     * @param commandSubscriptionsManager The manager passed on to {@code onCommandReceived}.
     * @param span The span whose context is passed to the command consumer factory.
     * @return The future returned by the command consumer factory.
     */
    private Future<ProtocolAdapterCommandConsumer> createCommandConsumer(MqttEndpoint mqttEndpoint, CommandSubscription commandSubscription, CommandSubscriptionsManager<T> commandSubscriptionsManager, Span span) {
        // the handler and the tenant future need type arguments, otherwise the lambda parameters are of type Object
        final Handler<CommandContext> commandHandler = commandContext -> {
            Tags.COMPONENT.set(commandContext.getTracingSpan(), getTypeName());
            TracingHelper.TAG_CLIENT_ID.set(commandContext.getTracingSpan(), mqttEndpoint.clientIdentifier());
            final Timer.Sample timer = this.metrics.startTimer();
            final Command command = commandContext.getCommand();
            final Future<TenantObject> tenantTracker = getTenantConfiguration(commandSubscription.getTenant(), commandContext.getTracingContext());
            tenantTracker.compose(tenantObject -> {
                return command.isValid() ? checkMessageLimit(tenantObject, command.getPayloadSize(), commandContext.getTracingContext()) : Future.failedFuture(new ClientErrorException(400, "malformed command message"));
            }).compose(limitOk -> {
                addMicrometerSample(commandContext, timer);
                onCommandReceived(tenantTracker.result(), mqttEndpoint, commandSubscription, commandContext, commandSubscriptionsManager);
                return Future.succeededFuture();
            }).onFailure(cause -> {
                if (cause instanceof ClientErrorException) {
                    commandContext.reject(getErrorCondition(cause));
                } else {
                    commandContext.release();
                }
                // the tenant is null here if retrieving the tenant configuration is what has failed
                this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, commandSubscription.getTenant(), tenantTracker.result(), MetricsTags.ProcessingOutcome.from(cause), command.getPayloadSize(), timer);
            });
        };
        if (commandSubscription.isGatewaySubscriptionForSpecificDevice()) {
            return getCommandConsumerFactory().createCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getAuthenticatedDeviceId(), commandHandler, (Duration) null, span.context());
        }
        return getCommandConsumerFactory().createCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandHandler, (Duration) null, span.context());
    }

    /**
     * Processes a message that has been published by a device.
     * <p>
     * A "PUBLISH" span is created as a child of the span context extracted from the message's
     * property bag (if any). After the tenant's trace sampling priority has been applied and
     * the topic has been checked, the message is passed to {@code onPublishedMessage}.
     * On success the span is tagged with status 202 and {@code onMessageSent} is invoked.
     * On failure the span is tagged with the error code of the failure (500 if the failure is
     * not a {@code ServiceInvocationException}), {@code onMessageUndeliverable} is invoked
     * unless the failure is a client error, and the connection to the device is closed if it
     * is still open.
     *
     * @param mqttContext The context of the published message.
     */
    void handlePublishedMessage(MqttContext mqttContext) {
        final SpanContext parentContext = Optional.ofNullable(mqttContext.propertyBag())
                .map(propertyBag -> TracingHelper.extractSpanContext(this.tracer, propertyBag::getPropertiesIterator))
                .orElse(null);
        final Span span = TracingHelper.buildChildSpan(this.tracer, parentContext, "PUBLISH", getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), mqttContext.message().topicName())
                .withTag(TracingHelper.TAG_QOS.getKey(), mqttContext.message().qosLevel().toString())
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttContext.deviceEndpoint().clientIdentifier())
                .start();
        mqttContext.setTimer(getMetrics().startTimer());
        applyTenantTraceSamplingPriority(mqttContext, span)
                .compose(priority -> {
                    mqttContext.setTracingContext(span.context());
                    return checkTopic(mqttContext);
                })
                .compose(topicOk -> onPublishedMessage(mqttContext))
                .onComplete(outcome -> {
                    if (outcome.succeeded()) {
                        Tags.HTTP_STATUS.set(span, 202);
                        onMessageSent(mqttContext);
                    } else {
                        final Throwable cause = outcome.cause();
                        if (cause instanceof ServiceInvocationException) {
                            // the error code is only available on the specific exception type
                            Tags.HTTP_STATUS.set(span, ((ServiceInvocationException) cause).getErrorCode());
                        } else {
                            Tags.HTTP_STATUS.set(span, 500);
                        }
                        if (!(cause instanceof ClientErrorException)) {
                            onMessageUndeliverable(mqttContext);
                        }
                        TracingHelper.logError(span, cause);
                        if (mqttContext.deviceEndpoint().isConnected()) {
                            span.log("closing connection to device");
                            mqttContext.deviceEndpoint().close();
                        }
                    }
                    span.finish();
                });
    }

    /**
     * Verifies that the given context has a topic.
     *
     * @param mqttContext The context to check.
     * @return A succeeded future if the context's topic is not {@code null}, otherwise a future
     *         failed with a {@code ClientErrorException} (400).
     */
    private Future<Void> checkTopic(MqttContext mqttContext) {
        if (mqttContext.topic() == null) {
            return Future.failedFuture(new ClientErrorException(400, "malformed topic name"));
        }
        return Future.succeededFuture();
    }

    /**
     * Dispatches a published message to the upload method matching the endpoint of its
     * target address.
     *
     * @param mqttContext The context of the published message.
     * @param resourceIdentifier The address the message is targeted at; its endpoint selects the upload method.
     * @param mqttPublishMessage The published message; its payload is forwarded for telemetry and event messages.
     * @return The future returned by the selected upload method, or a future failed with a
     *         {@link ClientErrorException} (400) if the endpoint is not supported.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier, MqttPublishMessage mqttPublishMessage) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(resourceIdentifier);
        Objects.requireNonNull(mqttPublishMessage);
        // Index into the compiler-generated enum switch table. The mapping from endpoint type to
        // index lives in AnonymousClass1 (not visible here).
        // NOTE(review): judging by the methods called, 1 = telemetry, 2 = event, 3 = command (response) - confirm.
        final int endpointIndex = AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.fromString(resourceIdentifier.getEndpoint()).ordinal()];
        if (endpointIndex == 1) {
            return uploadTelemetryMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttPublishMessage.payload());
        }
        if (endpointIndex == 2) {
            return uploadEventMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttPublishMessage.payload());
        }
        if (endpointIndex == 3) {
            return uploadCommandResponseMessage(mqttContext, resourceIdentifier);
        }
        return Future.failedFuture(new ClientErrorException(400, "unsupported endpoint"));
    }

    /**
     * Forwards a telemetry message downstream and reports the outcome to the metrics.
     * <p>
     * The tenant configuration is looked up first, then the message is handed to the private
     * {@code uploadMessage} method together with the tenant's telemetry sender. Both success
     * and failure are reported via {@code metrics.reportTelemetry}; a failure is passed on
     * to the caller unchanged.
     *
     * @param mqttContext The context of the published message; its endpoint must be {@code TELEMETRY}.
     * @param str The identifier used to look up the tenant configuration and the telemetry sender.
     * @param str2 The identifier passed on as the device the message originates from.
     * @param buffer The message payload.
     * @return A future that succeeds once the message has been forwarded and fails with the
     *         original cause otherwise.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the context's endpoint is not {@code TELEMETRY}.
     */
    public final Future<Void> uploadTelemetryMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(buffer);
        final boolean telemetryContext = mqttContext.endpoint() == MetricsTags.EndpointType.TELEMETRY;
        if (!telemetryContext) {
            throw new IllegalArgumentException("context does not contain telemetry message but " + mqttContext.endpoint().getCanonicalName());
        }
        final MetricsTags.QoS qos = MetricsTags.QoS.from(mqttContext.message().qosLevel().value());
        final Future tenantTracker = getTenantConfiguration(str, mqttContext.getTracingContext());
        final Future uploadOutcome = tenantTracker.compose(
                tenantObject -> uploadMessage(mqttContext, (TenantObject) tenantObject, str2, buffer, getTelemetrySender(str), mqttContext.endpoint()));
        return uploadOutcome.compose(forwarded -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, qos, buffer.length(), mqttContext.getTimer());
            return Future.succeededFuture();
        }).recover(failure -> {
            // report the failed attempt, then hand the original cause back to the caller
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.from(failure), qos, buffer.length(), mqttContext.getTimer());
            return Future.failedFuture(failure);
        });
    }

    /**
     * Forwards an event message downstream and reports the outcome to the metrics.
     * <p>
     * The tenant configuration is looked up first, then the message is handed to the private
     * {@code uploadMessage} method together with the tenant's event sender. Both success and
     * failure are reported via {@code metrics.reportTelemetry}; a failure is passed on to the
     * caller unchanged.
     *
     * @param mqttContext The context of the published message; its endpoint must be {@code EVENT}.
     * @param str The identifier used to look up the tenant configuration and the event sender.
     * @param str2 The identifier passed on as the device the message originates from.
     * @param buffer The message payload.
     * @return A future that succeeds once the event has been forwarded and fails with the
     *         original cause otherwise.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the context's endpoint is not {@code EVENT}.
     */
    public final Future<Void> uploadEventMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(buffer);
        final boolean eventContext = mqttContext.endpoint() == MetricsTags.EndpointType.EVENT;
        if (!eventContext) {
            throw new IllegalArgumentException("context does not contain event but " + mqttContext.endpoint().getCanonicalName());
        }
        final MetricsTags.QoS qos = MetricsTags.QoS.from(mqttContext.message().qosLevel().value());
        final Future tenantTracker = getTenantConfiguration(str, mqttContext.getTracingContext());
        final Future uploadOutcome = tenantTracker.compose(
                tenantObject -> uploadMessage(mqttContext, (TenantObject) tenantObject, str2, buffer, getEventSender(str), mqttContext.endpoint()));
        return uploadOutcome.compose(forwarded -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, qos, buffer.length(), mqttContext.getTimer());
            return Future.succeededFuture();
        }).recover(failure -> {
            // report the failed attempt, then hand the original cause back to the caller
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.from(failure), qos, buffer.length(), mqttContext.getTimer());
            return Future.failedFuture(failure);
        });
    }

    /**
     * Forwards a device's response to a command downstream.
     * <p>
     * The status code is read from segment 5 and the request id from segment 4 of the target
     * address' resource path. If the path is too short, the status code is not a number or no
     * {@link CommandResponse} can be created from the data, the returned future fails with a
     * {@link ClientErrorException} (400). Otherwise the tenant configuration is looked up,
     * the adapter-enabled and message-limit checks are run for that tenant, the registration
     * assertion is awaited and the response is sent. The outcome is reported to the metrics
     * and the tracing span is finished in both the success and the failure case.
     *
     * @param mqttContext The context of the published message containing the response payload.
     * @param resourceIdentifier The address the response was published to.
     * @return A future that succeeds once the response has been forwarded and fails with the
     *         original cause otherwise.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadCommandResponseMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(resourceIdentifier);
        final String[] addressPath = resourceIdentifier.getResourcePath();
        Integer status = null;
        String requestId = null;
        final Future responseTracker;
        if (addressPath.length <= 5) {
            responseTracker = Future.failedFuture(new ClientErrorException(400, "command response topic has too few segments"));
        } else {
            try {
                status = Integer.valueOf(Integer.parseInt(addressPath[5]));
            } catch (NumberFormatException e) {
                // status stays null; the failure is reported to the device via the "invalid status code" error below
                this.log.trace("got invalid status code [{}] [tenant-id: {}, device-id: {}]", new Object[]{addressPath[5], resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId()});
            }
            if (status != null) {
                requestId = addressPath[4];
                final CommandResponse response = CommandResponse.from(requestId, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttContext.message().payload(), mqttContext.contentType(), status);
                responseTracker = response != null ? Future.succeededFuture(response) : Future.failedFuture(new ClientErrorException(400, "command response topic contains invalid data"));
            } else {
                responseTracker = Future.failedFuture(new ClientErrorException(400, "invalid status code"));
            }
        }
        final Span currentSpan = TracingHelper.buildChildSpan(this.tracer, mqttContext.getTracingContext(), "upload Command response", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, resourceIdentifier.getTenantId()).withTag(TracingHelper.TAG_DEVICE_ID, resourceIdentifier.getResourceId()).withTag("hono-cmd-status", status).withTag("hono-cmd-req-id", requestId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null).start();
        final int payloadSize = Optional.ofNullable(mqttContext.message().payload()).map(Buffer::length).orElse(0);
        final Future tokenTracker = getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttContext.authenticatedDevice(), currentSpan.context());
        final Future tenantTracker = getTenantConfiguration(resourceIdentifier.getTenantId(), mqttContext.getTracingContext());
        // Run the tenant checks only once the tenant lookup has completed. Reading
        // tenantTracker.result() right here would yield null whenever the lookup is still
        // in flight, so the checks would be performed against no tenant at all.
        final Future tenantValidationTracker = tenantTracker.compose(tenant -> {
            final TenantObject tenantObject = (TenantObject) tenant;
            return CompositeFuture.all(
                    isAdapterEnabled(tenantObject),
                    checkMessageLimit(tenantObject, payloadSize, currentSpan.context()))
                    .map(ok -> tenantObject);
        });
        return CompositeFuture.all(tenantTracker, responseTracker).compose(ok -> {
            return CompositeFuture.all(tokenTracker, tenantValidationTracker);
        }).compose(ok -> {
            return sendCommandResponse(resourceIdentifier.getTenantId(), (CommandResponse) responseTracker.result(), currentSpan.context());
        }).compose(delivery -> {
            this.log.trace("successfully forwarded command response from device [tenant-id: {}, device-id: {}]", resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId());
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, resourceIdentifier.getTenantId(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, payloadSize, mqttContext.getTimer());
            // only QoS 1 messages are acknowledged, and only while the device is still connected
            if (mqttContext.deviceEndpoint().isConnected() && mqttContext.message().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                mqttContext.deviceEndpoint().publishAcknowledge(mqttContext.message().messageId());
            }
            currentSpan.finish();
            return Future.succeededFuture();
        }).recover(failure -> {
            TracingHelper.logError(currentSpan, failure);
            currentSpan.finish();
            // the tenant may be null here if the lookup itself has failed
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, resourceIdentifier.getTenantId(), (TenantObject) tenantTracker.result(), MetricsTags.ProcessingOutcome.from(failure), payloadSize, mqttContext.getTimer());
            return Future.failedFuture(failure);
        });
    }

    /**
     * Sends a telemetry or event message downstream.
     * <p>
     * Fails right away with a {@link ClientErrorException} (400) if the payload does not match
     * the content type of the context. Otherwise a child span is started and the message is
     * sent once the registration assertion, the adapter-enabled check, the message-limit check
     * and the sender future have all succeeded. For an at-least-once context the outcome of
     * the transfer is awaited and the context is acknowledged while the device is still
     * connected. The span is finished in both the success and the failure case.
     *
     * @param mqttContext The context of the published message.
     * @param tenantObject The tenant the device belongs to.
     * @param str The identifier of the device the message originates from.
     * @param buffer The message payload.
     * @param future The future providing the sender to use.
     * @param endpointType The type of endpoint the message is sent to; a time-to-live is only
     *                     determined for {@code EVENT}.
     * @return A future that succeeds once the message has been processed and fails with the
     *         original cause otherwise.
     */
    private Future<Void> uploadMessage(MqttContext mqttContext, TenantObject tenantObject, String str, Buffer buffer, Future<DownstreamSender> future, MetricsTags.EndpointType endpointType) {
        if (!isPayloadOfIndicatedType(buffer, mqttContext.contentType())) {
            return Future.failedFuture(new ClientErrorException(400, String.format("Content-Type %s does not match payload", mqttContext.contentType())));
        }
        final Span currentSpan = TracingHelper.buildChildSpan(this.tracer, mqttContext.getTracingContext(), "upload " + endpointType, getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "client")
                .withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId())
                .withTag(TracingHelper.TAG_DEVICE_ID, str)
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null)
                .start();
        final Future tokenTracker = getRegistrationAssertion(tenantObject.getTenantId(), str, mqttContext.authenticatedDevice(), currentSpan.context());
        final CompositeFuture tenantChecks = CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, buffer.length(), currentSpan.context()));
        return CompositeFuture.all(tokenTracker, tenantChecks, future).compose(ok -> {
            final DownstreamSender sender = (DownstreamSender) future.result();
            final Message downstreamMessage = newMessage(mqttContext.getRequestedQos(), ResourceIdentifier.from(endpointType.getCanonicalName(), tenantObject.getTenantId(), str), mqttContext.message().topicName(), mqttContext.contentType(), buffer, tenantObject, (JsonObject) tokenTracker.result(), null, MetricsTags.EndpointType.EVENT.equals(endpointType) ? getTimeToLive(mqttContext.propertyBag()) : null);
            addRetainAnnotation(mqttContext, downstreamMessage, currentSpan);
            customizeDownstreamMessage(downstreamMessage, mqttContext);
            if (mqttContext.isAtLeastOnce()) {
                return sender.sendAndWaitForOutcome(downstreamMessage, currentSpan.context());
            }
            return sender.send(downstreamMessage, currentSpan.context());
        }).compose(delivery -> {
            this.log.trace("successfully processed message [topic: {}, QoS: {}] from device [tenantId: {}, deviceId: {}]", new Object[]{mqttContext.message().topicName(), mqttContext.message().qosLevel(), tenantObject.getTenantId(), str});
            if (mqttContext.isAtLeastOnce() && mqttContext.deviceEndpoint().isConnected()) {
                currentSpan.log("sending PUBACK");
                mqttContext.acknowledge();
            }
            currentSpan.finish();
            return Future.succeededFuture();
        }).recover(failure -> {
            if (failure instanceof ClientErrorException) {
                final ClientErrorException clientError = (ClientErrorException) failure;
                this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]: {} - {}", new Object[]{endpointType, tenantObject.getTenantId(), str, Integer.valueOf(clientError.getErrorCode()), clientError.getMessage()});
            } else {
                // the failure is passed as the trailing argument without a matching placeholder
                this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]", new Object[]{endpointType, tenantObject.getTenantId(), str, failure});
            }
            TracingHelper.logError(currentSpan, failure);
            currentSpan.finish();
            return Future.failedFuture(failure);
        });
    }

    /**
     * Releases everything associated with a device connection and closes the connection if
     * it is still open.
     * <p>
     * In this order: a {@code CLOSE} span is created, {@link #onClose(MqttEndpoint)} is
     * invoked, all command subscriptions are removed (sending a disconnected TTD event per
     * subscription), a disconnected event is sent, the connection metrics are decremented and
     * the endpoint is closed if it is still connected. The span is finished once the removal
     * of the subscriptions has completed.
     *
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @param device The authenticated device or {@code null} if the device is anonymous.
     * @param commandSubscriptionsManager The manager holding the connection's command subscriptions.
     * @param optionalInt The value passed on to the creation of the {@code CLOSE} span.
     * @throws NullPointerException if any of the parameters except device is {@code null}.
     */
    protected final void close(MqttEndpoint mqttEndpoint, Device device, CommandSubscriptionsManager<T> commandSubscriptionsManager, OptionalInt optionalInt) {
        Objects.requireNonNull(mqttEndpoint);
        Objects.requireNonNull(commandSubscriptionsManager);
        Objects.requireNonNull(optionalInt);
        final Span closeSpan = newSpan("CLOSE", mqttEndpoint, device, optionalInt);
        onClose(mqttEndpoint);
        final CompositeFuture subscriptionsRemoved = commandSubscriptionsManager.removeAllSubscriptions(
                (first, second) -> sendDisconnectedTtdEvent(first, second, device, mqttEndpoint, closeSpan),
                closeSpan.context());
        sendDisconnectedEvent(mqttEndpoint.clientIdentifier(), device);
        if (device != null) {
            this.log.debug("connection to device [tenant-id: {}, device-id: {}] closed", device.getTenantId(), device.getDeviceId());
            this.metrics.decrementConnections(device.getTenantId());
        } else {
            this.log.debug("connection to anonymous device [clientId: {}] closed", mqttEndpoint.clientIdentifier());
            this.metrics.decrementUnauthenticatedConnections();
        }
        if (!mqttEndpoint.isConnected()) {
            this.log.trace("connection to client is already closed");
        } else {
            this.log.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier());
            mqttEndpoint.close();
        }
        subscriptionsRemoved.onComplete(outcome -> closeSpan.finish());
    }

    /**
     * Sends a disconnected TTD event within its own child span.
     * <p>
     * The child span is created from the given parent span and finished as soon as the
     * attempt to send the event has completed, regardless of the outcome.
     *
     * @param str The first identifier passed on to the overloaded variant of this method.
     * @param str2 The second identifier passed on to the overloaded variant of this method.
     * @param device The authenticated device or {@code null}.
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @param span The span to use as the parent.
     * @return A future with the outcome of sending the event, stripped of its result.
     */
    private Future<Void> sendDisconnectedTtdEvent(String str, String str2, Device device, MqttEndpoint mqttEndpoint, Span span) {
        final Span eventSpan = newChildSpan(span.context(), "Send Disconnected Event", mqttEndpoint, device);
        return sendDisconnectedTtdEvent(str, str2, device, eventSpan.context())
                .onComplete(outcome -> eventSpan.finish())
                .mapEmpty();
    }

    /**
     * Hook invoked by {@code close} after the {@code CLOSE} span has been created and before
     * the connection's command subscriptions are removed.
     * <p>
     * This default implementation does nothing.
     *
     * @param mqttEndpoint The endpoint representing the connection that is being closed.
     */
    protected void onClose(MqttEndpoint mqttEndpoint) {
    }

    /**
     * Extracts username/password credentials from the authentication data of an endpoint.
     *
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @return A future containing the credentials, or a future failed with a
     *         {@link ClientErrorException} (401) if the endpoint has no authentication data,
     *         the username or password is missing, or no credentials object can be created
     *         from them.
     */
    protected final Future<UsernamePasswordCredentials> getCredentials(MqttEndpoint mqttEndpoint) {
        if (mqttEndpoint.auth() == null) {
            return Future.failedFuture(new ClientErrorException(401, "device did not provide credentials in CONNECT packet"));
        }
        final boolean usernameMissing = mqttEndpoint.auth().getUsername() == null;
        if (usernameMissing || mqttEndpoint.auth().getPassword() == null) {
            return Future.failedFuture(new ClientErrorException(401, "device provided malformed credentials in CONNECT packet"));
        }
        final boolean singleTenant = ((MqttProtocolAdapterProperties) getConfig()).isSingleTenant();
        final UsernamePasswordCredentials credentials = UsernamePasswordCredentials.create(mqttEndpoint.auth().getUsername(), mqttEndpoint.auth().getPassword(), singleTenant);
        if (credentials != null) {
            return Future.succeededFuture(credentials);
        }
        return Future.failedFuture(new ClientErrorException(401, "device provided malformed credentials in CONNECT packet"));
    }

    /**
     * Processes a message that has been published by a device.
     * <p>
     * Invoked while handling a PUBLISH packet, after the context's topic has been checked to
     * be non-null. If the returned future succeeds, {@code onMessageSent} is invoked. If it
     * fails, the error is logged to the tracing span and the connection to the device is
     * closed if it is still open.
     *
     * @param mqttContext The context of the published message.
     * @return A future indicating the outcome of processing the message.
     */
    protected abstract Future<Void> onPublishedMessage(MqttContext mqttContext);

    /**
     * Hook for adapting a downstream message before it is sent.
     * <p>
     * Invoked during the upload of a message, after the message has been created and the
     * retain annotation has been applied, and before the message is handed to the sender.
     * This default implementation does nothing.
     *
     * @param message The message that is about to be sent downstream.
     * @param mqttContext The context of the published message the downstream message was created from.
     */
    protected void customizeDownstreamMessage(Message message, MqttContext mqttContext) {
    }

    /**
     * Hook invoked after a published message has been processed successfully.
     * <p>
     * This default implementation does nothing.
     *
     * @param mqttContext The context of the published message.
     */
    protected void onMessageSent(MqttContext mqttContext) {
    }

    /**
     * Hook invoked when processing of a published message has failed with a cause that is
     * not a {@code ClientErrorException}.
     * <p>
     * This default implementation does nothing.
     *
     * @param mqttContext The context of the published message.
     */
    protected void onMessageUndeliverable(MqttContext mqttContext) {
    }

    /**
     * Publishes a command to the device (or gateway) that has subscribed for it.
     * <p>
     * The topic consists of six segments: the subscription's endpoint, the tenant (empty for
     * an authenticated subscription), the device (the original device id for a command
     * targeted at a gateway, otherwise empty for an authenticated subscription), the
     * subscription's request part, the request id (empty for a one-way command) and the
     * command name. If publishing succeeds, {@code afterCommandPublished} takes over;
     * otherwise the error is traced, the outcome is reported to the metrics and the command
     * context is released.
     *
     * @param tenantObject The tenant the command belongs to.
     * @param mqttEndpoint The endpoint to publish the command to.
     * @param commandSubscription The subscription the command is delivered for.
     * @param commandContext The context of the command to publish.
     * @param commandSubscriptionsManager The manager keeping track of commands waiting for acknowledgement.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected final void onCommandReceived(TenantObject tenantObject, MqttEndpoint mqttEndpoint, CommandSubscription commandSubscription, CommandContext commandContext, CommandSubscriptionsManager<T> commandSubscriptionsManager) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(mqttEndpoint);
        Objects.requireNonNull(commandSubscription);
        Objects.requireNonNull(commandContext);
        Objects.requireNonNull(commandSubscriptionsManager);
        final Command command = commandContext.getCommand();
        final String tenantSegment = commandSubscription.isAuthenticated() ? "" : commandSubscription.getTenant();
        final String deviceSegment;
        if (command.isTargetedAtGateway()) {
            deviceSegment = command.getOriginalDeviceId();
        } else if (commandSubscription.isAuthenticated()) {
            deviceSegment = "";
        } else {
            deviceSegment = commandSubscription.getDeviceId();
        }
        final String requestIdSegment = command.isOneWay() ? "" : command.getRequestId();
        final String topic = String.format("%s/%s/%s/%s/%s/%s", commandSubscription.getEndpoint(), tenantSegment, deviceSegment, commandSubscription.getRequestPart(), requestIdSegment, command.getName());
        Tags.MESSAGE_BUS_DESTINATION.set(commandContext.getTracingSpan(), topic);
        TracingHelper.TAG_QOS.set(commandContext.getTracingSpan(), commandSubscription.getQos().name());
        mqttEndpoint.publish(topic, command.getPayload(), commandSubscription.getQos(), false, false, publishAttempt -> {
            if (publishAttempt.succeeded()) {
                if (command.isTargetedAtGateway()) {
                    this.log.debug("published command [packet-id: {}] to gateway [tenant-id: {}, gateway-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{publishAttempt.result(), commandSubscription.getTenant(), commandSubscription.getDeviceId(), command.getOriginalDeviceId(), commandSubscription.getClientId(), commandSubscription.getQos(), topic});
                } else {
                    this.log.debug("published command [packet-id: {}] to device [tenant-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{publishAttempt.result(), commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getClientId(), commandSubscription.getQos(), topic});
                }
                commandContext.getTracingSpan().log("published command");
                afterCommandPublished((Integer) publishAttempt.result(), commandContext, tenantObject, commandSubscription, commandSubscriptionsManager);
            } else {
                if (command.isTargetedAtGateway()) {
                    this.log.debug("error publishing command to gateway [tenant-id: {}, gateway-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{commandSubscription.getTenant(), commandSubscription.getDeviceId(), command.getOriginalDeviceId(), commandSubscription.getClientId(), commandSubscription.getQos(), topic, publishAttempt.cause()});
                } else {
                    this.log.debug("error publishing command to device [tenant-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getClientId(), commandSubscription.getQos(), topic, publishAttempt.cause()});
                }
                TracingHelper.logError(commandContext.getTracingSpan(), "failed to publish command", publishAttempt.cause());
                reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.from(publishAttempt.cause()));
                commandContext.release();
            }
        });
    }

    /**
     * Settles a command after it has been published to the device.
     * <p>
     * For a subscription with QoS {@code AT_LEAST_ONCE} the command is registered as waiting
     * for acknowledgement: it is accepted and reported as forwarded when the first handler is
     * invoked, and released and reported as undeliverable when the second handler is invoked.
     * For any other QoS the command is reported as forwarded and accepted immediately.
     *
     * @param num The packet id of the published command.
     * @param commandContext The context of the published command.
     * @param tenantObject The tenant the command belongs to.
     * @param commandSubscription The subscription the command has been delivered for.
     * @param commandSubscriptionsManager The manager keeping track of commands waiting for acknowledgement.
     */
    private void afterCommandPublished(Integer num, CommandContext commandContext, TenantObject tenantObject, CommandSubscription commandSubscription, CommandSubscriptionsManager<T> commandSubscriptionsManager) {
        if (!MqttQoS.AT_LEAST_ONCE.equals(commandSubscription.getQos())) {
            // no acknowledgement to wait for: the command counts as delivered right away
            reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
            commandContext.accept();
            return;
        }
        commandSubscriptionsManager.addToWaitingForAcknowledgement(num, ackedPacketId -> {
            reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
            this.log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", new Object[]{ackedPacketId, commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getClientId()});
            commandContext.getTracingSpan().log("received PUBACK from device");
            commandContext.accept();
        }, notAcknowledged -> {
            this.log.debug("did not receive PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", new Object[]{num, commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getClientId()});
            TracingHelper.logError(commandContext.getTracingSpan(), "did not receive PUBACK from device");
            commandContext.release();
            reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
        });
    }

    /**
     * Reports the outcome of publishing a command to the metrics.
     * <p>
     * The direction is {@code ONE_WAY} for a one-way command and {@code REQUEST} otherwise.
     *
     * @param tenantObject The tenant the command belongs to.
     * @param commandSubscription The subscription the command has been delivered for.
     * @param commandContext The context of the command.
     * @param processingOutcome The outcome to report.
     */
    private void reportPublishedCommand(TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext, MetricsTags.ProcessingOutcome processingOutcome) {
        final MetricsTags.Direction direction;
        if (commandContext.getCommand().isOneWay()) {
            direction = MetricsTags.Direction.ONE_WAY;
        } else {
            direction = MetricsTags.Direction.REQUEST;
        }
        this.metrics.reportCommand(
                direction,
                commandSubscription.getTenant(),
                tenantObject,
                processingOutcome,
                commandContext.getCommand().getPayloadSize(),
                getMicrometerSample(commandContext));
    }

    /**
     * Determines the time-to-live from the {@code hono-ttl} property of a property bag.
     *
     * @param propertyBag The property bag to read the property from; may be {@code null}.
     * @return The property's value as a duration in seconds, or {@code null} if the bag is
     *         {@code null}, the property is not set, its value is not a valid {@code long}
     *         or its value is negative.
     */
    protected final Duration getTimeToLive(PropertyBag propertyBag) {
        try {
            if (propertyBag == null) {
                return null;
            }
            final String ttlValue = propertyBag.getProperty("hono-ttl");
            if (ttlValue == null) {
                return null;
            }
            final long ttlSeconds = Long.parseLong(ttlValue);
            if (ttlSeconds < 0) {
                return null;
            }
            return Duration.ofSeconds(ttlSeconds);
        } catch (NumberFormatException e) {
            // a value that is not a number is treated like a missing property
            return null;
        }
    }

    /**
     * Adds the {@code x-opt-retain} annotation to a downstream message if the published
     * message has its retain flag set. The fact is also logged to the given span.
     *
     * @param mqttContext The context of the published message.
     * @param message The downstream message to annotate.
     * @param span The span to log to.
     */
    private static void addRetainAnnotation(MqttContext mqttContext, Message message, Span span) {
        if (!mqttContext.message().isRetain()) {
            return;
        }
        span.log("device wants to retain message");
        MessageHelper.addAnnotation(message, "x-opt-retain", Boolean.TRUE);
    }

    /**
     * Obtains the telemetry and event senders for a device's tenant ahead of time.
     *
     * @param device The device whose tenant the senders are obtained for.
     * @param span The span to log the successful outcome to.
     * @return A future containing the given device once both senders are available; the
     *         future fails if either sender cannot be obtained.
     */
    private Future<Device> createLinks(Device device, Span span) {
        final Future telemetryLink = getTelemetrySender(device.getTenantId());
        final Future eventLink = getEventSender(device.getTenantId());
        return CompositeFuture.all(telemetryLink, eventLink).compose(linksOpened -> {
            span.log("opened downstream links");
            this.log.debug("providently opened downstream links [credit telemetry: {}, credit event: {}] for tenant [{}]", new Object[]{Integer.valueOf(((DownstreamSender) telemetryLink.result()).getCredit()), Integer.valueOf(((DownstreamSender) eventLink.result()).getCredit()), device.getTenantId()});
            return Future.succeededFuture(device);
        });
    }

    /**
     * Registers the handlers for close, PUBLISH, PUBACK, SUBSCRIBE and UNSUBSCRIBE on an
     * endpoint and increments the matching connection metric.
     * <p>
     * A new command subscriptions manager is created for the connection and shared by all
     * handlers that deal with subscriptions.
     *
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @param device The authenticated device or {@code null} if the device is anonymous.
     * @param optionalInt The value passed on to the close, subscribe and unsubscribe handling.
     * @return A succeeded future containing the given device.
     */
    private Future<Device> registerHandlers(MqttEndpoint mqttEndpoint, Device device, OptionalInt optionalInt) {
        final CommandSubscriptionsManager subscriptions = new CommandSubscriptionsManager(this.vertx, (MqttProtocolAdapterProperties) getConfig());
        mqttEndpoint.closeHandler(closed -> close(mqttEndpoint, device, subscriptions, optionalInt));
        mqttEndpoint.publishHandler(published -> handlePublishedMessage(MqttContext.fromPublishPacket(published, mqttEndpoint, device)));
        Objects.requireNonNull(subscriptions);
        mqttEndpoint.publishAcknowledgeHandler(subscriptions::handlePubAck);
        mqttEndpoint.subscribeHandler(subscribe -> onSubscribe(mqttEndpoint, device, subscribe, subscriptions, optionalInt));
        mqttEndpoint.unsubscribeHandler(unsubscribe -> onUnsubscribe(mqttEndpoint, device, unsubscribe, subscriptions, optionalInt));
        if (device != null) {
            this.metrics.incrementConnections(device.getTenantId());
        } else {
            this.metrics.incrementUnauthenticatedConnections();
        }
        return Future.succeededFuture(device);
    }

    /**
     * Maps the cause of a failed connection attempt to an MQTT CONNACK return code.
     * <ul>
     * <li>An {@link MqttConnectionException} yields the code it carries.</li>
     * <li>An {@code AuthorizationException} yields <em>server unavailable</em> if it is an
     * {@link AdapterConnectionsExceededException} and <em>not authorized</em> otherwise.</li>
     * <li>A {@link ServiceInvocationException} yields <em>bad user name or password</em> for
     * status 401 and 404, <em>server unavailable</em> for status 503 and <em>not
     * authorized</em> for any other status.</li>
     * <li>Any other cause yields <em>not authorized</em>.</li>
     * </ul>
     *
     * @param th The cause of the failure.
     * @return The return code to send to the device.
     */
    private static MqttConnectReturnCode getConnectReturnCode(Throwable th) {
        if (th instanceof MqttConnectionException) {
            return ((MqttConnectionException) th).code();
        }
        if (th instanceof AuthorizationException) {
            if (th instanceof AdapterConnectionsExceededException) {
                return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
            }
            return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        if (th instanceof ServiceInvocationException) {
            final int errorCode = ((ServiceInvocationException) th).getErrorCode();
            if (errorCode == 401 || errorCode == 404) {
                return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            }
            if (errorCode == 503) {
                return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
            }
        }
        return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }
}
