package org.eclipse.hono.adapter.amqp.impl;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.amqp.AmqpAdapterConstants;
import org.eclipse.hono.adapter.amqp.AmqpAdapterMetrics;
import org.eclipse.hono.adapter.amqp.AmqpAdapterProperties;
import org.eclipse.hono.adapter.amqp.AmqpAdapterSaslAuthenticatorFactory;
import org.eclipse.hono.adapter.amqp.SaslResponseContext;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.service.auth.device.X509AuthProvider;
import org.eclipse.hono.service.limiting.ConnectionLimitManager;
import org.eclipse.hono.service.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.service.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.tenant.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.class */
public final class VertxBasedAmqpProtocolAdapter extends AbstractProtocolAdapterBase<AmqpAdapterProperties> {
    // Interval in milliseconds handed to the command consumer factory as the last argument of createCommandConsumer(...).
    private static final long DEFAULT_COMMAND_CONSUMER_CHECK_INTERVAL_MILLIS = 10000;
    // NOTE(review): not referenced by name in the visible code; createConnectionLimitManager() passes the
    // equal literal 100000000L instead - presumably the compiler inlined this constant. Confirm.
    private static final int MINIMAL_MEMORY = 100000000;
    // Added to the configured max session window size when building the MemoryBasedConnectionLimitStrategy.
    private static final int MEMORY_PER_CONNECTION = 20000;
    // Server bound by bindSecureServer(); stays null while the secure port is disabled.
    private ProtonServer secureServer;
    // Server bound by bindInsecureServer(); may be injected up front via setInsecureAmqpServer().
    private ProtonServer insecureServer;
    // SASL authenticator factory installed on the servers when authentication is required; created lazily in doStart().
    private ProtonSaslAuthenticatorFactory authenticatorFactory;
    // Metrics sink; defaults to the no-op implementation until setMetrics() is called.
    private AmqpAdapterMetrics metrics = AmqpAdapterMetrics.NOOP;
    // Resolves tenant/auth-id information used for applying the tenant's trace sampling priority; created lazily in doStart().
    private AmqpContextTenantAndAuthIdProvider tenantObjectWithAuthIdProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.amqp.impl.VertxBasedAmqpProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter$1.class */
    /*
     * Synthetic helper that maps MetricsTags.EndpointType constants to small dense integers
     * (COMMAND=1, COMMAND_RESPONSE=2, TELEMETRY=3, EVENT=4), indexed by each constant's ordinal.
     * NOTE(review): this looks like the compiler-generated lookup table for a switch over
     * EndpointType elsewhere in this class - the consuming switch is not visible here; confirm.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per EndpointType constant; slots that are never assigned keep the default value 0.
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType = new int[MetricsTags.EndpointType.values().length];

        static {
            // Each assignment is guarded on its own so that a missing enum constant
            // (NoSuchFieldError) skips only that entry and leaves the others intact.
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.COMMAND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.COMMAND_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.TELEMETRY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.EVENT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /**
     * Gets the name identifying this adapter type.
     * <p>
     * Within this class the value is used as the tracing component tag of the spans
     * created here and as the container name announced when opening a connection.
     *
     * @return The constant {@code hono-amqp}.
     */
    protected String getTypeName() {
        return "hono-amqp";
    }

    /**
     * Sets the metrics object that this adapter reports connection and command statistics to.
     * <p>
     * No null check is performed; the given value simply replaces the current one.
     *
     * @param amqpAdapterMetrics The metrics to use.
     */
    @Autowired
    public void setMetrics(AmqpAdapterMetrics amqpAdapterMetrics) {
        metrics = amqpAdapterMetrics;
    }

    /**
     * Gets the metrics object currently used by this adapter.
     *
     * @return The metrics; the no-op instance unless {@code setMetrics} has been invoked.
     */
    protected AmqpAdapterMetrics getMetrics() {
        return metrics;
    }

    /**
     * Starts the adapter.
     * <p>
     * After the port configuration has been checked, the tenant/auth-id provider and
     * (if authentication is required) the SASL authenticator factory are created unless
     * they have been injected before. Then the secure and insecure servers are bound.
     *
     * @param future The future to complete once both servers are bound, or to fail
     *               if any of the steps fails.
     */
    protected void doStart(Future<Void> future) {
        checkPortConfiguration().compose(portsOk -> {
            if (this.tenantObjectWithAuthIdProvider == null) {
                this.tenantObjectWithAuthIdProvider = new AmqpContextTenantAndAuthIdProvider((ProtocolAdapterProperties) getConfig(), getTenantClientFactory());
            }
            if (this.authenticatorFactory == null && ((AmqpAdapterProperties) getConfig()).isAuthenticationRequired()) {
                // orElseGet instead of orElse: the default manager is only built when none has been
                // configured, rather than being constructed and thrown away on every start.
                final ConnectionLimitManager connectionLimitManager = Optional.<ConnectionLimitManager>ofNullable(getConnectionLimitManager())
                        .orElseGet(this::createConnectionLimitManager);
                setConnectionLimitManager(connectionLimitManager);
                this.authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(
                        getTenantClientFactory(),
                        (ProtocolAdapterProperties) getConfig(),
                        () -> {
                            // span covering the connection establishment, picked up again in processRemoteOpen
                            return this.tracer.buildSpan("open connection").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).start();
                        },
                        connectionLimitManager,
                        this::checkConnectionLimit,
                        new UsernamePasswordAuthProvider(getCredentialsClientFactory(), (ServiceConfigProperties) getConfig(), this.tracer),
                        new X509AuthProvider(getCredentialsClientFactory(), (ServiceConfigProperties) getConfig(), this.tracer),
                        (saslResponseContext, span) -> {
                            return applyTenantTraceSamplingPriority(saslResponseContext, span);
                        });
            }
            return Future.succeededFuture();
        }).compose(obj -> {
            return CompositeFuture.all(bindSecureServer(), bindInsecureServer());
        }).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    /**
     * Creates the default connection limit manager.
     * <p>
     * The manager uses a memory based strategy fed with the minimal memory constant and
     * a per-connection figure (fixed overhead plus the configured max session window size),
     * and reads the current number of connections from the metrics.
     *
     * @return The newly created manager.
     */
    private ConnectionLimitManager createConnectionLimitManager() {
        final int perConnection = MEMORY_PER_CONNECTION + ((AmqpAdapterProperties) getConfig()).getMaxSessionWindowSize();
        final MemoryBasedConnectionLimitStrategy strategy = new MemoryBasedConnectionLimitStrategy((long) MINIMAL_MEMORY, perConnection);
        return new DefaultConnectionLimitManager(
                strategy,
                // evaluated lazily so that a metrics object set later is honoured
                () -> Integer.valueOf(this.metrics.getNumberOfConnections()),
                (ProtocolAdapterProperties) getConfig());
    }

