package org.eclipse.hono.adapter.amqp;

import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.EndpointType;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.class */
public final class VertxBasedAmqpProtocolAdapter extends AbstractProtocolAdapterBase<ProtocolAdapterProperties> {
    private static final Logger LOG = LoggerFactory.getLogger(VertxBasedAmqpProtocolAdapter.class);
    private static final int DEFAULT_MAX_FRAME_SIZE = 32768;
    private static final int DEFAULT_MAX_SESSION_WINDOW = 3276800;
    private static final long DEFAULT_COMMAND_CONSUMER_CHECK_INTERVAL_MILLIS = 10000;
    private ProtonServer secureServer;
    private ProtonServer insecureServer;
    private ProtonSaslAuthenticatorFactory authenticatorFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.amqp.VertxBasedAmqpProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$util$EndpointType = new int[EndpointType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.CONTROL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    protected String getTypeName() {
        return "hono-amqp";
    }

    protected void doStart(Future<Void> future) {
        checkPortConfiguration().compose(r9 -> {
            if (this.authenticatorFactory == null && ((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
                this.authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(getTenantServiceClient(), getCredentialsServiceClient(), (ProtocolAdapterProperties) getConfig(), () -> {
                    return this.tracer.buildSpan("open connection").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).start();
                });
            }
            return Future.succeededFuture();
        }).compose(obj -> {
            return CompositeFuture.all(bindSecureServer(), bindInsecureServer());
        }).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    protected void doStop(Future<Void> future) {
        CompositeFuture.all(stopSecureServer(), stopInsecureServer()).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    private Future<Void> stopInsecureServer() {
        Future<Void> future = Future.future();
        if (this.insecureServer != null) {
            LOG.info("Shutting down insecure server");
            this.insecureServer.close(future.completer());
        } else {
            future.complete();
        }
        return future;
    }

    private Future<Void> stopSecureServer() {
        Future<Void> future = Future.future();
        if (this.secureServer != null) {
            LOG.info("Shutting down secure server");
            this.secureServer.close(future.completer());
        } else {
            future.complete();
        }
        return future;
    }

    private Future<Void> bindInsecureServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions port = new ProtonServerOptions().setHost(((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort());
        Future<Void> future = Future.future();
        this.insecureServer = createServer(this.insecureServer, port);
        this.insecureServer.connectHandler(this::onConnectRequest).listen(asyncResult -> {
            if (!asyncResult.succeeded()) {
                future.fail(asyncResult.cause());
            } else {
                LOG.info("insecure AMQP server listening on [{}:{}]", ((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress(), Integer.valueOf(getActualInsecurePort()));
                future.complete();
            }
        });
        return future;
    }

    private Future<Void> bindSecureServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions maxFrameSize = new ProtonServerOptions().setHost(((ProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxFrameSize(DEFAULT_MAX_FRAME_SIZE);
        addTlsKeyCertOptions(maxFrameSize);
        addTlsTrustOptions(maxFrameSize);
        Future<Void> future = Future.future();
        this.secureServer = createServer(this.secureServer, maxFrameSize);
        this.secureServer.connectHandler(this::onConnectRequest).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("secure AMQP server listening on {}:{}", ((ProtocolAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(getActualPort()));
                future.complete();
            } else {
                LOG.error("cannot bind to secure port", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    private ProtonServer createServer(ProtonServer protonServer, ProtonServerOptions protonServerOptions) {
        ProtonServer create = protonServer != null ? protonServer : ProtonServer.create(this.vertx, protonServerOptions);
        if (((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            create.saslAuthenticatorFactory(this.authenticatorFactory);
        } else {
            create.saslAuthenticatorFactory((ProtonSaslAuthenticatorFactory) null);
        }
        return create;
    }

    private void onConnectRequest(ProtonConnection protonConnection) {
        Span span = (Span) Optional.ofNullable((Span) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class)).orElse(this.tracer.buildSpan("open connection").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).start());
        Device device = (Device) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        TracingHelper.TAG_AUTHENTICATED.set(span, Boolean.valueOf(device != null));
        Future future = Future.future();
        if (!((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            LOG.trace("received connection request from anonymous device [container: {}]", protonConnection.getRemoteContainer());
            future.complete();
        } else if (device == null) {
            future.fail(new ClientErrorException(401));
        } else {
            LOG.trace("received connection request from {}", device);
            checkDeviceRegistration(device, span.context()).map(r7 -> {
                LOG.debug("device [tenant-id: {}, device-id: {}] is registered and enabled", device.getTenantId(), device.getDeviceId());
                span.log("device is registered and enabled");
                return (Void) null;
            }).setHandler(future);
        }
        future.map(r8 -> {
            protonConnection.setContainer(getTypeName());
            setConnectionHandlers(protonConnection);
            protonConnection.open();
            if (device != null) {
                span.setTag("tenant_id", device.getTenantId());
                span.setTag("device_id", device.getDeviceId());
            }
            span.log("connection accepted");
            return null;
        }).otherwise(th -> {
            protonConnection.setCondition(AmqpContext.getErrorCondition(th));
            protonConnection.close();
            TracingHelper.logError(span, th);
            return null;
        }).setHandler(asyncResult -> {
            span.finish();
        });
    }

    private void setConnectionHandlers(ProtonConnection protonConnection) {
        protonConnection.disconnectHandler(protonConnection2 -> {
            LOG.debug("lost connection to device [container: {}]", protonConnection.getRemoteContainer());
            Optional.ofNullable(getConnectionLossHandler(protonConnection)).ifPresent(handler -> {
                handler.handle((Object) null);
            });
        });
        protonConnection.closeHandler(asyncResult -> {
            handleRemoteConnectionClose(protonConnection, asyncResult);
            Optional.ofNullable(getConnectionLossHandler(protonConnection)).ifPresent(handler -> {
                handler.handle((Object) null);
            });
        });
        protonConnection.sessionOpenHandler(protonSession -> {
            HonoProtonHelper.setDefaultCloseHandler(protonSession);
            handleSessionOpen(protonConnection, protonSession);
        });
        protonConnection.receiverOpenHandler(protonReceiver -> {
            HonoProtonHelper.setDefaultCloseHandler(protonReceiver);
            handleRemoteReceiverOpen(protonConnection, protonReceiver);
        });
        protonConnection.senderOpenHandler(protonSender -> {
            handleRemoteSenderOpenForCommands(protonConnection, protonSender);
        });
    }

    protected void setInsecureAmqpServer(ProtonServer protonServer) {
        Objects.requireNonNull(protonServer);
        if (protonServer.actualPort() > 0) {
            throw new IllegalArgumentException("AMQP Server should not be running");
        }
        this.insecureServer = protonServer;
    }

    protected void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory protonSaslAuthenticatorFactory) {
        this.authenticatorFactory = (ProtonSaslAuthenticatorFactory) Objects.requireNonNull(protonSaslAuthenticatorFactory, "authFactory must not be null");
    }

    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        LOG.debug("opening new session with client [container: {}]", protonConnection.getRemoteContainer());
        protonSession.setIncomingCapacity(DEFAULT_MAX_SESSION_WINDOW);
        protonSession.open();
    }

    private void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            LOG.debug("client [container: {}] closed connection", protonConnection.getRemoteContainer());
        } else {
            LOG.debug("client [container: {}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        }
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.close();
        protonConnection.disconnect();
    }

    protected void handleRemoteReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        Device device = (Device) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        Span start = this.tracer.buildSpan("attach anonymous sender").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), device != null).start();
        if (device != null) {
            start.setTag("tenant_id", device.getTenantId());
            start.setTag("device_id", device.getDeviceId());
        }
        if (ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            ClientErrorException clientErrorException = new ClientErrorException(400, "link must not use snd-settle-mode 'settled'");
            TracingHelper.logError(start, clientErrorException);
            closeLinkWithError(protonReceiver, clientErrorException);
        } else if (protonReceiver.getRemoteTarget() == null || protonReceiver.getRemoteTarget().getAddress() == null) {
            protonReceiver.setTarget(protonReceiver.getRemoteTarget());
            protonReceiver.setSource(protonReceiver.getRemoteSource());
            protonReceiver.setQoS(protonReceiver.getRemoteQoS());
            protonReceiver.setPrefetch(30);
            protonReceiver.setAutoAccept(false);
            HonoProtonHelper.setCloseHandler(protonReceiver, asyncResult -> {
                onLinkDetach(protonReceiver);
            });
            HonoProtonHelper.setDetachHandler(protonReceiver, asyncResult2 -> {
                onLinkDetach(protonReceiver);
            });
            protonReceiver.handler((protonDelivery, message) -> {
                Span start2 = this.tracer.buildSpan("upload message").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), device != null).start();
                if (device != null) {
                    start2.setTag("tenant_id", device.getTenantId());
                    start2.setTag("device_id", device.getDeviceId());
                }
                HashMap hashMap = new HashMap(1);
                hashMap.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), message.getAddress());
                start2.log(hashMap);
                validateEndpoint(message.getAddress(), protonDelivery).compose(resourceIdentifier -> {
                    return validateAddress(resourceIdentifier, device);
                }).recover(th -> {
                    MessageHelper.rejected(protonDelivery, AmqpContext.getErrorCondition(th));
                    return Future.failedFuture(th);
                }).map(resourceIdentifier2 -> {
                    return createContext(resourceIdentifier2, protonDelivery, message, device);
                }).compose(amqpContext -> {
                    return uploadMessage(amqpContext, start2);
                }).otherwise(th2 -> {
                    TracingHelper.logError(start2, th2);
                    return null;
                }).setHandler(asyncResult3 -> {
                    start2.finish();
                });
            });
            protonReceiver.open();
            if (device == null) {
                LOG.debug("established link for receiving messages from device [container: {}]", protonConnection.getRemoteContainer());
            } else {
                LOG.debug("established link for receiving messages from device [tenant: {}, device-id: {}]]", device.getTenantId(), device.getDeviceId());
            }
            start.log("link established");
        } else {
            if (!protonReceiver.getRemoteTarget().getAddress().isEmpty()) {
                LOG.debug("Closing link due to the present of Target [address : {}]", protonReceiver.getRemoteTarget().getAddress());
            }
            ClientErrorException clientErrorException2 = new ClientErrorException(400, "this adapter supports anonymous relay mode only");
            TracingHelper.logError(start, clientErrorException2);
            closeLinkWithError(protonReceiver, clientErrorException2);
        }
        start.finish();
    }

    private AmqpContext createContext(ResourceIdentifier resourceIdentifier, ProtonDelivery protonDelivery, Message message, Device device) {
        String resourceIdentifier2 = resourceIdentifier.toString();
        if (!resourceIdentifier2.equals(message.getAddress())) {
            LOG.debug("adjusting message's address [orig: {}, updated: {}]", message.getAddress(), resourceIdentifier2);
            message.setAddress(resourceIdentifier2);
        }
        return new AmqpContext(protonDelivery, message, device);
    }

    protected void handleRemoteSenderOpenForCommands(ProtonConnection protonConnection, ProtonSender protonSender) {
        Device device = (Device) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        Span start = this.tracer.buildSpan("attach Command receiver").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), device != null).start();
        if (device != null) {
            start.setTag("tenant_id", device.getTenantId());
            start.setTag("device_id", device.getDeviceId());
        }
        getResourceIdentifier(protonSender.getRemoteSource()).compose(resourceIdentifier -> {
            return validateAddress(resourceIdentifier, device);
        }).map(resourceIdentifier2 -> {
            if (CommandConstants.isCommandEndpoint(resourceIdentifier2.getEndpoint())) {
                return openCommandSenderLink(protonSender, resourceIdentifier2, device, start).map(messageConsumer -> {
                    addConnectionLossHandler(protonConnection, r10 -> {
                        sendDisconnectedTtdEvent(resourceIdentifier2.getTenantId(), resourceIdentifier2.getResourceId(), device, null).setHandler(asyncResult -> {
                            messageConsumer.close((Handler) null);
                        });
                    });
                    return messageConsumer;
                });
            }
            throw new ClientErrorException(404, "no such node");
        }).map(future -> {
            start.log("link established");
            return future;
        }).otherwise(th -> {
            TracingHelper.logError(start, th);
            if (th instanceof ServiceInvocationException) {
                closeLinkWithError(protonSender, th);
                return null;
            }
            closeLinkWithError(protonSender, new ClientErrorException(400, "Invalid source address"));
            return null;
        }).setHandler(asyncResult -> {
            start.finish();
        });
    }

    private Future<MessageConsumer> openCommandSenderLink(ProtonSender protonSender, ResourceIdentifier resourceIdentifier, Device device, Span span) {
        return createCommandConsumer(protonSender, resourceIdentifier).map(messageConsumer -> {
            String tenantId = resourceIdentifier.getTenantId();
            String resourceId = resourceIdentifier.getResourceId();
            protonSender.setSource(protonSender.getRemoteSource());
            protonSender.setTarget(protonSender.getRemoteTarget());
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            Handler handler = asyncResult -> {
                sendDisconnectedTtdEvent(tenantId, resourceId, device, null);
                messageConsumer.close((Handler) null);
                onLinkDetach(protonSender);
            };
            HonoProtonHelper.setCloseHandler(protonSender, handler);
            HonoProtonHelper.setDetachHandler(protonSender, handler);
            protonSender.open();
            LOG.debug("established link [address: {}] for sending commands to device", resourceIdentifier);
            sendConnectedTtdEvent(tenantId, resourceId, device, span.context());
            return messageConsumer;
        }).otherwise(th -> {
            throw new ServerErrorException(503, "cannot create command consumer");
        });
    }

    private Future<MessageConsumer> createCommandConsumer(ProtonSender protonSender, ResourceIdentifier resourceIdentifier) {
        return getCommandConnection().createCommandConsumer(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), commandContext -> {
            Tags.COMPONENT.set(commandContext.getCurrentSpan(), getTypeName());
            if (!commandContext.getCommand().isValid()) {
                commandContext.reject(AmqpContext.getErrorCondition(new ClientErrorException(400, "malformed command message")), 1);
            } else if (protonSender.isOpen()) {
                onCommandReceived(protonSender, commandContext);
            } else {
                commandContext.release(1);
            }
        }, r1 -> {
        }, DEFAULT_COMMAND_CONSUMER_CHECK_INTERVAL_MILLIS);
    }

    protected void onCommandReceived(ProtonSender protonSender, CommandContext commandContext) {
        Objects.requireNonNull(protonSender);
        Objects.requireNonNull(commandContext);
        Command command = commandContext.getCommand();
        Message commandMessage = command.getCommandMessage();
        if (commandMessage.getCorrelationId() == null) {
            commandMessage.setCorrelationId(commandMessage.getMessageId());
        }
        protonSender.send(commandMessage, protonDelivery -> {
            Rejected remoteState = protonDelivery.getRemoteState();
            HashMap hashMap = new HashMap(2);
            if (!protonDelivery.remotelySettled()) {
                LOG.debug("device did not settle command message [command: {}, remote state: {}]", command.getName(), remoteState.getClass().getSimpleName());
                hashMap.put("event", "device did not settle command");
                hashMap.put("remote state", remoteState.getClass().getSimpleName());
                commandContext.getCurrentSpan().log(hashMap);
                commandContext.release(1);
                return;
            }
            if (Accepted.class.isInstance(remoteState)) {
                LOG.trace("device accepted command message [command: {}]", command.getName());
                hashMap.put("event", "device accepted command");
                commandContext.getCurrentSpan().log(hashMap);
                commandContext.accept(1);
                return;
            }
            if (Rejected.class.isInstance(remoteState)) {
                ErrorCondition error = remoteState.getError();
                LOG.debug("device rejected command message [command: {}, error: {}]", command.getName(), error);
                hashMap.put("event", "device rejected command");
                commandContext.getCurrentSpan().log(hashMap);
                commandContext.reject(error, 1);
                return;
            }
            if (Released.class.isInstance(remoteState)) {
                LOG.debug("device released command message [command: {}]", command.getName());
                hashMap.put("event", "device released command");
                commandContext.getCurrentSpan().log(hashMap);
                commandContext.release(1);
            }
        });
        HashMap hashMap = new HashMap(4);
        hashMap.put("event", "command sent to device");
        if (protonSender.getRemoteTarget() != null) {
            hashMap.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), protonSender.getRemoteTarget().getAddress());
        }
        hashMap.put(TracingHelper.TAG_QOS.getKey(), protonSender.getQoS().name());
        hashMap.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(protonSender.getCredit()));
        commandContext.getCurrentSpan().log(hashMap);
    }

