package org.eclipse.hono.adapter.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.auth.device.DeviceCredentials;
import org.eclipse.hono.service.auth.device.HonoClientBasedAuthProvider;
import org.eclipse.hono.service.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.service.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.service.command.Command;
import org.eclipse.hono.service.command.CommandContext;
import org.eclipse.hono.service.command.CommandResponse;
import org.eclipse.hono.service.command.CommandSubscription;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EndpointType;
import org.eclipse.hono.util.ExecutionContext;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.class */
public abstract class AbstractVertxBasedMqttProtocolAdapter<T extends ProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    /** Key under which the span created for a PUBLISH packet is stored in the {@code MqttContext}. */
    public static final String KEY_CURRENT_SPAN = MqttContext.class.getName() + ".serverSpan";
    /** Default port for the insecure (plain TCP) server. */
    private static final int IANA_MQTT_PORT = 1883;
    /** Default port for the secure server. */
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    /** Per-instance logger, named after the concrete adapter class. */
    protected final Logger LOG = LoggerFactory.getLogger(getClass());
    /** Metrics sink; defaults to a no-op implementation until {@code setMetrics} is invoked. */
    private MqttAdapterMetrics metrics = MqttAdapterMetrics.NOOP;
    /** Server bound by {@code bindSecureMqttServer}; may be pre-set via {@code setMqttSecureServer}. */
    private MqttServer server;
    /** Server bound by {@code bindInsecureMqttServer}; may be pre-set via {@code setMqttInsecureServer}. */
    private MqttServer insecureServer;
    /** Provider used to verify username/password credentials; lazily created in {@code doStart} if not set. */
    private HonoClientBasedAuthProvider usernamePasswordAuthProvider;

    /* renamed from: org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter$1.class */
    /*
     * Lookup table that maps EndpointType ordinals to the case labels (1, 2, 3) used by the
     * switch statement in uploadMessage(MqttContext, ResourceIdentifier, MqttPublishMessage).
     * Ordinals that are not assigned below keep the array default of 0 and therefore fall
     * through to the switch's default branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        // indexed by EndpointType.ordinal(); value is the switch case label
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$util$EndpointType = new int[EndpointType.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only disables its own case label.
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.CONTROL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Sets the provider used to authenticate devices that present a username and password.
     *
     * @param honoClientBasedAuthProvider The provider to use.
     * @throws NullPointerException if the provider is {@code null}.
     */
    public final void setUsernamePasswordAuthProvider(HonoClientBasedAuthProvider honoClientBasedAuthProvider) {
        Objects.requireNonNull(honoClientBasedAuthProvider);
        this.usernamePasswordAuthProvider = honoClientBasedAuthProvider;
    }

    /**
     * Gets the default port of the secure server.
     *
     * @return The value of {@code IANA_SECURE_MQTT_PORT} (8883).
     */
    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    /**
     * Gets the default port of the insecure server.
     *
     * @return The value of {@code IANA_MQTT_PORT} (1883).
     */
    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    /**
     * Gets the port reported by the secure server.
     *
     * @return The secure server's actual port, or {@code -1} if no secure server exists.
     */
    protected final int getActualPort() {
        return this.server == null ? -1 : this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure server.
     *
     * @return The insecure server's actual port, or {@code -1} if no insecure server exists.
     */
    protected final int getActualInsecurePort() {
        return this.insecureServer == null ? -1 : this.insecureServer.actualPort();
    }

    /**
     * Sets the object used for reporting this adapter's metrics.
     * <p>
     * A {@code null} value is rejected right away instead of silently replacing the no-op
     * default, which would only surface later as a {@code NullPointerException} when the
     * first connection or message is counted.
     *
     * @param mqttAdapterMetrics The metrics to report to.
     * @throws NullPointerException if metrics is {@code null}.
     */
    @Autowired
    public final void setMetrics(MqttAdapterMetrics mqttAdapterMetrics) {
        this.metrics = Objects.requireNonNull(mqttAdapterMetrics);
    }

    /**
     * Sets the server instance to use for the secure port.
     *
     * @param mqttServer The server; it must not report a positive actual port yet.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports an actual port greater than zero.
     */
    public void setMqttSecureServer(MqttServer mqttServer) {
        final MqttServer candidate = Objects.requireNonNull(mqttServer);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.server = candidate;
    }

    /**
     * Sets the server instance to use for the insecure port.
     *
     * @param mqttServer The server; it must not report a positive actual port yet.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports an actual port greater than zero.
     */
    public void setMqttInsecureServer(MqttServer mqttServer) {
        final MqttServer candidate = Objects.requireNonNull(mqttServer);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.insecureServer = candidate;
    }

    /**
     * Starts the secure server if the secure port is enabled.
     * <p>
     * The server options are derived from the adapter configuration (bind address, secure
     * port, maximum payload size) and extended with the TLS key/cert and trust options.
     *
     * @return A future that succeeds once the server is listening (or immediately if the
     *         secure port is disabled) and fails with the cause of a failed startup.
     */
    private Future<Void> bindSecureMqttServer() {
        if (isSecurePortEnabled()) {
            final ProtocolAdapterProperties config = getConfig();
            final MqttServerOptions options = new MqttServerOptions();
            options.setHost(config.getBindAddress());
            options.setPort(determineSecurePort());
            options.setMaxMessageSize(config.getMaxPayloadSize());
            addTlsKeyCertOptions(options);
            addTlsTrustOptions(options);
            return bindMqttServer(options, this.server)
                    .map(boundServer -> {
                        // remember the server that is actually listening
                        this.server = boundServer;
                        return (Void) null;
                    })
                    .recover(failure -> Future.failedFuture(failure));
        }
        return Future.succeededFuture();
    }

    /**
     * Starts the insecure server if the insecure port is enabled.
     * <p>
     * The server options are derived from the adapter configuration (insecure port bind
     * address, insecure port, maximum payload size); no TLS options are applied.
     *
     * @return A future that succeeds once the server is listening (or immediately if the
     *         insecure port is disabled) and fails with the cause of a failed startup.
     */
    private Future<Void> bindInsecureMqttServer() {
        if (isInsecurePortEnabled()) {
            final ProtocolAdapterProperties config = getConfig();
            final MqttServerOptions options = new MqttServerOptions();
            options.setHost(config.getInsecurePortBindAddress());
            options.setPort(determineInsecurePort());
            options.setMaxMessageSize(config.getMaxPayloadSize());
            return bindMqttServer(options, this.insecureServer)
                    .map(boundServer -> {
                        // remember the server that is actually listening
                        this.insecureServer = boundServer;
                        return (Void) null;
                    })
                    .recover(failure -> Future.failedFuture(failure));
        }
        return Future.succeededFuture();
    }

    /**
     * Registers the connection handler on a server and starts listening.
     * <p>
     * If no server instance is passed in, a new one is created from the given options.
     *
     * @param mqttServerOptions The options to create a new server with; also the source of
     *                          the host name written to the startup log.
     * @param mqttServer A pre-configured server to start, or {@code null} to create one.
     * @return A future that completes with the listening server, or fails with the cause
     *         of the failed startup.
     */
    private Future<MqttServer> bindMqttServer(MqttServerOptions mqttServerOptions, MqttServer mqttServer) {
        final Future<MqttServer> result = Future.future();
        final MqttServer serverToStart = mqttServer != null ? mqttServer : MqttServer.create(this.vertx, mqttServerOptions);
        serverToStart.endpointHandler(this::handleEndpointConnection).listen(startup -> {
            if (startup.succeeded()) {
                // Log the host configured for THIS server. The secure bind address from the
                // adapter configuration would be wrong for the insecure server.
                this.LOG.info("MQTT server running on {}:{}", mqttServerOptions.getHost(), serverToStart.actualPort());
                result.complete(serverToStart);
            } else {
                this.LOG.error("error while starting up MQTT server", startup.cause());
                result.fail(startup.cause());
            }
        });
        return result;
    }

    /**
     * Starts the adapter: verifies the port configuration, binds the secure and insecure
     * servers and makes sure a username/password auth provider is available.
     *
     * @param future The future to complete once startup has finished, or to fail with the
     *               cause of the first step that failed.
     */
    public void doStart(Future<Void> future) {
        final ProtocolAdapterProperties config = getConfig();
        this.LOG.info("limiting size of inbound message payload to {} bytes", config.getMaxPayloadSize());
        if (!config.isAuthenticationRequired()) {
            this.LOG.warn("authentication of devices turned off");
        }
        checkPortConfiguration()
                .compose(portsChecked -> CompositeFuture.all(bindSecureMqttServer(), bindInsecureMqttServer()))
                .compose(serversBound -> {
                    // fall back to the default provider only if none has been injected
                    if (this.usernamePasswordAuthProvider == null) {
                        this.usernamePasswordAuthProvider = new UsernamePasswordAuthProvider(getCredentialsServiceClient(), getConfig());
                    }
                    future.complete();
                }, future);
    }

    /**
     * Stops the adapter by closing the secure and the insecure server.
     *
     * @param future The future to complete once both servers have been closed, or to fail
     *               if closing one of them failed.
     */
    public void doStop(Future<Void> future) {
        final Future<Void> secureClosed = Future.future();
        if (this.server == null) {
            secureClosed.complete();
        } else {
            this.server.close(secureClosed.completer());
        }

        final Future<Void> insecureClosed = Future.future();
        if (this.insecureServer == null) {
            insecureClosed.complete();
        } else {
            this.insecureServer.close(insecureClosed.completer());
        }

        CompositeFuture.all(secureClosed, insecureClosed).compose(bothClosed -> {
            future.complete();
        }, future);
    }

    /**
     * Creates a failed future indicating that a connection request is rejected.
     *
     * @param mqttConnectReturnCode The return code to report; {@code null} is replaced with
     *                              {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE}.
     * @param <T> The type of the (never available) result.
     * @return A future failed with an {@code MqttConnectionException} carrying the code.
     */
    protected static <T> Future<T> rejected(MqttConnectReturnCode mqttConnectReturnCode) {
        final MqttConnectReturnCode effectiveCode;
        if (mqttConnectReturnCode == null) {
            effectiveCode = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
        } else {
            effectiveCode = mqttConnectReturnCode;
        }
        return Future.failedFuture(new MqttConnectionException(effectiveCode));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a succeeded future indicating that a connection request is accepted.
     *
     * @param device The device that has been authenticated.
     * @return A succeeded future holding the given device.
     */
    public static Future<Device> accepted(Device device) {
        return Future.succeededFuture(device);
    }

    /**
     * Creates a succeeded future indicating that a connection request from an
     * unauthenticated device is accepted.
     *
     * @return A succeeded future whose result is {@code null}.
     */
    protected static Future<Device> accepted() {
        // The null must be typed as Device: an Object-typed null would produce a
        // Future<Object>, which is not a Future<Device>.
        return Future.succeededFuture((Device) null);
    }

    /**
     * Handles a CONNECT request from a client.
     * <p>
     * A "CONNECT" span is started for the request. A client's wish to resume a session and
     * a client's last will are only noted in the span. The request is processed once
     * {@code isConnected()} has succeeded; the outcome is passed on to
     * {@code handleConnectionRequestResult}.
     *
     * @param mqttEndpoint The endpoint representing the connecting client.
     */
    final void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        final String clientId = mqttEndpoint.clientIdentifier();
        this.LOG.debug("connection request from client [client-id: {}]", clientId);

        final Span connectSpan = this.tracer.buildSpan("CONNECT")
                .ignoreActiveSpan()
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), clientId)
                .start();

        final boolean wantsSessionResumed = !mqttEndpoint.isCleanSession();
        if (wantsSessionResumed) {
            connectSpan.log("ignoring client's intent to resume existing session");
        }
        final boolean hasLastWill = mqttEndpoint.will() != null;
        if (hasLastWill) {
            connectSpan.log("ignoring client's last will");
        }

        isConnected()
                .compose(connected -> handleConnectionRequest(mqttEndpoint, connectSpan))
                .setHandler(attempt -> handleConnectionRequestResult(mqttEndpoint, connectSpan, attempt));
    }

    /**
     * Dispatches a connection request depending on whether the adapter is configured to
     * require device authentication.
     *
     * @param mqttEndpoint The endpoint representing the connecting client.
     * @param span The span tracking the CONNECT request.
     * @return The outcome of the authenticated or unauthenticated connection handling.
     */
    private Future<Device> handleConnectionRequest(MqttEndpoint mqttEndpoint, Span span) {
        if (getConfig().isAuthenticationRequired()) {
            return handleEndpointConnectionWithAuthentication(mqttEndpoint, span);
        }
        return handleEndpointConnectionWithoutAuthentication(mqttEndpoint);
    }

    /**
     * Accepts or rejects a client's connection depending on the outcome of the connection
     * request and records the outcome in the CONNECT span.
     * <p>
     * On success a "connected" event is sent first. The connection is accepted only if
     * that succeeds; otherwise it is rejected with
     * {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE}. The span is finished only after the
     * final outcome has been recorded, i.e. inside the asynchronous handler on the success
     * path, so that no tags or logs are written to an already finished span.
     *
     * @param mqttEndpoint The endpoint representing the connecting client.
     * @param span The span tracking the CONNECT request; finished by this method.
     * @param asyncResult The outcome of processing the connection request; a succeeded
     *                    result holds the authenticated device or {@code null}.
     */
    private void handleConnectionRequestResult(MqttEndpoint mqttEndpoint, Span span, AsyncResult<Device> asyncResult) {
        if (asyncResult.succeeded()) {
            final Device device = asyncResult.result();
            TracingHelper.TAG_AUTHENTICATED.set(span, device != null);
            sendConnectedEvent(mqttEndpoint.clientIdentifier(), device).setHandler(sendAttempt -> {
                try {
                    if (sendAttempt.succeeded()) {
                        mqttEndpoint.accept(false);
                        if (device != null) {
                            span.setTag("tenant_id", device.getTenantId());
                            span.setTag("device_id", device.getDeviceId());
                        }
                        span.log("connection accepted");
                    } else {
                        // the trailing Throwable is logged as the exception by SLF4J
                        this.LOG.warn("connection request from client [clientId: {}] rejected due to connection event failure: {}", new Object[]{mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, sendAttempt.cause()});
                        mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                        TracingHelper.logError(span, sendAttempt.cause());
                    }
                } finally {
                    // finish only now, after all tags and logs have been written
                    span.finish();
                }
            });
            return;
        }

        TracingHelper.TAG_AUTHENTICATED.set(span, false);
        final Throwable cause = asyncResult.cause();
        if (cause instanceof MqttConnectionException) {
            final MqttConnectReturnCode code = ((MqttConnectionException) cause).code();
            this.LOG.debug("connection request from client [clientId: {}] rejected with code: {}", mqttEndpoint.clientIdentifier(), code);
            mqttEndpoint.reject(code);
        } else {
            this.LOG.debug("connection request from client [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }
        TracingHelper.logError(span, cause);
        span.finish();
    }

    /**
     * Accepts a connection without authenticating the device.
     * <p>
     * Registers the close, publish, subscribe and unsubscribe handlers on the endpoint
     * (all without a device identity) and counts the connection as unauthenticated.
     *
     * @param mqttEndpoint The endpoint representing the connecting client.
     * @return A succeeded future whose result is {@code null}.
     */
    private Future<Device> handleEndpointConnectionWithoutAuthentication(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.closeHandler(closed -> close(mqttEndpoint, null));
        mqttEndpoint.publishHandler(published -> handlePublishedMessage(new MqttContext(published, mqttEndpoint)));
        mqttEndpoint.subscribeHandler(subscribe -> onSubscribe(mqttEndpoint, null, subscribe));
        mqttEndpoint.unsubscribeHandler(unsubscribe -> onUnsubscribe(mqttEndpoint, null, unsubscribe));

        final String clientId = mqttEndpoint.clientIdentifier();
        this.LOG.debug("unauthenticated device [clientId: {}] connected", clientId);
        this.metrics.incrementUnauthenticatedConnections();
        return accepted();
    }

    /**
     * Authenticates a connecting device and, on success, prepares the endpoint for use.
     * <p>
     * The steps are: extract credentials from the CONNECT packet, verify that this adapter
     * is enabled for the device's tenant, verify the credentials, open the downstream
     * links for the tenant and register the endpoint's handlers.
     *
     * @param mqttEndpoint The endpoint representing the connecting client.
     * @param span The span tracking the CONNECT request.
     * @return A future holding the authenticated device, or failed with an
     *         {@code MqttConnectionException} carrying
     *         <ul>
     *         <li>{@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD} if credentials are
     *         missing or malformed,</li>
     *         <li>{@code CONNECTION_REFUSED_SERVER_UNAVAILABLE} if a step failed with a
     *         {@code ServerErrorException},</li>
     *         <li>{@code CONNECTION_REFUSED_NOT_AUTHORIZED} for any other failure.</li>
     *         </ul>
     */
    private Future<Device> handleEndpointConnectionWithAuthentication(MqttEndpoint mqttEndpoint, Span span) {
        if (mqttEndpoint.auth() == null) {
            this.LOG.debug("connection request from device [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
            return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
        final DeviceCredentials credentials = getCredentials(mqttEndpoint.auth());
        if (credentials == null) {
            this.LOG.debug("connection request from device [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
            return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
        return getTenantConfiguration(credentials.getTenantId(), span.context()).compose(tenantObject -> {
            if (tenantObject.isAdapterEnabled(getTypeName())) {
                this.LOG.debug("protocol adapter [{}] is enabled for tenant [{}]", getTypeName(), credentials.getTenantId());
                return Future.succeededFuture(tenantObject);
            }
            this.LOG.debug("protocol adapter [{}] is disabled for tenant [{}]", getTypeName(), credentials.getTenantId());
            return Future.failedFuture(new ClientErrorException(403, "adapter disabled for tenant"));
        }).compose(enabledTenant -> {
            // typed future so that the next stage receives a Device rather than an Object
            final Future<Device> authAttempt = Future.future();
            this.usernamePasswordAuthProvider.authenticate(credentials, authAttempt.completer());
            return authAttempt;
        }).compose(device -> {
            span.log("device authenticated");
            this.LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]", new Object[]{device.getTenantId(), credentials.getAuthId(), device.getDeviceId()});
            return triggerLinkCreation(device.getTenantId()).map(linksOpened -> {
                span.log("opened downstream links");
                onAuthenticationSuccess(mqttEndpoint, device);
                return device;
            });
        }).recover(failure -> {
            // the trailing Throwable is logged as the exception by SLF4J
            this.LOG.debug("cannot establish connection with device [tenant-id: {}, auth-id: {}]", new Object[]{credentials.getTenantId(), credentials.getAuthId(), failure});
            if (failure instanceof ServerErrorException) {
                return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            }
            return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
        });
    }

    /**
     * Prepares an endpoint for use by an authenticated device.
     * <p>
     * Registers the close, publish, subscribe and unsubscribe handlers on the endpoint,
     * each bound to the authenticated device, and counts the connection for the device's
     * tenant.
     *
     * @param mqttEndpoint The endpoint representing the connected client.
     * @param device The authenticated device.
     */
    private void onAuthenticationSuccess(MqttEndpoint mqttEndpoint, Device device) {
        mqttEndpoint.closeHandler(closed -> close(mqttEndpoint, device));
        mqttEndpoint.publishHandler(published -> handlePublishedMessage(new MqttContext(published, mqttEndpoint, device)));
        mqttEndpoint.subscribeHandler(subscribe -> onSubscribe(mqttEndpoint, device, subscribe));
        mqttEndpoint.unsubscribeHandler(unsubscribe -> onUnsubscribe(mqttEndpoint, device, unsubscribe));

        final String tenantId = device.getTenantId();
        this.metrics.incrementConnections(tenantId);
    }

    /**
     * Handles a SUBSCRIBE packet sent by a client.
     * <p>
     * For every requested topic filter that {@code CommandSubscription.fromTopic} accepts,
     * a command consumer is created. All other filters are answered with
     * {@code MqttQoS.FAILURE}. Accepted filters are always granted
     * {@code MqttQoS.AT_MOST_ONCE}, regardless of the requested QoS. Once all
     * subscription attempts have completed, the SUBACK is sent (if the client is still
     * connected) and a "connected" TTD event is sent for every distinct subscription that
     * has been established.
     *
     * @param mqttEndpoint The endpoint representing the subscribing client.
     * @param device The authenticated device or {@code null} if the client has not been
     *               authenticated.
     * @param mqttSubscribeMessage The SUBSCRIBE packet.
     */
    protected final void onSubscribe(MqttEndpoint mqttEndpoint, Device device, MqttSubscribeMessage mqttSubscribeMessage) {
        // topic filter -> outcome of the subscription attempt; used to process a filter
        // that occurs multiple times in the packet only once
        HashMap hashMap = new HashMap();
        // one outcome per requested filter, in the order of the request (basis for the SUBACK)
        ArrayList arrayList = new ArrayList(mqttSubscribeMessage.topicSubscriptions().size());
        Span start = this.tracer.buildSpan("SUBSCRIBE").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttEndpoint.clientIdentifier()).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), device != null).start();
        if (device != null) {
            start.setTag("tenant_id", device.getTenantId());
            start.setTag("device_id", device.getDeviceId());
        }
        mqttSubscribeMessage.topicSubscriptions().forEach(mqttTopicSubscription -> {
            Future recover;
            Future future = (Future) hashMap.get(mqttTopicSubscription.topicName());
            if (future != null) {
                // filter has already been handled for this packet: reuse its outcome
                arrayList.add(future);
                return;
            }
            CommandSubscription fromTopic = CommandSubscription.fromTopic(mqttTopicSubscription.topicName(), device);
            if (fromTopic == null) {
                start.log(String.format("ignoring unsupported topic filter [%s]", mqttTopicSubscription.topicName()));
                this.LOG.debug("cannot create subscription [filter: {}, requested QoS: {}]: unsupported topic filter", mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService());
                recover = Future.failedFuture(new IllegalArgumentException("unsupported topic filter"));
            } else {
                recover = createCommandConsumer(mqttEndpoint, fromTopic).map(messageConsumer -> {
                    // NOTE(review): presumably this replaces any close handler registered
                    // earlier on the endpoint - confirm against the MqttEndpoint API
                    mqttEndpoint.closeHandler(r9 -> {
                        sendDisconnectedTtdEvent(fromTopic.getTenant(), fromTopic.getDeviceId(), device);
                        closeCommandConsumer(fromTopic.getTenant(), fromTopic.getDeviceId());
                        close(mqttEndpoint, device);
                    });
                    start.log(String.format("accepting subscription [filter: %s, requested QoS: %s, granted QoS: %s]", mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), MqttQoS.AT_MOST_ONCE));
                    this.LOG.debug("created subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}, granted QoS: {}]", new Object[]{fromTopic.getTenant(), fromTopic.getDeviceId(), mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), MqttQoS.AT_MOST_ONCE});
                    return fromTopic;
                }).recover(th -> {
                    // record the problem and keep the future failed so that the SUBACK reports FAILURE
                    TracingHelper.logError(start, String.format("error creating subscription [filter: %s, requested QoS: %s]: %s", mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), th.getMessage()));
                    this.LOG.debug("cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]", new Object[]{fromTopic.getTenant(), fromTopic.getDeviceId(), mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), th});
                    return Future.failedFuture(th);
                });
            }
            hashMap.put(mqttTopicSubscription.topicName(), recover);
            arrayList.add(recover);
        });
        // the handler inspects each individual outcome, not the aggregate result
        CompositeFuture.join(arrayList).setHandler(asyncResult -> {
            // granted QoS per requested filter, in request order
            List list = (List) arrayList.stream().map(future -> {
                return future.failed() ? MqttQoS.FAILURE : MqttQoS.AT_MOST_ONCE;
            }).collect(Collectors.toList());
            if (mqttEndpoint.isConnected()) {
                mqttEndpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), list);
            }
            // iterate the de-duplicated outcomes so that each subscription triggers one event
            hashMap.values().forEach(future2 -> {
                if (future2.succeeded() && (future2.result() instanceof CommandSubscription)) {
                    CommandSubscription commandSubscription = (CommandSubscription) future2.result();
                    sendConnectedTtdEvent(commandSubscription.getTenant(), commandSubscription.getDeviceId(), device);
                }
            });
            start.finish();
        });
    }

    /**
     * Handles an UNSUBSCRIBE packet sent by a client.
     * <p>
     * For every topic filter that {@code CommandSubscription.fromTopic} accepts, the
     * corresponding command consumer is closed and a "disconnected" TTD event is sent.
     * Other filters are ignored. The UNSUBACK is sent if the client is still connected.
     *
     * @param mqttEndpoint The endpoint representing the unsubscribing client.
     * @param device The authenticated device or {@code null} if the client has not been
     *               authenticated.
     * @param mqttUnsubscribeMessage The UNSUBSCRIBE packet.
     */
    protected final void onUnsubscribe(MqttEndpoint mqttEndpoint, Device device, MqttUnsubscribeMessage mqttUnsubscribeMessage) {
        final boolean authenticated = device != null;
        final Span unsubscribeSpan = this.tracer.buildSpan("UNSUBSCRIBE")
                .ignoreActiveSpan()
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttEndpoint.clientIdentifier())
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticated)
                .start();
        if (authenticated) {
            unsubscribeSpan.setTag("tenant_id", device.getTenantId());
            unsubscribeSpan.setTag("device_id", device.getDeviceId());
        }

        for (final String topicFilter : mqttUnsubscribeMessage.topics()) {
            final CommandSubscription subscription = CommandSubscription.fromTopic(topicFilter, device);
            if (subscription != null) {
                final String tenantId = subscription.getTenant();
                final String deviceId = subscription.getDeviceId();
                unsubscribeSpan.log(String.format("unsubscribing device from topic [filter: %s]", topicFilter));
                this.LOG.debug("unsubscribing device [tenant-id: {}, device-id: {}] from topic [{}]", new Object[]{tenantId, deviceId, topicFilter});
                closeCommandConsumer(tenantId, deviceId);
                sendDisconnectedTtdEvent(tenantId, deviceId, device);
            } else {
                unsubscribeSpan.log(String.format("ignoring unsupported topic filter [%s]", topicFilter));
                this.LOG.debug("ignoring unsubscribe request for unsupported topic filter [{}]", topicFilter);
            }
        }

        if (mqttEndpoint.isConnected()) {
            mqttEndpoint.unsubscribeAcknowledge(mqttUnsubscribeMessage.messageId());
        }
        unsubscribeSpan.finish();
    }

    /**
     * Creates a consumer for commands targeted at the device of a subscription.
     * <p>
     * Valid commands are passed on to {@code onCommandReceived}. Invalid ones are rejected
     * with a "bad request" error condition and one credit is handed back to the sender.
     * If the peer closes the command receiver link, the consumer is closed and the
     * device's connection is closed as well.
     *
     * @param mqttEndpoint The endpoint representing the subscribed client.
     * @param commandSubscription The subscription identifying tenant and device.
     * @return A future holding the created consumer.
     */
    private Future<MessageConsumer> createCommandConsumer(MqttEndpoint mqttEndpoint, CommandSubscription commandSubscription) {
        return createCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandContext -> {
            Tags.COMPONENT.set(commandContext.getCurrentSpan(), getTypeName());
            if (commandContext.getCommand().isValid()) {
                onCommandReceived(mqttEndpoint, commandSubscription, commandContext);
            } else {
                commandContext.reject(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message"));
                commandContext.flow(1);
            }
        }, remoteClose -> {
            this.LOG.debug("command receiver link closed remotely for [tenant-id: {}, device-id: {}]", commandSubscription.getTenant(), commandSubscription.getDeviceId());
            closeCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId());
            close(mqttEndpoint, new Device(commandSubscription.getTenant(), commandSubscription.getDeviceId()));
        });
    }

    /**
     * Opens the telemetry and event senders as well as the registration client for a
     * tenant ahead of their first use.
     *
     * @param str The identifier of the tenant.
     * @return A future that succeeds once all three have been obtained and fails if any
     *         of them could not be obtained.
     */
    private Future<Void> triggerLinkCreation(String str) {
        final Future<? extends MessageSender> telemetryLink = getTelemetrySender(str);
        final Future<? extends MessageSender> eventLink = getEventSender(str);
        return CompositeFuture.all(getRegistrationClient(str), telemetryLink, eventLink).map(linksOpened -> {
            final int telemetryCredit = telemetryLink.result().getCredit();
            final int eventCredit = eventLink.result().getCredit();
            this.LOG.debug("providently opened downstream links [credit telemetry: {}, credit event: {}] for tenant [{}]", new Object[]{telemetryCredit, eventCredit, str});
            return (Void) null;
        });
    }

    /**
     * Processes a message published by a device.
     * <p>
     * A "PUBLISH" span is started and stored in the context under
     * {@code KEY_CURRENT_SPAN}. After the topic has been checked, the message is handed to
     * {@code onPublishedMessage}. The outcome is recorded in the span's HTTP status tag:
     * 202 on success, the error code of a {@code ServiceInvocationException}, or 500 for
     * any other failure. Failures other than {@code ClientErrorException} are counted as
     * undeliverable messages.
     *
     * @param mqttContext The context containing the published message.
     */
    void handlePublishedMessage(MqttContext mqttContext) {
        final Span publishSpan = this.tracer.buildSpan("PUBLISH")
                .ignoreActiveSpan()
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), mqttContext.message().topicName())
                .withTag(TracingHelper.TAG_QOS.getKey(), mqttContext.message().qosLevel().toString())
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttContext.deviceEndpoint().clientIdentifier())
                .start();
        mqttContext.put(KEY_CURRENT_SPAN, publishSpan);

        checkTopic(mqttContext).compose(topicOk -> {
            return onPublishedMessage(mqttContext);
        }).setHandler(outcome -> {
            if (outcome.succeeded()) {
                Tags.HTTP_STATUS.set(publishSpan, 202);
                onMessageSent(mqttContext);
            } else {
                final Throwable failure = outcome.cause();
                if (failure instanceof ServiceInvocationException) {
                    // the error code is declared on ServiceInvocationException, not on Throwable
                    Tags.HTTP_STATUS.set(publishSpan, ((ServiceInvocationException) failure).getErrorCode());
                } else {
                    Tags.HTTP_STATUS.set(publishSpan, 500);
                }
                // client errors are the device's fault and are not counted as undeliverable
                if (!(failure instanceof ClientErrorException)) {
                    this.metrics.incrementUndeliverableMessages(mqttContext.endpoint(), mqttContext.tenant());
                    onMessageUndeliverable(mqttContext);
                }
            }
            publishSpan.finish();
        });
    }

    /**
     * Verifies that the context of a published message contains a topic.
     *
     * @param mqttContext The context to check.
     * @return A succeeded future if the context's topic is not {@code null}, otherwise a
     *         future failed with a {@code ClientErrorException} with status 400.
     */
    private Future<Void> checkTopic(MqttContext mqttContext) {
        if (mqttContext.topic() != null) {
            return Future.succeededFuture();
        }
        return Future.failedFuture(new ClientErrorException(400, "malformed topic name"));
    }

    /**
     * Forwards a message published by a device to the downstream endpoint indicated by
     * the given resource identifier.
     *
     * @param mqttContext The context in which the message has been published.
     * @param resourceIdentifier The resource the message is targeted at; its endpoint
     *                           selects telemetry, event or command response handling.
     * @param mqttPublishMessage The published message.
     * @return The outcome of the selected upload, or a future failed with a
     *         {@code ClientErrorException} with status 400 if the endpoint is not supported.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier, MqttPublishMessage mqttPublishMessage) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(resourceIdentifier);
        Objects.requireNonNull(mqttPublishMessage);
        // switch on the enum constant itself rather than on a hand-maintained ordinal table
        switch (EndpointType.fromString(resourceIdentifier.getEndpoint())) {
            case TELEMETRY:
                return uploadTelemetryMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttPublishMessage.payload());
            case EVENT:
                return uploadEventMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttPublishMessage.payload());
            case CONTROL:
                return uploadCommandResponseMessage(mqttContext, resourceIdentifier);
            default:
                return Future.failedFuture(new ClientErrorException(400, "unsupported endpoint"));
        }
    }

    /**
     * Forwards a telemetry message to the downstream telemetry endpoint and, on success,
     * updates the processed message and payload metrics for the tenant.
     *
     * @param mqttContext The context in which the message has been published.
     * @param str The identifier of the tenant the device belongs to.
     * @param str2 The identifier of the device.
     * @param buffer The payload of the message.
     * @return A future indicating the outcome of the upload.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadTelemetryMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(buffer);
        final Future<Void> upload = uploadMessage(mqttContext, str, str2, buffer, getTelemetrySender(str), "telemetry");
        return upload.map(uploaded -> {
            this.metrics.incrementProcessedMessages("telemetry", str);
            this.metrics.incrementProcessedPayload("telemetry", str, messagePayloadSize(mqttContext.message()));
            return (Void) null;
        });
    }

    /**
     * Forwards an event message to the downstream event endpoint and, on success, updates
     * the processed message and payload metrics for the tenant.
     *
     * @param mqttContext The context in which the message has been published.
     * @param str The identifier of the tenant the device belongs to.
     * @param str2 The identifier of the device.
     * @param buffer The payload of the message.
     * @return A future indicating the outcome of the upload.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadEventMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(buffer);
        final Future<Void> upload = uploadMessage(mqttContext, str, str2, buffer, getEventSender(str), "event");
        return upload.map(uploaded -> {
            this.metrics.incrementProcessedMessages("event", str);
            this.metrics.incrementProcessedPayload("event", str, messagePayloadSize(mqttContext.message()));
            return (Void) null;
        });
    }

    /**
     * Forwards a device's response to a command to the downstream application.
     * <p>
     * The request identifier is taken from element 4 and the status code from element 5 of
     * the resource path. On success the delivered-responses metric is updated and, for a
     * message published with QoS 1 by a device that is still connected, a PUBACK is sent.
     *
     * @param mqttContext The context in which the response has been published.
     * @param resourceIdentifier The resource the response has been published to.
     * @return A future indicating the outcome; it fails with a {@code ClientErrorException}
     *         with status 400 if the resource path has fewer than six elements, the status
     *         is not an integer or no command response could be created from the message.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final Future<Void> uploadCommandResponseMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(resourceIdentifier);
        final String[] path = resourceIdentifier.getResourcePath();
        final boolean statusSegmentMissing = path.length <= 5;
        if (statusSegmentMissing) {
            return Future.failedFuture(new ClientErrorException(400, "malformed topic name"));
        }
        try {
            final Integer status = Integer.valueOf(path[5]);
            final String requestId = path[4];
            final String tenantId = resourceIdentifier.getTenantId();
            final CommandResponse response = CommandResponse.from(requestId, resourceIdentifier.getResourceId(), mqttContext.message().payload(), mqttContext.contentType(), status);
            if (response == null) {
                return Future.failedFuture(new ClientErrorException(400, "malformed topic name"));
            }
            final Span uploadSpan = this.tracer.buildSpan("upload Command response")
                    .asChildOf(getCurrentSpan(mqttContext))
                    .ignoreActiveSpan()
                    .withTag(Tags.COMPONENT.getKey(), getTypeName())
                    .withTag(Tags.SPAN_KIND.getKey(), "client")
                    .withTag("tenant_id", tenantId)
                    .withTag("device_id", resourceIdentifier.getResourceId())
                    .withTag("hono-cmd-status", status)
                    .withTag("hono-cmd-req-id", requestId)
                    .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null)
                    .start();
            return sendCommandResponse(resourceIdentifier.getTenantId(), response, uploadSpan.context()).map(delivery -> {
                this.LOG.trace("successfully forwarded command response from device [tenant-id: {}, device-id: {}]", resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId());
                this.metrics.incrementCommandResponseDeliveredToApplication(resourceIdentifier.getTenantId());
                final boolean ackRequired = mqttContext.deviceEndpoint().isConnected()
                        && mqttContext.message().qosLevel() == MqttQoS.AT_LEAST_ONCE;
                if (ackRequired) {
                    mqttContext.deviceEndpoint().publishAcknowledge(mqttContext.message().messageId());
                }
                uploadSpan.finish();
                return (Void) null;
            }).recover(failure -> {
                TracingHelper.logError(uploadSpan, failure);
                uploadSpan.finish();
                return Future.failedFuture(failure);
            });
        } catch (NumberFormatException notANumber) {
            // the status segment of the topic is not an integer
            return Future.failedFuture(new ClientErrorException(400, "invalid status code"));
        }
    }

    /**
     * Forwards a message published by a device to a downstream endpoint.
     * <p>
     * The payload is first checked against the content type indicated by the
     * MQTT context. The device's registration assertion and the tenant's
     * configuration are then retrieved and, together with the given sender
     * future, awaited before the downstream message is created and sent.
     * For QoS 1 messages the outcome of the transfer is awaited and a PUBACK
     * is sent to the device if it is still connected.
     * <p>
     * A tracing span named {@code "upload " + str3} is started for the
     * operation and finished on both the success and the failure path.
     *
     * @param mqttContext The context of the MQTT message being processed.
     * @param str The identifier of the tenant the device belongs to.
     * @param str2 The identifier of the device that published the message.
     * @param buffer The payload of the message.
     * @param future A future providing the sender to use for forwarding the message.
     * @param str3 The name of the endpoint the message is uploaded to.
     * @return A future that succeeds once the message has been processed. It fails with a
     *         {@code ClientErrorException} (400) if the payload does not match the indicated
     *         content type, with a {@code ClientErrorException} (403) if this adapter is not
     *         enabled for the tenant, or with the cause reported by any of the other steps.
     */
    private Future<Void> uploadMessage(MqttContext mqttContext, String str, String str2, Buffer buffer, Future<? extends MessageSender> future, String str3) {
        if (!isPayloadOfIndicatedType(buffer, mqttContext.contentType())) {
            return Future.failedFuture(new ClientErrorException(400, String.format("Content-Type %s does not match payload", mqttContext.contentType())));
        }
        Span currentSpan = this.tracer.buildSpan("upload " + str3)
                .asChildOf(getCurrentSpan(mqttContext))
                .ignoreActiveSpan()
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "client")
                .withTag("tenant_id", str)
                .withTag("device_id", str2)
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null)
                .start();
        // typed futures make the casts on result() unnecessary
        Future<JsonObject> assertionTracker = getRegistrationAssertion(str, str2, mqttContext.authenticatedDevice(), currentSpan.context());
        Future<TenantObject> tenantTracker = getTenantConfiguration(str, currentSpan.context());
        return CompositeFuture.all(assertionTracker, tenantTracker, future).compose(allDone -> {
            if (!tenantTracker.result().isAdapterEnabled(getTypeName())) {
                return Future.failedFuture(new ClientErrorException(403, "adapter is not enabled for tenant"));
            }
            MessageSender sender = future.result();
            Message downstreamMessage = newMessage(ResourceIdentifier.from(str3, str, str2), sender.isRegistrationAssertionRequired(), mqttContext.message().topicName(), mqttContext.contentType(), buffer, assertionTracker.result(), null);
            addRetainAnnotation(mqttContext, downstreamMessage, currentSpan);
            customizeDownstreamMessage(downstreamMessage, mqttContext);
            // QoS 1 requires waiting for the outcome of the transfer before acknowledging to the device
            return mqttContext.message().qosLevel() == MqttQoS.AT_LEAST_ONCE ? sender.sendAndWaitForOutcome(downstreamMessage, currentSpan.context()) : sender.send(downstreamMessage, currentSpan.context());
        }).compose(delivery -> {
            this.LOG.trace("successfully processed message [topic: {}, QoS: {}] from device [tenantId: {}, deviceId: {}]", mqttContext.message().topicName(), mqttContext.message().qosLevel(), str, str2);
            // only acknowledge if the device is still connected
            if (mqttContext.deviceEndpoint().isConnected() && mqttContext.message().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                currentSpan.log("sending PUBACK");
                mqttContext.deviceEndpoint().publishAcknowledge(mqttContext.message().messageId());
            }
            currentSpan.finish();
            // explicit type witness: there is no target type in the middle of the call chain,
            // so the result type would otherwise not be inferred as Void
            return Future.<Void>succeededFuture();
        }).recover(error -> {
            if (error instanceof ClientErrorException) {
                ClientErrorException clientError = (ClientErrorException) error;
                this.LOG.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]: {} - {}", str3, str, str2, clientError.getErrorCode(), clientError.getMessage());
            } else {
                // the trailing throwable is passed as the last argument, after the three placeholder values
                this.LOG.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]", str3, str, str2, error);
            }
            TracingHelper.logError(currentSpan, error);
            currentSpan.finish();
            return Future.failedFuture(error);
        });
    }

    /**
     * Determines the size of the payload of a published MQTT message.
     *
     * @param mqttPublishMessage The message to inspect, may be {@code null}.
     * @return The length of the message's payload, or 0 if the message or its
     *         payload is {@code null}.
     */
    protected long messagePayloadSize(MqttPublishMessage mqttPublishMessage) {
        final boolean hasPayload = mqttPublishMessage != null && mqttPublishMessage.payload() != null;
        return hasPayload ? mqttPublishMessage.payload().length() : 0L;
    }

    /**
     * Closes the connection to a device.
     * <p>
     * Invokes the {@link #onClose(MqttEndpoint)} hook, sends a disconnected
     * event for the client, decrements the matching connection metric and
     * finally closes the endpoint unless the client has already done so.
     *
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @param device The authenticated identity of the device, or {@code null}
     *               if the device connected anonymously.
     */
    protected final void close(MqttEndpoint mqttEndpoint, Device device) {
        onClose(mqttEndpoint);
        sendDisconnectedEvent(mqttEndpoint.clientIdentifier(), device);

        if (device != null) {
            LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed", device.getTenantId(), device.getDeviceId());
            metrics.decrementConnections(device.getTenantId());
        } else {
            LOG.debug("connection to anonymous device [clientId: {}] closed", mqttEndpoint.clientIdentifier());
            metrics.decrementUnauthenticatedConnections();
        }

        if (mqttEndpoint.isConnected()) {
            LOG.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier());
            mqttEndpoint.close();
        } else {
            LOG.trace("client has already closed connection");
        }
    }

    /**
     * Gets the tracing span stored in an execution context.
     *
     * @param executionContext The context to read the span from, may be {@code null}.
     * @return The value stored under {@code KEY_CURRENT_SPAN}, or {@code null}
     *         if no context has been given.
     */
    protected final Span getCurrentSpan(ExecutionContext executionContext) {
        return executionContext == null ? null : (Span) executionContext.get(KEY_CURRENT_SPAN);
    }

    /**
     * Stores a tracing span in an execution context.
     * <p>
     * The context is left untouched if the given span is {@code null}.
     *
     * @param executionContext The context to store the span in.
     * @param span The span to store under {@code KEY_CURRENT_SPAN}, may be {@code null}.
     * @throws NullPointerException if the context is {@code null}.
     */
    protected final void setCurrentSpan(ExecutionContext executionContext, Span span) {
        Objects.requireNonNull(executionContext);
        if (span == null) {
            return;
        }
        executionContext.put(KEY_CURRENT_SPAN, span);
    }

    /**
     * Hook invoked at the very beginning of
     * {@link #close(MqttEndpoint, Device)}, before the disconnected event is
     * sent and before the endpoint itself is closed.
     * <p>
     * This default implementation does nothing. Subclasses may override it.
     *
     * @param mqttEndpoint The endpoint representing the connection that is being closed.
     */
    protected void onClose(MqttEndpoint mqttEndpoint) {
    }

    /**
     * Extracts a device's credentials from the authentication information
     * provided in its MQTT CONNECT packet.
     *
     * @param mqttAuth The authentication information provided by the device.
     * @return Username/password credentials created from the given information
     *         (honoring this adapter's single-tenant setting), or {@code null}
     *         if either the user name or the password is missing.
     */
    protected DeviceCredentials getCredentials(MqttAuth mqttAuth) {
        final boolean hasUsernameAndPassword = mqttAuth.userName() != null && mqttAuth.password() != null;
        if (hasUsernameAndPassword) {
            return UsernamePasswordCredentials.create(mqttAuth.userName(), mqttAuth.password(), ((ProtocolAdapterProperties) getConfig()).isSingleTenant());
        }
        return null;
    }

    /**
     * Processes a message that has been published by a device.
     * <p>
     * Must be implemented by concrete adapters.
     * NOTE(review): the call site is outside the visible part of this file;
     * presumably the returned future indicates the outcome of processing the
     * message - confirm against the caller.
     *
     * @param mqttContext The context of the published MQTT message.
     * @return A future for the outcome of processing the message.
     */
    protected abstract Future<Void> onPublishedMessage(MqttContext mqttContext);

    /**
     * Hook for adapting a downstream message before it is sent.
     * <p>
     * It is invoked while uploading a device's message, after the downstream
     * message has been created and the retain annotation has been applied,
     * and before the message is handed to the sender.
     * <p>
     * This default implementation does nothing. Subclasses may override it.
     *
     * @param message The downstream message that is about to be sent.
     * @param mqttContext The context of the MQTT message the downstream message was created from.
     */
    protected void customizeDownstreamMessage(Message message, MqttContext mqttContext) {
    }

    /**
     * Hook with an empty default implementation that subclasses may override.
     * <p>
     * NOTE(review): the call site is outside the visible part of this file;
     * presumably invoked after a device's message has been forwarded
     * downstream successfully - confirm against the caller.
     *
     * @param mqttContext The context of the MQTT message concerned.
     */
    protected void onMessageSent(MqttContext mqttContext) {
    }

    /**
     * Hook with an empty default implementation that subclasses may override.
     * <p>
     * NOTE(review): the call site is outside the visible part of this file;
     * presumably invoked when a device's message could not be forwarded
     * downstream - confirm against the caller.
     *
     * @param mqttContext The context of the MQTT message concerned.
     */
    protected void onMessageUndeliverable(MqttContext mqttContext) {
    }

    /**
     * Publishes a command to a device that has subscribed for commands.
     * <p>
     * The command is published with QoS 0 to the topic
     * {@code <endpoint>/<tenant>/<device>/<request-part>/<request-id>/<command-name>}.
     * For an authenticated subscription the tenant and device segments are
     * left empty. After publishing, the delivery metric is incremented, the
     * command is accepted, one credit is issued via the command context and
     * the publication is logged to the command's tracing span.
     *
     * @param mqttEndpoint The endpoint representing the connection to the device.
     * @param commandSubscription The device's command subscription.
     * @param commandContext The context of the command to publish.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected final void onCommandReceived(MqttEndpoint mqttEndpoint, CommandSubscription commandSubscription, CommandContext commandContext) {
        Objects.requireNonNull(mqttEndpoint);
        Objects.requireNonNull(commandSubscription);
        Objects.requireNonNull(commandContext);

        final MqttQoS qos = MqttQoS.AT_MOST_ONCE;
        // authenticated subscriptions get empty tenant and device segments in the topic
        final boolean authenticated = commandSubscription.isAuthenticated();
        final String topicTenant = authenticated ? "" : commandSubscription.getTenant();
        final String topicDevice = authenticated ? "" : commandSubscription.getDeviceId();

        TracingHelper.TAG_CLIENT_ID.set(commandContext.getCurrentSpan(), mqttEndpoint.clientIdentifier());

        final Command cmd = commandContext.getCommand();
        final String topic = String.format("%s/%s/%s/%s/%s/%s",
                commandSubscription.getEndpoint(),
                topicTenant,
                topicDevice,
                commandSubscription.getRequestPart(),
                cmd.getRequestId(),
                cmd.getName());

        mqttEndpoint.publish(topic, cmd.getPayload(), qos, false, false);
        metrics.incrementCommandDeliveredToDevice(commandSubscription.getTenant());
        commandContext.accept();
        commandContext.flow(1);
        LOG.trace("command published to device [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
                commandSubscription.getTenant(), commandSubscription.getDeviceId(), mqttEndpoint.clientIdentifier());

        final HashMap<String, Object> logFields = new HashMap<>(3);
        logFields.put("event", "command published to device");
        logFields.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic);
        logFields.put(TracingHelper.TAG_QOS.getKey(), qos.toString());
        commandContext.getCurrentSpan().log(logFields);
    }

    /**
     * Marks a downstream message as retained if the device has set the
     * <em>retain</em> flag on the MQTT message it was created from.
     * <p>
     * In that case the {@code x-opt-retain} annotation is set to {@code true}
     * on the downstream message and the fact is logged to the given span.
     * Otherwise neither the message nor the span is touched.
     *
     * @param mqttContext The context of the MQTT message published by the device.
     * @param message The downstream message to annotate.
     * @param span The tracing span to log to.
     */
    private static void addRetainAnnotation(MqttContext mqttContext, Message message, Span span) {
        if (!mqttContext.message().isRetain()) {
            return;
        }
        span.log("device wants to retain message");
        MessageHelper.addAnnotation(message, "x-opt-retain", Boolean.TRUE);
    }
}