    /**
     * Stops the adapter by shutting down the secure and the insecure server.
     *
     * @param future The future to complete once both servers have been closed, or to fail
     *               if closing any of them fails.
     */
    protected void doStop(Future<Void> future) {
        final Future<Void> secureStopped = stopSecureServer();
        final Future<Void> insecureStopped = stopInsecureServer();
        CompositeFuture.all(secureStopped, insecureStopped).compose(bothStopped -> {
            future.complete();
        }, future);
    }

    /**
     * Closes the insecure server, if there is one.
     *
     * @return A future tracking the close operation; already completed if no insecure
     *         server exists.
     */
    private Future<Void> stopInsecureServer() {
        final Future<Void> result = Future.future();
        if (this.insecureServer == null) {
            // nothing to shut down
            result.complete();
            return result;
        }
        this.log.info("Shutting down insecure server");
        this.insecureServer.close(result);
        return result;
    }

    /**
     * Closes the secure server, if there is one.
     *
     * @return A future tracking the close operation; already completed if no secure
     *         server exists.
     */
    private Future<Void> stopSecureServer() {
        final Future<Void> result = Future.future();
        if (this.secureServer == null) {
            // nothing to shut down
            result.complete();
            return result;
        }
        this.log.info("Shutting down secure server");
        this.secureServer.close(result);
        return result;
    }