    private <T extends ProtonLink<T>> void closeLinkWithError(ProtonLink<T> protonLink, Throwable th) {
        protonLink.setCondition(AmqpContext.getErrorCondition(th));
        protonLink.close();
    }

    protected Future<Void> uploadMessage(AmqpContext amqpContext, Span span) {
        Future future = Future.future();
        if (isPayloadOfIndicatedType(amqpContext.getMessagePayload(), amqpContext.getMessageContentType())) {
            future.complete();
        } else {
            future.fail(new ClientErrorException(400, "empty notifications must not contain payload"));
        }
        return future.compose(r8 -> {
            switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.fromString(amqpContext.getEndpoint()).ordinal()]) {
                case 1:
                    LOG.trace("forwarding telemetry data");
                    return doUploadMessage(amqpContext, getTelemetrySender(amqpContext.getTenantId()), span);
                case 2:
                    LOG.trace("forwarding event");
                    return doUploadMessage(amqpContext, getEventSender(amqpContext.getTenantId()), span);
                case 3:
                    LOG.trace("forwarding command response");
                    return doUploadCommandResponseMessage(amqpContext, span);
                default:
                    return Future.failedFuture(new ClientErrorException(404, "unknown endpoint"));
            }
        }).map(protonDelivery -> {
            amqpContext.accept();
            return (Void) null;
        }).recover(th -> {
            amqpContext.handleFailure(th);
            return Future.failedFuture(th);
        });
    }

    private Future<ProtonDelivery> doUploadMessage(AmqpContext amqpContext, Future<MessageSender> future, Span span) {
        Future registrationAssertion = getRegistrationAssertion(amqpContext.getTenantId(), amqpContext.getDeviceId(), amqpContext.getAuthenticatedDevice(), span.context());
        Future tenantConfiguration = getTenantConfiguration(amqpContext.getTenantId(), span.context());
        return CompositeFuture.all(tenantConfiguration, registrationAssertion, future).compose(compositeFuture -> {
            if (!((TenantObject) tenantConfiguration.result()).isAdapterEnabled(getTypeName())) {
                return Future.failedFuture(new ClientErrorException(403, String.format("This adapter is not enabled for tenant [tenantId: %s].", amqpContext.getTenantId())));
            }
            MessageSender messageSender = (MessageSender) future.result();
            Message newMessage = newMessage(amqpContext.getResourceIdentifier(), messageSender.isRegistrationAssertionRequired(), amqpContext.getEndpoint(), amqpContext.getMessageContentType(), amqpContext.getMessagePayload(), (JsonObject) registrationAssertion.result(), null);
            return amqpContext.isRemotelySettled() ? messageSender.send(newMessage, span.context()) : messageSender.sendAndWaitForOutcome(newMessage, span.context());
        }).recover(th -> {
            LOG.debug("cannot process {} message from device [tenant: {}, device-id: {}]", new Object[]{amqpContext.getEndpoint(), amqpContext.getTenantId(), amqpContext.getDeviceId(), th});
            return Future.failedFuture(th);
        });
    }

    private Future<ProtonDelivery> doUploadCommandResponseMessage(AmqpContext amqpContext, Span span) {
        String str = (String) Optional.ofNullable(amqpContext.getMessage().getCorrelationId()).map(obj -> {
            if (obj instanceof String) {
                return (String) obj;
            }
            return null;
        }).orElse(null);
        Integer num = (Integer) MessageHelper.getApplicationProperty(amqpContext.getMessage().getApplicationProperties(), "status", Integer.class);
        HashMap hashMap = new HashMap(2);
        hashMap.put("status", num);
        hashMap.put(TracingHelper.TAG_CORRELATION_ID.getKey(), str);
        span.log(hashMap);
        LOG.debug("creating command response [status: {}, correlation-id: {}, reply-to: {}]", new Object[]{num, str, amqpContext.getResourceIdentifier().toString()});
        CommandResponse from = CommandResponse.from(amqpContext.getMessagePayload(), amqpContext.getMessageContentType(), num, str, amqpContext.getResourceIdentifier());
        return from == null ? Future.failedFuture(new ClientErrorException(400, "malformed command response message")) : sendCommandResponse(amqpContext.getTenantId(), from, span.context()).map(protonDelivery -> {
            LOG.trace("forwarded command response from device [tenant: {}, device-id: {}]", amqpContext.getTenantId(), amqpContext.getDeviceId());
            return protonDelivery;
        }).recover(th -> {
            LOG.debug("cannot process command response from device [tenant: {}, device-id: {}]", new Object[]{amqpContext.getTenantId(), amqpContext.getDeviceId(), th});
            return Future.failedFuture(th);
        });
    }

    private <T extends ProtonLink<T>> void onLinkDetach(ProtonLink<T> protonLink) {
        LOG.debug("closing link [{}]", protonLink.getName());
        protonLink.close();
    }

    Future<ResourceIdentifier> validateEndpoint(String str, ProtonDelivery protonDelivery) {
        return getResourceIdentifier(str).map(resourceIdentifier -> {
            switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.fromString(resourceIdentifier.getEndpoint()).ordinal()]) {
                case 1:
                    return resourceIdentifier;
                case 2:
                    if (protonDelivery.remotelySettled()) {
                        throw new ClientErrorException(400, "event endpoint accepts unsettled messages only");
                    }
                    return resourceIdentifier;
                case 3:
                    return resourceIdentifier;
                default:
                    LOG.debug("device wants to send message for unsupported endpoint [{}]", resourceIdentifier.getEndpoint());
                    throw new ClientErrorException(404, "unsupported endpoint");
            }
        });
    }

    private Future<ResourceIdentifier> validateAddress(ResourceIdentifier resourceIdentifier, Device device) {
        Future<ResourceIdentifier> future = Future.future();
        if (device == null) {
            if (resourceIdentifier.getTenantId() == null || resourceIdentifier.getResourceId() == null) {
                future.fail(new ClientErrorException(403, "unauthenticated clients must provide tenant and device ID in message's address"));
            } else {
                future.complete(resourceIdentifier);
            }
        } else if (resourceIdentifier.getTenantId() != null && resourceIdentifier.getResourceId() == null) {
            future.fail(new ClientErrorException(400, "message's address must not contain tenant ID only"));
        } else if (resourceIdentifier.getTenantId() == null && resourceIdentifier.getResourceId() == null) {
            future.complete(ResourceIdentifier.from(resourceIdentifier.getEndpoint(), device.getTenantId(), device.getDeviceId()));
        } else {
            future.complete(resourceIdentifier);
        }
        return future;
    }

    private static Future<ResourceIdentifier> getResourceIdentifier(Source source) {
        return source == null ? Future.failedFuture(new ClientErrorException(404, "no such node")) : getResourceIdentifier(source.getAddress());
    }

    private static Future<ResourceIdentifier> getResourceIdentifier(String str) {
        Future<ResourceIdentifier> future = Future.future();
        try {
            if (Strings.isNullOrEmpty(str)) {
                future.fail(new ClientErrorException(404, "no such node"));
            } else {
                future.complete(ResourceIdentifier.fromString(str));
            }
        } catch (Throwable th) {
            future.fail(th);
        }
        return future;
    }

    private static void addConnectionLossHandler(ProtonConnection protonConnection, Handler<Void> handler) {
        protonConnection.attachments().set("connectionLossHandler", Handler.class, handler);
    }

    private static Handler<Void> getConnectionLossHandler(ProtonConnection protonConnection) {
        return (Handler) protonConnection.attachments().get("connectionLossHandler", Handler.class);
    }

    public int getPortDefaultValue() {
        return 5671;
    }

    public int getInsecurePortDefaultValue() {
        return 5672;
    }

    protected int getActualPort() {
        if (this.secureServer != null) {
            return this.secureServer.actualPort();
        }
        return -1;
    }

    protected int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }
}