    /**
     * Binds the insecure (non-TLS) AMQP server, if the insecure port is enabled.
     * <p>
     * The heartbeat is set to half of the configured idle timeout.
     *
     * @return A future that is completed once the server is listening (or immediately
     *         if the insecure port is disabled) and failed with the cause if binding fails.
     */
    private Future<Void> bindInsecureServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final AmqpAdapterProperties config = (AmqpAdapterProperties) getConfig();
        final ProtonServerOptions options = new ProtonServerOptions()
                .setHost(config.getInsecurePortBindAddress())
                .setPort(determineInsecurePort())
                .setMaxFrameSize(config.getMaxFrameSize())
                .setHeartbeat(config.getIdleTimeout() >> 1);
        final Future<Void> result = Future.future();
        this.insecureServer = createServer(this.insecureServer, options);
        this.insecureServer.connectHandler(this::onConnectRequest).listen(bindAttempt -> {
            if (bindAttempt.succeeded()) {
                this.log.info("insecure AMQP server listening on [{}:{}]", ((AmqpAdapterProperties) getConfig()).getInsecurePortBindAddress(), Integer.valueOf(getActualInsecurePort()));
                result.complete();
            } else {
                // log the failure like bindSecureServer does, so a bind problem is visible in the adapter's log
                this.log.error("cannot bind to insecure port", bindAttempt.cause());
                result.fail(bindAttempt.cause());
            }
        });
        return result;
    }

    /**
     * Binds the secure (TLS) AMQP server, if the secure port is enabled.
     * <p>
     * The heartbeat is set to half of the configured idle timeout and the TLS key/cert
     * and trust options are added to the server options before the server is created.
     *
     * @return A future that is completed once the server is listening (or immediately
     *         if the secure port is disabled) and failed with the cause if binding fails.
     */
    private Future<Void> bindSecureServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final ProtonServerOptions options = new ProtonServerOptions()
                .setHost(((AmqpAdapterProperties) getConfig()).getBindAddress())
                .setPort(determineSecurePort())
                .setMaxFrameSize(((AmqpAdapterProperties) getConfig()).getMaxFrameSize())
                .setHeartbeat(((AmqpAdapterProperties) getConfig()).getIdleTimeout() >> 1);
        addTlsKeyCertOptions(options);
        addTlsTrustOptions(options);
        final Future<Void> result = Future.future();
        this.secureServer = createServer(this.secureServer, options);
        this.secureServer.connectHandler(this::onConnectRequest).listen(bindAttempt -> {
            if (!bindAttempt.succeeded()) {
                this.log.error("cannot bind to secure port", bindAttempt.cause());
                result.fail(bindAttempt.cause());
                return;
            }
            this.log.info("secure AMQP server listening on {}:{}", ((AmqpAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(getActualPort()));
            result.complete();
        });
        return result;
    }

    /**
     * Provides the server to bind, re-using an existing instance if one is given.
     * <p>
     * The SASL authenticator factory of the server is set to this adapter's factory if
     * authentication is required and to {@code null} otherwise.
     *
     * @param protonServer The server to re-use, or {@code null} to create a new one.
     * @param protonServerOptions The options used when a new server needs to be created.
     * @return The (re-)configured server.
     */
    private ProtonServer createServer(ProtonServer protonServer, ProtonServerOptions protonServerOptions) {
        final ProtonServer server;
        if (protonServer == null) {
            server = ProtonServer.create(this.vertx, protonServerOptions);
        } else {
            server = protonServer;
        }
        final boolean authRequired = ((AmqpAdapterProperties) getConfig()).isAuthenticationRequired();
        final ProtonSaslAuthenticatorFactory factoryToUse = authRequired ? this.authenticatorFactory : null;
        server.saslAuthenticatorFactory(factoryToUse);
        return server;
    }

    /**
     * Registers the handlers for a newly accepted connection.
     * <p>
     * Handlers are set for disconnects, remote close, session open, receiver open,
     * sender open and the remote open frame. Both the disconnect and the close handler
     * invoke the connection loss handler (if any) and decrement the connection count.
     *
     * @param protonConnection The connection the device has established.
     */
    protected void onConnectRequest(ProtonConnection protonConnection) {
        protonConnection.disconnectHandler(lostConnection -> {
            this.log.debug("lost connection to device [container: {}]", protonConnection.getRemoteContainer());
            Optional.ofNullable(getConnectionLossHandler(protonConnection)).ifPresent(lossHandler -> lossHandler.handle(null));
            decrementConnectionCount(protonConnection);
        });
        protonConnection.closeHandler(remoteClose -> {
            handleRemoteConnectionClose(protonConnection, remoteClose);
            Optional.ofNullable(getConnectionLossHandler(protonConnection)).ifPresent(lossHandler -> lossHandler.handle(null));
            decrementConnectionCount(protonConnection);
        });
        protonConnection.sessionOpenHandler(session -> {
            HonoProtonHelper.setDefaultCloseHandler(session);
            handleSessionOpen(protonConnection, session);
        });
        protonConnection.receiverOpenHandler(receiver -> {
            HonoProtonHelper.setDefaultCloseHandler(receiver);
            // limit the size of messages accepted on the link to the configured max payload size
            receiver.setMaxMessageSize(UnsignedLong.valueOf(((AmqpAdapterProperties) getConfig()).getMaxPayloadSize()));
            handleRemoteReceiverOpen(protonConnection, receiver);
        });
        protonConnection.senderOpenHandler(sender -> handleRemoteSenderOpenForCommands(protonConnection, sender));
        protonConnection.openHandler(remoteOpen -> {
            if (!remoteOpen.failed()) {
                processRemoteOpen(remoteOpen.result());
                return;
            }
            this.log.debug("ignoring device's open frame containing error", remoteOpen.cause());
        });
    }

    /**
     * Processes the open frame received from a device.
     * <p>
     * If authentication is required, the connection is only opened for authenticated
     * devices that pass the device registration check. On success a <em>connected</em>
     * event is sent, the connection is opened and the connection metrics are incremented.
     * On failure the connection is closed with an error condition derived from the cause.
     * The tracing span is finished in either case.
     *
     * @param protonConnection The connection to open.
     */
    private void processRemoteOpen(ProtonConnection protonConnection) {
        // Re-use the span attached to the connection (if any). The fallback span is created lazily:
        // with orElse(...) a second span would be started on every call and never be finished
        // whenever an attached span exists.
        final Span span = Optional.ofNullable(protonConnection.attachments().get(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class))
                .orElseGet(() -> this.tracer.buildSpan("open connection")
                        .ignoreActiveSpan()
                        .withTag(Tags.SPAN_KIND.getKey(), "server")
                        .withTag(Tags.COMPONENT.getKey(), getTypeName())
                        .start());
        final Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
        TracingHelper.TAG_AUTHENTICATED.set(span, Boolean.valueOf(authenticatedDevice != null));
        if (authenticatedDevice != null) {
            span.setTag("tenant_id", authenticatedDevice.getTenantId());
            span.setTag("device_id", authenticatedDevice.getDeviceId());
        }
        final Future<Void> connectAuthorizationCheck = Future.future();
        if (!((AmqpAdapterProperties) getConfig()).isAuthenticationRequired()) {
            this.log.trace("received connection request from anonymous device [container: {}]", protonConnection.getRemoteContainer());
            connectAuthorizationCheck.complete();
        } else if (authenticatedDevice == null) {
            connectAuthorizationCheck.fail(new ClientErrorException(401, "anonymous devices not supported"));
        } else {
            this.log.trace("received connection request from {}", authenticatedDevice);
            checkDeviceRegistration(authenticatedDevice, span.context()).map(ok -> {
                this.log.debug("{} is registered and enabled", authenticatedDevice);
                span.log("device is registered and enabled");
                return ok;
            }).setHandler(connectAuthorizationCheck);
        }
        connectAuthorizationCheck.compose(ok -> {
            return sendConnectedEvent(Optional.ofNullable(protonConnection.getRemoteContainer()).orElse("unknown"), authenticatedDevice);
        }).map(eventSent -> {
            protonConnection.setContainer(getTypeName());
            protonConnection.setOfferedCapabilities(new Symbol[]{Constants.CAP_ANONYMOUS_RELAY});
            protonConnection.open();
            this.log.debug("connection with device [container: {}] established", protonConnection.getRemoteContainer());
            span.log("connection established");
            if (authenticatedDevice == null) {
                this.metrics.incrementUnauthenticatedConnections();
                return null;
            }
            this.metrics.incrementConnections(authenticatedDevice.getTenantId());
            return null;
        }).otherwise(th -> {
            // reject the connection attempt, conveying the reason to the device
            protonConnection.setCondition(getErrorCondition(th));
            protonConnection.close();
            TracingHelper.logError(span, th);
            return null;
        }).setHandler(outcome -> {
            span.finish();
        });
    }

    /**
     * Sets the server to use for the insecure port.
     *
     * @param protonServer The server, which must not have been started yet.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server already reports an actual port greater than zero.
     */
    protected void setInsecureAmqpServer(ProtonServer protonServer) {
        final boolean alreadyRunning = Objects.requireNonNull(protonServer).actualPort() > 0;
        if (alreadyRunning) {
            throw new IllegalArgumentException("AMQP Server should not be running");
        }
        this.insecureServer = protonServer;
    }

    /**
     * Sets the factory to use for creating SASL authenticators.
     *
     * @param protonSaslAuthenticatorFactory The factory.
     * @throws NullPointerException if the factory is {@code null}.
     */
    protected void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory protonSaslAuthenticatorFactory) {
        Objects.requireNonNull(protonSaslAuthenticatorFactory, "authFactory must not be null");
        this.authenticatorFactory = protonSaslAuthenticatorFactory;
    }

    /**
     * Sets the provider used to determine the tenant and auth-id for a context.
     *
     * @param amqpContextTenantAndAuthIdProvider The provider.
     * @throws NullPointerException if the provider is {@code null}.
     */
    public void setTenantObjectWithAuthIdProvider(AmqpContextTenantAndAuthIdProvider amqpContextTenantAndAuthIdProvider) {
        Objects.requireNonNull(amqpContextTenantAndAuthIdProvider);
        this.tenantObjectWithAuthIdProvider = amqpContextTenantAndAuthIdProvider;
    }

    /**
     * Opens a session requested by a device, using the configured max session
     * window size as the session's incoming capacity.
     *
     * @param protonConnection The connection the session belongs to.
     * @param protonSession The session to open.
     */
    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        final int windowSize = ((AmqpAdapterProperties) getConfig()).getMaxSessionWindowSize();
        this.log.debug("opening new session with client [container: {}, session window size: {}]", protonConnection.getRemoteContainer(), Integer.valueOf(windowSize));
        protonSession.setIncomingCapacity(windowSize);
        protonSession.open();
    }

    /**
     * Handles a close frame received from a device.
     * <p>
     * The disconnect handler is removed before the connection is closed and disconnected
     * locally, so the disconnect handler does not run for this connection afterwards.
     *
     * @param protonConnection The connection being closed.
     * @param asyncResult The outcome of the remote close; failed if the peer closed with an error.
     */
    private void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        final String container = protonConnection.getRemoteContainer();
        if (!asyncResult.succeeded()) {
            this.log.debug("client [container: {}] closed connection with error", container, asyncResult.cause());
        } else {
            this.log.debug("client [container: {}] closed connection", container);
        }
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.close();
        protonConnection.disconnect();
    }

    /**
     * Updates the connection metrics for a connection that has gone away and sends
     * a <em>disconnected</em> event.
     *
     * @param protonConnection The connection that has been closed or lost.
     */
    private void decrementConnectionCount(ProtonConnection protonConnection) {
        final Device device = getAuthenticatedDevice(protonConnection);
        if (device != null) {
            this.metrics.decrementConnections(device.getTenantId());
        } else {
            this.metrics.decrementUnauthenticatedConnections();
        }
        final String remoteContainer = Optional.ofNullable(protonConnection.getRemoteContainer()).orElse("unknown");
        sendDisconnectedEvent(remoteContainer, device);
    }

    /**
     * Handles a device's request to establish a link for uploading messages.
     * <p>
     * The link is closed with an error if the device wants to send pre-settled messages
     * or if the link's target has an address (only anonymous relay mode is supported).
     * Otherwise the link is opened with a prefetch of 30, auto-accept disabled and a
     * message handler that forwards incoming messages to {@code onMessageReceived}.
     *
     * @param protonConnection The connection the link belongs to.
     * @param protonReceiver The link to open.
     */
    protected void handleRemoteReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        final Device authenticatedDevice = protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        final OptionalInt samplingPriority = Optional
                .ofNullable(protonConnection.attachments().get(AmqpAdapterConstants.KEY_TRACE_SAMPLING_PRIORITY, OptionalInt.class))
                .orElse(OptionalInt.empty());
        final Span span = newSpan("attach receiver", authenticatedDevice, samplingPriority);
        if (ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            closeLinkWithError(protonReceiver, new ClientErrorException(400, "unsupported snd-settle-mode: settled"), span);
        } else if (protonReceiver.getRemoteTarget() != null && protonReceiver.getRemoteTarget().getAddress() != null) {
            // a target address means the device does not use anonymous relay mode
            if (!protonReceiver.getRemoteTarget().getAddress().isEmpty()) {
                this.log.debug("closing link due to the presence of target address [{}]", protonReceiver.getRemoteTarget().getAddress());
            }
            closeLinkWithError(protonReceiver, new ClientErrorException(400, "this adapter supports anonymous relay mode only"), span);
        } else {
            protonReceiver.setTarget(protonReceiver.getRemoteTarget());
            protonReceiver.setSource(protonReceiver.getRemoteSource());
            protonReceiver.setQoS(protonReceiver.getRemoteQoS());
            protonReceiver.setPrefetch(30);
            protonReceiver.setAutoAccept(false);
            HonoProtonHelper.setCloseHandler(protonReceiver, remoteClose -> onLinkDetach(protonReceiver));
            HonoProtonHelper.setDetachHandler(protonReceiver, remoteDetach -> onLinkDetach(protonReceiver));
            protonReceiver.handler((delivery, message) -> {
                final AmqpContext ctx = AmqpContext.fromMessage(delivery, message, authenticatedDevice);
                ctx.setTimer(this.metrics.startTimer());
                if (authenticatedDevice != null) {
                    // priority has already been determined when the connection was authenticated
                    ctx.setTraceSamplingPriority(samplingPriority);
                    onMessageReceived(ctx);
                } else {
                    // unauthenticated: the priority is resolved per message before processing it
                    applyTenantTraceSamplingPriority(ctx, span).setHandler(priorityApplied -> onMessageReceived(ctx));
                }
            });
            protonReceiver.open();
            if (authenticatedDevice != null) {
                this.log.debug("established link for receiving messages from device [tenant: {}, device-id: {}]]", authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
            } else {
                this.log.debug("established link for receiving messages from device [container: {}]", protonConnection.getRemoteContainer());
            }
            span.log("link established");
        }
        span.finish();
    }

    /**
     * Applies the trace sampling priority configured for the tenant of the given
     * message context to the given span and stores it in the context.
     *
     * @param amqpContext The context of the message being processed.
     * @param span The span to apply the priority to.
     * @return A future holding the applied priority; if the tenant information cannot be
     *         obtained the future succeeds with an empty value.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected Future<OptionalInt> applyTenantTraceSamplingPriority(AmqpContext amqpContext, Span span) {
        Objects.requireNonNull(amqpContext);
        Objects.requireNonNull(span);
        return this.tenantObjectWithAuthIdProvider.get(amqpContext, span.context())
                .map(tenantAndAuthId -> {
                    final OptionalInt priority = TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantAndAuthId, span);
                    amqpContext.setTraceSamplingPriority(priority);
                    return priority;
                })
                // failures are deliberately ignored: no priority is applied in that case
                .recover(failure -> Future.succeededFuture(OptionalInt.empty()));
    }

    /**
     * Applies the trace sampling priority configured for the tenant of the given SASL
     * response context to the given span and stores it in the attachments of the
     * context's connection.
     *
     * @param saslResponseContext The context of the SASL handshake.
     * @param span The span to apply the priority to.
     * @return A future that always succeeds; failures to obtain the tenant information
     *         are ignored.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected Future<Void> applyTenantTraceSamplingPriority(SaslResponseContext saslResponseContext, Span span) {
        Objects.requireNonNull(saslResponseContext);
        Objects.requireNonNull(span);
        return this.tenantObjectWithAuthIdProvider.get(saslResponseContext, span.context())
                .map(tenantAndAuthId -> {
                    final OptionalInt priority = TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantAndAuthId, span);
                    // remember the priority so that links opened on this connection can pick it up
                    saslResponseContext.getProtonConnection().attachments().set(AmqpAdapterConstants.KEY_TRACE_SAMPLING_PRIORITY, OptionalInt.class, priority);
                    return (Void) null;
                })
                .recover(failure -> Future.succeededFuture());
    }

    /**
     * Processes a message uploaded by a device.
     * <p>
     * The message's endpoint and address are validated before the message is uploaded.
     * On success the delivery is accepted and settled. On failure the delivery is rejected
     * for client errors and released for all other errors. The tracing span created for
     * the upload is finished in both cases.
     *
     * @param amqpContext The context of the received message.
     * @return A future holding the outcome of the upload; failed with the original cause
     *         if the message could not be processed.
     */
    protected Future<ProtonDelivery> onMessageReceived(AmqpContext amqpContext) {
        final Span span = newSpan("upload message", amqpContext.getAuthenticatedDevice(), amqpContext.getTraceSamplingPriority());
        span.log(Collections.singletonMap(Tags.MESSAGE_BUS_DESTINATION.getKey(), amqpContext.getAddress()));
        return validateEndpoint(amqpContext)
                .compose(validatedCtx -> validateAddress(validatedCtx.getAddress(), validatedCtx.getAuthenticatedDevice()))
                .compose(validAddress -> uploadMessage(amqpContext, validAddress, span))
                .map(downstreamDelivery -> {
                    ProtonHelper.accepted(amqpContext.delivery(), true);
                    span.finish();
                    return downstreamDelivery;
                })
                .recover(failure -> {
                    if (!(failure instanceof ClientErrorException)) {
                        // not the device's fault: release so that the message may be re-sent
                        ProtonHelper.released(amqpContext.delivery(), true);
                    } else {
                        MessageHelper.rejected(amqpContext.delivery(), getErrorCondition(failure));
                    }
                    TracingHelper.logError(span, failure);
                    span.finish();
                    return Future.failedFuture(failure);
                });
    }

    /**
     * Handles a device's request to establish a link for receiving commands.
     * <p>
     * The link's source address is resolved and validated. If it refers to the command
     * endpoint, a command sender link is opened and a connection loss handler is
     * registered which sends a <em>disconnected TTD</em> event and closes the command
     * consumer. In all other cases, and if opening the link fails, the link is closed
     * with an error condition.
     * <p>
     * The stages are chained with {@code compose} so that the outcome of opening the
     * link is awaited: "link established" is only logged once the link has actually
     * been opened, and a failure to open it reaches the error handler below.
     *
     * @param protonConnection The connection the link belongs to.
     * @param protonSender The link to open.
     */
    protected void handleRemoteSenderOpenForCommands(ProtonConnection protonConnection, ProtonSender protonSender) {
        final Device device = protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        final OptionalInt samplingPriority = Optional
                .ofNullable(protonConnection.attachments().get(AmqpAdapterConstants.KEY_TRACE_SAMPLING_PRIORITY, OptionalInt.class))
                .orElse(OptionalInt.empty());
        final Span span = newSpan("attach Command receiver", device, samplingPriority);
        getResourceIdentifier(protonSender.getRemoteSource()).compose(address -> {
            return validateAddress(address, device);
        }).compose(validAddress -> {
            if (!CommandConstants.isCommandEndpoint(validAddress.getEndpoint())) {
                return Future.<MessageConsumer>failedFuture(new ClientErrorException(404, "no such node"));
            }
            return openCommandSenderLink(protonSender, validAddress, device, span, samplingPriority).map(messageConsumer -> {
                addConnectionLossHandler(protonConnection, connectionLost -> {
                    sendDisconnectedTtdEvent(validAddress.getTenantId(), validAddress.getResourceId(), device, null).setHandler(eventSent -> {
                        messageConsumer.close((Handler) null);
                    });
                });
                return messageConsumer;
            });
        }).map(messageConsumer -> {
            span.log("link established");
            return messageConsumer;
        }).otherwise(th -> {
            if (th instanceof ServiceInvocationException) {
                closeLinkWithError(protonSender, th, span);
                return null;
            }
            // anything else is reported to the device as a problem with the address it provided
            closeLinkWithError(protonSender, new ClientErrorException(400, "Invalid source address"), span);
            return null;
        }).setHandler(outcome -> {
            span.finish();
        });
    }

    /**
     * Creates and starts a new server-side span that ignores any active span.
     * <p>
     * The span is tagged with this adapter's type name as component and with the
     * authentication status. Tenant and device identifiers are added for an authenticated
     * device, and the trace sampling priority is applied if one is given.
     *
     * @param str The operation name of the span.
     * @param device The authenticated device or {@code null} if the device is not authenticated.
     * @param optionalInt The trace sampling priority to apply, if present.
     * @return The started span.
     */
    private Span newSpan(String str, Device device, OptionalInt optionalInt) {
        final boolean authenticated = device != null;
        final Span span = this.tracer.buildSpan(str)
                .ignoreActiveSpan()
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticated)
                .start();
        if (authenticated) {
            span.setTag("tenant_id", device.getTenantId());
            span.setTag("device_id", device.getDeviceId());
        }
        if (optionalInt.isPresent()) {
            TracingHelper.setTraceSamplingPriority(span, optionalInt.getAsInt());
        }
        return span;
    }

    /**
     * Opens a link for sending commands to a device.
     * <p>
     * A command consumer is created for the device first. Once it exists, the link is
     * configured for <em>at least once</em> delivery, close and detach handlers are
     * registered, the link is opened and a <em>connected TTD</em> event is sent.
     * The close/detach handler sends a <em>disconnected TTD</em> event, closes the
     * command consumer and detaches the link.
     *
     * @param protonSender The link to open.
     * @param resourceIdentifier The address containing the tenant and device the commands are for.
     * @param device The authenticated device or {@code null} if the device is not authenticated.
     * @param span The span tracking the link establishment.
     * @param optionalInt The trace sampling priority used for the span created on detach.
     * @return A future holding the command consumer; failed with a {@code ServerErrorException}
     *         (503) wrapping the original cause if the link could not be set up.
     */
    private Future<MessageConsumer> openCommandSenderLink(ProtonSender protonSender, ResourceIdentifier resourceIdentifier, Device device, Span span, OptionalInt optionalInt) {
        return createCommandConsumer(protonSender, resourceIdentifier, device).map(messageConsumer -> {
            final String tenantId = resourceIdentifier.getTenantId();
            final String resourceId = resourceIdentifier.getResourceId();
            protonSender.setSource(protonSender.getRemoteSource());
            protonSender.setTarget(protonSender.getRemoteTarget());
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            // the same clean-up applies regardless of whether the device closes or detaches the link
            final Handler<AsyncResult<ProtonSender>> detachHandler = remoteDetach -> {
                final Span detachSpan = newSpan("detach Command receiver", device, optionalInt);
                sendDisconnectedTtdEvent(tenantId, resourceId, device, detachSpan.context());
                messageConsumer.close((Handler) null);
                onLinkDetach(protonSender);
                detachSpan.finish();
            };
            HonoProtonHelper.setCloseHandler(protonSender, detachHandler);
            HonoProtonHelper.setDetachHandler(protonSender, detachHandler);
            protonSender.open();
            this.log.debug("established link [address: {}] for sending commands to device", resourceIdentifier);
            sendConnectedTtdEvent(tenantId, resourceId, device, span.context());
            return messageConsumer;
        }).otherwise(th -> {
            // keep the root cause attached so that it is not lost when the error gets logged
            throw new ServerErrorException(503, "cannot create command consumer", th);
        });
    }

    /**
     * Creates a consumer for commands targeted at a device.
     * <p>
     * For each incoming command the tenant configuration is retrieved, the command is
     * checked for validity, the sender link is checked for being open and the tenant's
     * message limit is checked. If all checks pass, the command is handed to
     * {@code onCommandReceived}. Otherwise the command is rejected (client errors) or
     * released (all other errors) and the outcome is reported to the metrics.
     * <p>
     * If the authenticated device differs from the device in the address, the consumer
     * is created with the authenticated device's identifier as an additional argument.
     *
     * @param protonSender The link for sending commands to the device.
     * @param resourceIdentifier The address containing the tenant and device the commands are for.
     * @param device The authenticated device or {@code null} if the device is not authenticated.
     * @return A future holding the created consumer.
     */
    private Future<MessageConsumer> createCommandConsumer(ProtonSender protonSender, ResourceIdentifier resourceIdentifier, Device device) {
        final Handler<CommandContext> commandHandler = commandContext -> {
            final Timer.Sample timer = this.metrics.startTimer();
            addMicrometerSample(commandContext, timer);
            Tags.COMPONENT.set(commandContext.getCurrentSpan(), getTypeName());
            final Command command = commandContext.getCommand();
            final Future<TenantObject> tenantTracker = getTenantConfiguration(resourceIdentifier.getTenantId(), commandContext.getTracingContext());
            tenantTracker.compose(tenantObject -> {
                if (!command.isValid()) {
                    return Future.failedFuture(new ClientErrorException(400, "malformed command message"));
                }
                if (!protonSender.isOpen()) {
                    return Future.failedFuture(new ServerErrorException(503, "sender link is not open"));
                }
                return checkMessageLimit(tenantObject, command.getPayloadSize());
            }).compose(limitOk -> {
                onCommandReceived(tenantTracker.result(), protonSender, commandContext);
                return Future.succeededFuture();
            }).otherwise(failure -> {
                if (failure instanceof ClientErrorException) {
                    commandContext.reject(getErrorCondition(failure), 1);
                } else {
                    commandContext.release(1);
                }
                final MetricsTags.Direction direction = command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST;
                this.metrics.reportCommand(direction, resourceIdentifier.getTenantId(), tenantTracker.result(), MetricsTags.ProcessingOutcome.from(failure), command.getPayloadSize(), timer);
                return null;
            });
        };
        // nothing to do when the consumer link is closed remotely
        final Handler<Void> remoteCloseHandler = closed -> {
        };
        final String tenantId = resourceIdentifier.getTenantId();
        final String deviceId = resourceIdentifier.getResourceId();
        if (device == null || device.getDeviceId().equals(deviceId)) {
            return getCommandConsumerFactory().createCommandConsumer(tenantId, deviceId, commandHandler, remoteCloseHandler, DEFAULT_COMMAND_CONSUMER_CHECK_INTERVAL_MILLIS);
        }
        // the authenticated device differs from the addressed device
        return getCommandConsumerFactory().createCommandConsumer(tenantId, deviceId, device.getDeviceId(), commandHandler, remoteCloseHandler, DEFAULT_COMMAND_CONSUMER_CHECK_INTERVAL_MILLIS);
    }

    /**
     * Forwards a command to a device.
     * <p>
     * The command message's correlation id defaults to its message id. If the command is
     * targeted at a gateway, the original device's identifier is added to the message.
     * Once the device has settled the delivery, the command context is settled with the
     * device's disposition. If the device did not settle the delivery, the command is
     * released. In both cases the outcome is reported to the metrics.
     *
     * @param tenantObject The configuration of the tenant the device belongs to.
     * @param protonSender The link for sending the command to the device.
     * @param commandContext The context of the command to send.
     * @throws NullPointerException if sender or command context are {@code null}.
     */
    protected void onCommandReceived(TenantObject tenantObject, ProtonSender protonSender, CommandContext commandContext) {
        Objects.requireNonNull(protonSender);
        Objects.requireNonNull(commandContext);
        final Command command = commandContext.getCommand();
        final Message commandMessage = command.getCommandMessage();
        if (commandMessage.getCorrelationId() == null) {
            commandMessage.setCorrelationId(commandMessage.getMessageId());
        }
        if (command.isTargetedAtGateway()) {
            MessageHelper.addDeviceId(commandMessage, command.getOriginalDeviceId());
        }
        protonSender.send(commandMessage, protonDelivery -> {
            // the remote state may be any disposition, it must not be narrowed to Modified up front
            final DeliveryState remoteState = protonDelivery.getRemoteState();
            MetricsTags.ProcessingOutcome processingOutcome = null;
            if (protonDelivery.remotelySettled()) {
                commandContext.disposition(remoteState, 1);
                if (remoteState instanceof Accepted) {
                    processingOutcome = MetricsTags.ProcessingOutcome.FORWARDED;
                } else if (remoteState instanceof Rejected) {
                    processingOutcome = MetricsTags.ProcessingOutcome.UNPROCESSABLE;
                } else if (remoteState instanceof Released) {
                    processingOutcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                } else if (remoteState instanceof Modified) {
                    // undeliverable-here is optional and may be unset (null); treat that like false
                    final boolean undeliverableHere = Boolean.TRUE.equals(((Modified) remoteState).getUndeliverableHere());
                    processingOutcome = undeliverableHere ? MetricsTags.ProcessingOutcome.UNPROCESSABLE : MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                }
            } else {
                this.log.debug("device did not settle command message [command: {}, remote state: {}]", command.getName(), remoteState);
                final HashMap<String, Object> unsettledLog = new HashMap<>(2);
                unsettledLog.put("event", "device did not settle command");
                unsettledLog.put("remote state", remoteState);
                commandContext.getCurrentSpan().log(unsettledLog);
                commandContext.release(1);
                processingOutcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
            }
            this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, command.getTenant(), tenantObject, processingOutcome, command.getPayloadSize(), getMicrometerSample(commandContext));
        });
        final HashMap<String, Object> sentLog = new HashMap<>(4);
        sentLog.put("event", "command sent to device");
        if (protonSender.getRemoteTarget() != null) {
            sentLog.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), protonSender.getRemoteTarget().getAddress());
        }
        sentLog.put(TracingHelper.TAG_QOS.getKey(), protonSender.getQoS().name());
        sentLog.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(protonSender.getCredit()));
        commandContext.getCurrentSpan().log(sentLog);
    }

    /**
     * Closes a link after setting the error condition derived from a failure.
     * <p>
     * The failure is additionally logged to the given span, if there is one.
     *
     * @param protonLink The link to close.
     * @param th The failure to derive the error condition from.
     * @param span The span to log the failure to, may be {@code null}.
     * @param <T> The type of link.
     */
    private <T extends ProtonLink<T>> void closeLinkWithError(ProtonLink<T> protonLink, Throwable th, Span span) {
        final ErrorCondition condition = getErrorCondition(th);
        this.log.debug(
                "closing link with error condition [symbol: {}, description: {}]",
                condition.getCondition(),
                condition.getDescription());
        protonLink.setCondition(condition);
        protonLink.close();
        if (span == null) {
            // nothing to trace
            return;
        }
        TracingHelper.logError(span, th);
    }

    /**
     * Checks a message's payload against its content type and then hands the
     * message over to the upload routine matching the message's endpoint.
     * <p>
     * The returned future fails with a 400 client error if the payload check
     * fails and with a 404 client error if the endpoint is not handled here.
     *
     * @param amqpContext The context of the message to upload.
     * @param resourceIdentifier The resource the message is addressed to.
     * @param span The span used for tracing the upload.
     * @return The outcome of the upload.
     */
    private Future<ProtonDelivery> uploadMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Span span) {
        final Future<Void> payloadCheck = isPayloadOfIndicatedType(amqpContext.getMessagePayload(), amqpContext.getMessageContentType())
                ? Future.succeededFuture()
                : Future.failedFuture(new ClientErrorException(400, "empty notifications must not contain payload"));

        return payloadCheck.compose(ok -> {
            // NOTE(review): the selector is a compiler-generated lookup table; the endpoint
            // constants behind the numeric labels are not visible here. Judging by the
            // invoked routines, 1 and 2 are command (response) endpoints, 3 is telemetry
            // and 4 is event -- confirm before replacing this by a switch on the enum itself.
            switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[amqpContext.getEndpoint().ordinal()]) {
                case 1:
                case 2:
                    return doUploadCommandResponseMessage(amqpContext, resourceIdentifier, span);
                case 3:
                    return doUploadMessage(
                            amqpContext,
                            resourceIdentifier,
                            getTelemetrySender(resourceIdentifier.getTenantId()),
                            span);
                case 4:
                    return doUploadMessage(
                            amqpContext,
                            resourceIdentifier,
                            getEventSender(resourceIdentifier.getTenantId()),
                            span);
                default:
                    return Future.failedFuture(new ClientErrorException(404, "unknown endpoint"));
            }
        });
    }

    /**
     * Forwards a message downstream using the given sender.
     * <p>
     * Before the message is sent, the registration assertion and the tenant
     * configuration are retrieved and the tenant is checked by means of
     * {@code isAdapterEnabled} and {@code checkMessageLimit}. The message is sent
     * without waiting for the outcome if the device has already settled it,
     * otherwise the outcome is awaited. In both the failure and the success case
     * a telemetry metric is reported.
     *
     * @param amqpContext The context of the message to forward.
     * @param resourceIdentifier The resource (tenant and device) the message originates from.
     * @param future The (pending) sender to use for forwarding the message.
     * @param span The span used for tracing the upload.
     * @return The outcome of forwarding the message; failed with the original
     *         cause if any of the steps fails.
     */
    private Future<ProtonDelivery> doUploadMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Future<DownstreamSender> future, Span span) {
        this.log.trace("forwarding {} message", amqpContext.getEndpoint().getCanonicalName());

        final Future<JsonObject> assertionTracker = getRegistrationAssertion(
                resourceIdentifier.getTenantId(),
                resourceIdentifier.getResourceId(),
                amqpContext.getAuthenticatedDevice(),
                span.context());
        final Future<TenantObject> tenantTracker = getTenantConfiguration(resourceIdentifier.getTenantId(), span.context());
        // succeeds with the tenant only if both tenant level checks pass
        final Future<TenantObject> tenantValidationTracker = tenantTracker.compose(tenant ->
                CompositeFuture.all(isAdapterEnabled(tenant), checkMessageLimit(tenant, amqpContext.getPayloadSize()))
                        .map(checksPassed -> tenant));

        return CompositeFuture.all(tenantValidationTracker, assertionTracker, future)
                .compose(allSucceeded -> {
                    final DownstreamSender sender = future.result();
                    final Message downstreamMessage = addProperties(
                            amqpContext.getMessage(),
                            ResourceIdentifier.from(
                                    amqpContext.getEndpoint().getCanonicalName(),
                                    resourceIdentifier.getTenantId(),
                                    resourceIdentifier.getResourceId()),
                            amqpContext.getAddress().toString(),
                            tenantValidationTracker.result(),
                            assertionTracker.result(),
                            null);
                    if (amqpContext.isRemotelySettled()) {
                        return sender.send(downstreamMessage, span.context());
                    }
                    return sender.sendAndWaitForOutcome(downstreamMessage, span.context());
                })
                .recover(failure -> {
                    this.log.debug("cannot process {} message from device [tenant: {}, device-id: {}]", new Object[]{amqpContext.getEndpoint().getCanonicalName(), resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), failure});
                    this.metrics.reportTelemetry(
                            amqpContext.getEndpoint(),
                            resourceIdentifier.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.from(failure),
                            amqpContext.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE,
                            amqpContext.getPayloadSize(),
                            amqpContext.getTimer());
                    // keep the result failed, the metric is only a side effect
                    return Future.failedFuture(failure);
                })
                .map(delivery -> {
                    this.metrics.reportTelemetry(
                            amqpContext.getEndpoint(),
                            resourceIdentifier.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            amqpContext.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE,
                            amqpContext.getPayloadSize(),
                            amqpContext.getTimer());
                    return delivery;
                });
    }

    /**
     * Forwards a command response received from a device.
     * <p>
     * The message is first converted to a command response; if that is not
     * possible, the error is logged to the span and the returned future fails
     * with a 400 client error. Otherwise the tenant is checked by means of
     * {@code isAdapterEnabled} and {@code checkMessageLimit}, the registration
     * assertion is retrieved and the response is sent. In both the success and
     * the failure case a command metric with direction {@code RESPONSE} is reported.
     *
     * @param amqpContext The context of the message containing the command response.
     * @param resourceIdentifier The resource (tenant and device) the response originates from.
     * @param span The span used for tracing the upload.
     * @return The outcome of forwarding the response; failed with the original
     *         cause if any of the steps fails.
     */
    private Future<ProtonDelivery> doUploadCommandResponseMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Span span) {
        final Future<CommandResponse> responseTracker = Optional.ofNullable(CommandResponse.from(amqpContext.getMessage()))
                .map(response -> Future.succeededFuture(response))
                .orElseGet(() -> {
                    TracingHelper.logError(span, String.format("invalid message (correlationId: %s, address: %s, status: %s)", amqpContext.getMessage().getCorrelationId(), amqpContext.getMessage().getAddress(), MessageHelper.getStatus(amqpContext.getMessage())));
                    return Future.failedFuture(new ClientErrorException(400, "malformed command response message"));
                });
        final Future<TenantObject> tenantTracker = getTenantConfiguration(resourceIdentifier.getTenantId(), span.context());

        // Every lambda parameter below has a distinct name: a nested lambda must not
        // redeclare the parameter of the lambda enclosing it.
        return CompositeFuture.all(tenantTracker, responseTracker)
                .compose(lookupsDone -> {
                    final CommandResponse commandResponse = responseTracker.result();
                    final TenantObject tenant = tenantTracker.result();
                    this.log.trace("sending command response [device-id: {}, status: {}, correlation-id: {}, reply-to: {}]", new Object[]{resourceIdentifier.getResourceId(), Integer.valueOf(commandResponse.getStatus()), commandResponse.getCorrelationId(), commandResponse.getReplyToId()});
                    final HashMap<String, Object> logItems = new HashMap<>(3);
                    logItems.put("event", "sending command response");
                    logItems.put(TracingHelper.TAG_CORRELATION_ID.getKey(), commandResponse.getCorrelationId());
                    logItems.put("status", Integer.valueOf(commandResponse.getStatus()));
                    span.log(logItems);

                    // succeeds with the tenant only if both tenant level checks pass
                    final Future<TenantObject> tenantValidationTracker = CompositeFuture
                            .all(isAdapterEnabled(tenant), checkMessageLimit(tenant, amqpContext.getPayloadSize()))
                            .map(checksPassed -> tenant);
                    return CompositeFuture
                            .all(tenantValidationTracker, getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), amqpContext.getAuthenticatedDevice(), span.context()))
                            .compose(deviceVerified -> sendCommandResponse(resourceIdentifier.getTenantId(), commandResponse, span.context()));
                })
                .map(delivery -> {
                    this.log.trace("forwarded command response from device [tenant: {}, device-id: {}]", resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId());
                    this.metrics.reportCommand(
                            MetricsTags.Direction.RESPONSE,
                            resourceIdentifier.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            amqpContext.getPayloadSize(),
                            amqpContext.getTimer());
                    return delivery;
                })
                .recover(failure -> {
                    this.log.debug("cannot process command response from device [tenant: {}, device-id: {}]", new Object[]{resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), failure});
                    this.metrics.reportCommand(
                            MetricsTags.Direction.RESPONSE,
                            resourceIdentifier.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.from(failure),
                            amqpContext.getPayloadSize(),
                            amqpContext.getTimer());
                    // keep the result failed, the metric is only a side effect
                    return Future.failedFuture(failure);
                });
    }

    /**
     * Logs the name of the given link at debug level and closes the link.
     *
     * @param protonLink The link to close.
     * @param <T> The type of link.
     */
    private <T extends ProtonLink<T>> void onLinkDetach(ProtonLink<T> protonLink) {
        final Object linkName = protonLink.getName();
        this.log.debug("closing link [{}]", linkName);
        protonLink.close();
    }

    /**
     * Checks whether a message's address and endpoint can be handled.
     *
     * @param amqpContext The context of the message to check.
     * @return A future succeeding with the given context if the message can be
     *         handled. Otherwise the future fails with a client error: 404 if the
     *         message has no address or the endpoint is not supported, 400 if an
     *         already settled message is sent to the endpoint accepting unsettled
     *         messages only.
     */
    Future<AmqpContext> validateEndpoint(AmqpContext amqpContext) {
        if (amqpContext.getAddress() == null) {
            return Future.failedFuture(new ClientErrorException(404));
        }
        // NOTE(review): the selector is a compiler-generated lookup table; the endpoint
        // constants behind the numeric labels are not visible here. Going by the error
        // message, 4 is the event endpoint -- confirm the others before replacing this
        // by a switch on the enum itself.
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[amqpContext.getEndpoint().ordinal()]) {
            case 1:
            case 2:
            case 3:
                return Future.succeededFuture(amqpContext);
            case 4:
                if (amqpContext.isRemotelySettled()) {
                    return Future.failedFuture(new ClientErrorException(400, "event endpoint accepts unsettled messages only"));
                }
                return Future.succeededFuture(amqpContext);
            default:
                this.log.debug("device wants to send message for unsupported address [{}]", amqpContext.getAddress());
                return Future.failedFuture(new ClientErrorException(404, "unsupported endpoint"));
        }
    }

    /**
     * Parses the address of a link's source into a resource identifier.
     *
     * @param source The source to take the address from, may be {@code null}.
     * @return A future succeeding with the parsed identifier. The future fails
     *         with a 404 client error if the source is {@code null} or has no
     *         address, and with the thrown problem if the address cannot be parsed.
     */
    private static Future<ResourceIdentifier> getResourceIdentifier(Source source) {
        if (source == null) {
            return Future.failedFuture(new ClientErrorException(404, "no such node"));
        }
        try {
            final String address = source.getAddress();
            if (Strings.isNullOrEmpty(address)) {
                return Future.failedFuture(new ClientErrorException(404, "no such node"));
            }
            return Future.succeededFuture(ResourceIdentifier.fromString(address));
        } catch (Throwable problem) {
            // any problem reading or parsing the address is reported via the future
            return Future.failedFuture(problem);
        }
    }

    /**
     * Stores a handler in the attachments of a connection under the key
     * {@code connectionLossHandler}.
     *
     * @param protonConnection The connection to attach the handler to.
     * @param handler The handler to store.
     */
    private static void addConnectionLossHandler(ProtonConnection protonConnection, Handler<Void> handler) {
        protonConnection
                .attachments()
                .set("connectionLossHandler", Handler.class, handler);
    }

    /**
     * Looks up the handler stored in the attachments of a connection under the
     * key {@code connectionLossHandler}.
     *
     * @param protonConnection The connection to read the attachment from.
     * @return The value found in the connection's attachments.
     */
    private static Handler<Void> getConnectionLossHandler(ProtonConnection protonConnection) {
        final Handler<Void> lossHandler = (Handler) protonConnection
                .attachments()
                .get("connectionLossHandler", Handler.class);
        return lossHandler;
    }

    /**
     * Reads the device stored in the attachments of a connection under the key
     * {@link AmqpAdapterConstants#KEY_CLIENT_DEVICE}.
     *
     * @param protonConnection The connection to read the attachment from.
     * @return The device or {@code null} if the connection has no attachments
     *         or no device is stored under the key.
     */
    private static Device getAuthenticatedDevice(ProtonConnection protonConnection) {
        final Optional<Device> clientDevice = Optional
                .ofNullable(protonConnection.attachments())
                .map(attachments -> attachments.get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class));
        return clientDevice.orElse(null);
    }

    /**
     * Gets the default value for the port.
     *
     * @return 5671
     */
    public int getPortDefaultValue() {
        final int defaultPort = 5671;
        return defaultPort;
    }

    /**
     * Gets the default value for the insecure port.
     *
     * @return 5672
     */
    public int getInsecurePortDefaultValue() {
        final int defaultInsecurePort = 5672;
        return defaultInsecurePort;
    }

    /**
     * Gets the port reported by the secure server.
     *
     * @return The secure server's actual port or -1 if there is no secure server.
     */
    protected int getActualPort() {
        return this.secureServer == null ? -1 : this.secureServer.actualPort();
    }

    /**
     * Gets the port reported by the insecure server.
     *
     * @return The insecure server's actual port or -1 if there is no insecure server.
     */
    protected int getActualInsecurePort() {
        return this.insecureServer == null ? -1 : this.insecureServer.actualPort();
    }
}
