package org.eclipse.hono.adapter.amqp.impl;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.amqp.AmqpAdapterConstants;
import org.eclipse.hono.adapter.amqp.AmqpAdapterMetrics;
import org.eclipse.hono.adapter.amqp.AmqpAdapterProperties;
import org.eclipse.hono.adapter.amqp.AmqpAdapterSaslAuthenticatorFactory;
import org.eclipse.hono.adapter.amqp.SaslExternalAuthHandler;
import org.eclipse.hono.adapter.amqp.SaslPlainAuthHandler;
import org.eclipse.hono.adapter.amqp.SaslResponseContext;
import org.eclipse.hono.adapter.client.command.Command;
import org.eclipse.hono.adapter.client.command.CommandConsumer;
import org.eclipse.hono.adapter.client.command.CommandContext;
import org.eclipse.hono.adapter.client.command.CommandResponse;
import org.eclipse.hono.adapter.client.command.Commands;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.AdapterConnectionsExceededException;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.service.auth.device.DeviceCredentials;
import org.eclipse.hono.service.auth.device.TenantServiceBasedX509Authentication;
import org.eclipse.hono.service.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.service.auth.device.X509AuthProvider;
import org.eclipse.hono.service.limiting.ConnectionLimitManager;
import org.eclipse.hono.service.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.service.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.class */
public final class VertxBasedAmqpProtocolAdapter extends AbstractProtocolAdapterBase<AmqpAdapterProperties> {
    private static final String KEY_CONNECTION_LOSS_HANDLERS = "connectionLossHandlers";
    private static final int MINIMAL_MEMORY = 100000000;
    private static final int MEMORY_PER_CONNECTION = 20000;
    private ProtonServer secureServer;
    private ProtonServer insecureServer;
    private ProtonSaslAuthenticatorFactory authenticatorFactory;
    private AmqpAdapterMetrics metrics = AmqpAdapterMetrics.NOOP;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.amqp.impl.VertxBasedAmqpProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType = new int[MetricsTags.EndpointType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.COMMAND_RESPONSE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    protected String getTypeName() {
        return "hono-amqp";
    }

    public void setMetrics(AmqpAdapterMetrics amqpAdapterMetrics) {
        Optional.ofNullable(amqpAdapterMetrics).ifPresent(amqpAdapterMetrics2 -> {
            this.log.info("reporting metrics using [{}]", amqpAdapterMetrics.getClass().getName());
        });
        this.metrics = amqpAdapterMetrics;
    }

    protected AmqpAdapterMetrics getMetrics() {
        return this.metrics;
    }

    protected void doStart(Promise<Void> promise) {
        if (getConnectionLimitManager() == null) {
            setConnectionLimitManager(createConnectionLimitManager());
        }
        checkPortConfiguration().compose(r15 -> {
            if (this.authenticatorFactory == null && ((AmqpAdapterProperties) getConfig()).isAuthenticationRequired()) {
                this.authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(getMetrics(), () -> {
                    return this.tracer.buildSpan("open connection").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).start();
                }, new SaslPlainAuthHandler(new UsernamePasswordAuthProvider(getCredentialsClient(), this.tracer), this::handleBeforeCredentialsValidation), new SaslExternalAuthHandler(new TenantServiceBasedX509Authentication(getTenantClient(), this.tracer), new X509AuthProvider(getCredentialsClient(), this.tracer), this::handleBeforeCredentialsValidation));
            }
            return Future.succeededFuture();
        }).compose(obj -> {
            return CompositeFuture.all(bindSecureServer(), bindInsecureServer());
        }).map(compositeFuture -> {
            return (Void) null;
        }).onComplete(promise);
    }

    protected Future<Void> handleBeforeCredentialsValidation(DeviceCredentials deviceCredentials, SaslResponseContext saslResponseContext) {
        String tenantId = deviceCredentials.getTenantId();
        Span tracingSpan = saslResponseContext.getTracingSpan();
        String authId = deviceCredentials.getAuthId();
        return getTenantConfiguration(tenantId, tracingSpan.context()).recover(th -> {
            return Future.failedFuture(CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException(th));
        }).compose(tenantObject -> {
            TracingHelper.setDeviceTags(tracingSpan, tenantId, (String) null, authId);
            saslResponseContext.getProtonConnection().attachments().set(AmqpAdapterConstants.KEY_TRACE_SAMPLING_PRIORITY, OptionalInt.class, TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, authId, tracingSpan));
            return Future.succeededFuture();
        });
    }

    private ConnectionLimitManager createConnectionLimitManager() {
        return new DefaultConnectionLimitManager(new MemoryBasedConnectionLimitStrategy(100000000L, MEMORY_PER_CONNECTION + ((AmqpAdapterProperties) getConfig()).getMaxSessionWindowSize()), () -> {
            return Integer.valueOf(this.metrics.getNumberOfConnections());
        }, (ProtocolAdapterProperties) getConfig());
    }

    protected void doStop(Promise<Void> promise) {
        CompositeFuture.all(stopSecureServer(), stopInsecureServer()).map(compositeFuture -> {
            return (Void) null;
        }).onComplete(promise);
    }

    private Future<Void> stopInsecureServer() {
        Promise promise = Promise.promise();
        if (this.insecureServer != null) {
            this.log.info("Shutting down insecure server");
            this.insecureServer.close(promise);
        } else {
            promise.complete();
        }
        return promise.future();
    }

    private Future<Void> stopSecureServer() {
        Promise promise = Promise.promise();
        if (this.secureServer != null) {
            this.log.info("Shutting down secure server");
            this.secureServer.close(promise);
        } else {
            promise.complete();
        }
        return promise.future();
    }

    private Future<Void> bindInsecureServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions heartbeat = new ProtonServerOptions().setHost(((AmqpAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort()).setMaxFrameSize(((AmqpAdapterProperties) getConfig()).getMaxFrameSize()).setHeartbeat(((AmqpAdapterProperties) getConfig()).getIdleTimeout() >> 1);
        Promise promise = Promise.promise();
        this.insecureServer = createServer(this.insecureServer, heartbeat);
        this.insecureServer.connectHandler(this::onConnectRequest).listen(asyncResult -> {
            if (!asyncResult.succeeded()) {
                promise.fail(asyncResult.cause());
            } else {
                this.log.info("insecure AMQP server listening on [{}:{}]", ((AmqpAdapterProperties) getConfig()).getInsecurePortBindAddress(), Integer.valueOf(getActualInsecurePort()));
                promise.complete();
            }
        });
        return promise.future();
    }

    private Future<Void> bindSecureServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions heartbeat = new ProtonServerOptions().setHost(((AmqpAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxFrameSize(((AmqpAdapterProperties) getConfig()).getMaxFrameSize()).setHeartbeat(((AmqpAdapterProperties) getConfig()).getIdleTimeout() >> 1);
        addTlsKeyCertOptions(heartbeat);
        addTlsTrustOptions(heartbeat);
        Promise promise = Promise.promise();
        this.secureServer = createServer(this.secureServer, heartbeat);
        this.secureServer.connectHandler(this::onConnectRequest).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("secure AMQP server listening on {}:{}", ((AmqpAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(getActualPort()));
                promise.complete();
            } else {
                this.log.error("cannot bind to secure port", asyncResult.cause());
                promise.fail(asyncResult.cause());
            }
        });
        return promise.future();
    }

    private ProtonServer createServer(ProtonServer protonServer, ProtonServerOptions protonServerOptions) {
        ProtonServer create = protonServer != null ? protonServer : ProtonServer.create(this.vertx, protonServerOptions);
        if (((AmqpAdapterProperties) getConfig()).isAuthenticationRequired()) {
            create.saslAuthenticatorFactory(this.authenticatorFactory);
        } else {
            create.saslAuthenticatorFactory((ProtonSaslAuthenticatorFactory) null);
        }
        return create;
    }

    protected void onConnectRequest(ProtonConnection protonConnection) {
        protonConnection.disconnectHandler(protonConnection2 -> {
            this.log.debug("lost connection to device [container: {}]", protonConnection.getRemoteContainer());
            onConnectionLoss(protonConnection);
            decrementConnectionCount(protonConnection);
        });
        protonConnection.closeHandler(asyncResult -> {
            handleRemoteConnectionClose(protonConnection, asyncResult);
            onConnectionLoss(protonConnection);
            decrementConnectionCount(protonConnection);
        });
        protonConnection.sessionOpenHandler(protonSession -> {
            HonoProtonHelper.setDefaultCloseHandler(protonSession);
            handleSessionOpen(protonConnection, protonSession);
        });
        protonConnection.receiverOpenHandler(protonReceiver -> {
            HonoProtonHelper.setDefaultCloseHandler(protonReceiver);
            protonReceiver.setMaxMessageSize(UnsignedLong.valueOf(((AmqpAdapterProperties) getConfig()).getMaxPayloadSize()));
            handleRemoteReceiverOpen(protonConnection, protonReceiver);
        });
        protonConnection.senderOpenHandler(protonSender -> {
            handleRemoteSenderOpenForCommands(protonConnection, protonSender);
        });
        protonConnection.openHandler(asyncResult2 -> {
            Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
            if (authenticatedDevice == null) {
                this.metrics.incrementUnauthenticatedConnections();
            } else {
                this.metrics.incrementConnections(authenticatedDevice.getTenantId());
            }
            if (asyncResult2.failed()) {
                this.log.debug("ignoring device's open frame containing error", asyncResult2.cause());
            } else {
                processRemoteOpen((ProtonConnection) asyncResult2.result());
            }
        });
    }

    private void onConnectionLoss(ProtonConnection protonConnection) {
        Span newSpan = newSpan("handle closing of connection", getAuthenticatedDevice(protonConnection), getTraceSamplingPriority(protonConnection));
        CompositeFuture.join((List) getConnectionLossHandlers(protonConnection).stream().map(function -> {
            return (Future) function.apply(newSpan);
        }).collect(Collectors.toList())).recover(th -> {
            Tags.ERROR.set(newSpan, true);
            return Future.failedFuture(th);
        }).onComplete(asyncResult -> {
            newSpan.finish();
        });
    }

    private void processRemoteOpen(ProtonConnection protonConnection) {
        Span span = (Span) Optional.ofNullable((Span) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class)).orElseGet(() -> {
            return this.tracer.buildSpan("open connection").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).start();
        });
        Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
        TracingHelper.TAG_AUTHENTICATED.set(span, Boolean.valueOf(authenticatedDevice != null));
        if (authenticatedDevice != null) {
            TracingHelper.setDeviceTags(span, authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
        }
        checkConnectionLimitForAdapter().compose(r9 -> {
            return checkAuthorizationAndResourceLimits(authenticatedDevice, protonConnection, span);
        }).compose(r7 -> {
            return sendConnectedEvent((String) Optional.ofNullable(protonConnection.getRemoteContainer()).orElse("unknown"), authenticatedDevice);
        }).map(obj -> {
            protonConnection.setContainer(getTypeName());
            protonConnection.setOfferedCapabilities(new Symbol[]{Constants.CAP_ANONYMOUS_RELAY});
            protonConnection.open();
            this.log.debug("connection with device [container: {}] established", protonConnection.getRemoteContainer());
            span.log("connection established");
            this.metrics.reportConnectionAttempt(MetricsTags.ConnectionAttemptOutcome.SUCCEEDED, (String) Optional.ofNullable(authenticatedDevice).map((v0) -> {
                return v0.getTenantId();
            }).orElse(null));
            return null;
        }).otherwise(th -> {
            protonConnection.setCondition(AbstractProtocolAdapterBase.getErrorCondition(th));
            protonConnection.close();
            TracingHelper.logError(span, th);
            this.metrics.reportConnectionAttempt(AbstractProtocolAdapterBase.getOutcome(th), (String) Optional.ofNullable(authenticatedDevice).map((v0) -> {
                return v0.getTenantId();
            }).orElse(null));
            return null;
        }).onComplete(asyncResult -> {
            span.finish();
        });
    }

    private Future<Void> checkAuthorizationAndResourceLimits(Device device, ProtonConnection protonConnection, Span span) {
        Promise promise = Promise.promise();
        if (!((AmqpAdapterProperties) getConfig()).isAuthenticationRequired()) {
            this.log.trace("received connection request from anonymous device [container: {}]", protonConnection.getRemoteContainer());
            promise.complete();
        } else if (device == null) {
            promise.fail(new ClientErrorException(401, "anonymous devices not supported"));
        } else {
            this.log.trace("received connection request from {}", device);
            CompositeFuture.all(checkDeviceRegistration(device, span.context()), getTenantConfiguration(device.getTenantId(), span.context()).compose(tenantObject -> {
                return CompositeFuture.all(isAdapterEnabled(tenantObject).recover(th -> {
                    return Future.failedFuture(new AdapterDisabledException(device.getTenantId(), "adapter is disabled for tenant", th));
                }), checkConnectionLimit(tenantObject, span.context()));
            })).map(compositeFuture -> {
                this.log.debug("{} is registered and enabled", device);
                span.log(String.format("device [%s] is registered and enabled", device));
                return (Void) null;
            }).onComplete(promise);
        }
        return promise.future();
    }

    protected void setInsecureAmqpServer(ProtonServer protonServer) {
        Objects.requireNonNull(protonServer);
        if (protonServer.actualPort() > 0) {
            throw new IllegalArgumentException("AMQP Server should not be running");
        }
        this.insecureServer = protonServer;
    }

    protected void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory protonSaslAuthenticatorFactory) {
        this.authenticatorFactory = (ProtonSaslAuthenticatorFactory) Objects.requireNonNull(protonSaslAuthenticatorFactory, "authFactory must not be null");
    }

    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        this.log.debug("opening new session with client [container: {}, session window size: {}]", protonConnection.getRemoteContainer(), Integer.valueOf(((AmqpAdapterProperties) getConfig()).getMaxSessionWindowSize()));
        protonSession.setIncomingCapacity(((AmqpAdapterProperties) getConfig()).getMaxSessionWindowSize());
        protonSession.open();
    }

    private void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            this.log.debug("client [container: {}] closed connection", protonConnection.getRemoteContainer());
        } else {
            this.log.debug("client [container: {}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        }
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.close();
        protonConnection.disconnect();
    }

    private void decrementConnectionCount(ProtonConnection protonConnection) {
        Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
        if (authenticatedDevice == null) {
            this.metrics.decrementUnauthenticatedConnections();
        } else {
            this.metrics.decrementConnections(authenticatedDevice.getTenantId());
        }
        sendDisconnectedEvent((String) Optional.ofNullable(protonConnection.getRemoteContainer()).orElse("unknown"), authenticatedDevice);
    }

    protected void handleRemoteReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
        OptionalInt traceSamplingPriority = getTraceSamplingPriority(protonConnection);
        Span newSpan = newSpan("attach device sender link", authenticatedDevice, traceSamplingPriority);
        newSpan.log(Map.of("snd-settle-mode", protonReceiver.getRemoteQoS()));
        String str = (String) Optional.ofNullable(protonReceiver.getRemoteTarget()).map(target -> {
            return target.getAddress();
        }).orElse(null);
        if (Strings.isNullOrEmpty(str)) {
            protonReceiver.setTarget(protonReceiver.getRemoteTarget());
            protonReceiver.setSource(protonReceiver.getRemoteSource());
            protonReceiver.setQoS(protonReceiver.getRemoteQoS());
            protonReceiver.setPrefetch(30);
            protonReceiver.setAutoAccept(false);
            HonoProtonHelper.setCloseHandler(protonReceiver, asyncResult -> {
                onLinkDetach(protonReceiver);
            });
            HonoProtonHelper.setDetachHandler(protonReceiver, asyncResult2 -> {
                onLinkDetach(protonReceiver);
            });
            protonReceiver.handler((protonDelivery, message) -> {
                try {
                    Span newSpan2 = newSpan("upload message", authenticatedDevice, traceSamplingPriority, TracingHelper.extractSpanContext(this.tracer, message));
                    newSpan2.log(Map.of(Tags.MESSAGE_BUS_DESTINATION.getKey(), message.getAddress(), "settled", Boolean.valueOf(protonDelivery.remotelySettled())));
                    AmqpContext fromMessage = AmqpContext.fromMessage(protonDelivery, message, newSpan2, authenticatedDevice);
                    fromMessage.setTimer(this.metrics.startTimer());
                    (authenticatedDevice == null ? applyTraceSamplingPriorityForAddressTenant(fromMessage.getAddress(), newSpan2) : Future.succeededFuture()).compose(r6 -> {
                        return onMessageReceived(fromMessage).onComplete(asyncResult3 -> {
                            newSpan2.finish();
                        });
                    });
                } catch (Exception e) {
                    this.log.warn("error handling message", e);
                    ProtonHelper.released(protonDelivery, true);
                }
            });
            protonReceiver.open();
            if (authenticatedDevice == null) {
                this.log.debug("established link for receiving messages from device [container: {}]", protonConnection.getRemoteContainer());
            } else {
                this.log.debug("established link for receiving messages from device [tenant: {}, device-id: {}]]", authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
            }
            newSpan.log("link established");
        } else {
            this.log.debug("client provided target address [{}] in open frame, closing link", str);
            newSpan.log(Map.of("target address", str));
            closeLinkWithError(protonReceiver, new ClientErrorException(400, "container supports anonymous terminus only"), newSpan);
        }
        newSpan.finish();
    }

    protected Future<Void> applyTraceSamplingPriorityForAddressTenant(ResourceIdentifier resourceIdentifier, Span span) {
        Objects.requireNonNull(span);
        return (resourceIdentifier == null || resourceIdentifier.getTenantId() == null) ? Future.succeededFuture() : getTenantConfiguration(resourceIdentifier.getTenantId(), span.context()).map(tenantObject -> {
            TracingHelper.setDeviceTags(span, tenantObject.getTenantId(), (String) null);
            TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, (String) null, span);
            return (Void) null;
        }).recover(th -> {
            return Future.succeededFuture();
        });
    }

    protected Future<?> onMessageReceived(AmqpContext amqpContext) {
        this.log.trace("processing message [address: {}, qos: {}]", amqpContext.getAddress(), amqpContext.getRequestedQos());
        Span tracingSpan = amqpContext.getTracingSpan();
        return validateEndpoint(amqpContext).compose(amqpContext2 -> {
            return validateAddress(amqpContext2.getAddress(), amqpContext2.getAuthenticatedDevice());
        }).compose(resourceIdentifier -> {
            return uploadMessage(amqpContext, resourceIdentifier, tracingSpan);
        }).map(obj -> {
            ProtonHelper.accepted(amqpContext.delivery(), true);
            return obj;
        }).recover(th -> {
            if (th instanceof ClientErrorException) {
                MessageHelper.rejected(amqpContext.delivery(), AbstractProtocolAdapterBase.getErrorCondition(th));
            } else {
                ProtonHelper.released(amqpContext.delivery(), true);
            }
            this.log.debug("failed to process message from device", th);
            TracingHelper.logError(tracingSpan, th);
            return Future.failedFuture(th);
        });
    }

    protected void handleRemoteSenderOpenForCommands(ProtonConnection protonConnection, ProtonSender protonSender) {
        Device authenticatedDevice = getAuthenticatedDevice(protonConnection);
        OptionalInt traceSamplingPriority = getTraceSamplingPriority(protonConnection);
        Span newSpan = newSpan("attach device command receiver link", authenticatedDevice, traceSamplingPriority);
        getResourceIdentifier(protonSender.getRemoteSource()).compose(resourceIdentifier -> {
            return validateAddress(resourceIdentifier, authenticatedDevice);
        }).compose(resourceIdentifier2 -> {
            return CommandConstants.isCommandEndpoint(resourceIdentifier2.getEndpoint()) ? openCommandSenderLink(protonConnection, protonSender, resourceIdentifier2, authenticatedDevice, newSpan, traceSamplingPriority).map(commandConsumer -> {
                setConnectionLossHandler(protonConnection, resourceIdentifier2.toString(), span -> {
                    return closeCommandConsumer(commandConsumer, resourceIdentifier2.getTenantId(), resourceIdentifier2.getResourceId(), authenticatedDevice, span);
                });
                return commandConsumer;
            }) : Future.failedFuture(new ClientErrorException(404, "no such node"));
        }).map(commandConsumer -> {
            newSpan.log("link established");
            return commandConsumer;
        }).recover(th -> {
            if (th instanceof ServiceInvocationException) {
                closeLinkWithError(protonSender, th, newSpan);
            } else {
                closeLinkWithError(protonSender, new ClientErrorException(400, "Invalid source address"), newSpan);
            }
            return Future.failedFuture(th);
        }).onComplete(asyncResult -> {
            newSpan.finish();
        });
    }

    private Span newSpan(String str, Device device, OptionalInt optionalInt) {
        return newSpan(str, device, optionalInt, null);
    }

    private Span newSpan(String str, Device device, OptionalInt optionalInt, SpanContext spanContext) {
        Span start = TracingHelper.buildChildSpan(this.tracer, spanContext, str, getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "server").withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), device != null).start();
        if (device != null) {
            TracingHelper.setDeviceTags(start, device.getTenantId(), device.getDeviceId());
        }
        optionalInt.ifPresent(i -> {
            TracingHelper.setTraceSamplingPriority(start, i);
        });
        return start;
    }

    private Future<CommandConsumer> openCommandSenderLink(ProtonConnection protonConnection, ProtonSender protonSender, ResourceIdentifier resourceIdentifier, Device device, Span span, OptionalInt optionalInt) {
        return createCommandConsumer(protonSender, resourceIdentifier, device, span).map(commandConsumer -> {
            String tenantId = resourceIdentifier.getTenantId();
            String resourceId = resourceIdentifier.getResourceId();
            protonSender.setSource(protonSender.getRemoteSource());
            protonSender.setTarget(protonSender.getRemoteTarget());
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            Handler handler = asyncResult -> {
                Span newSpan = newSpan("detach device command receiver link", device, optionalInt);
                removeConnectionLossHandler(protonConnection, resourceIdentifier.toString());
                onLinkDetach(protonSender);
                closeCommandConsumer(commandConsumer, tenantId, resourceId, device, newSpan).onComplete(asyncResult -> {
                    newSpan.finish();
                });
            };
            HonoProtonHelper.setCloseHandler(protonSender, handler);
            HonoProtonHelper.setDetachHandler(protonSender, handler);
            protonSender.open();
            this.log.debug("established link [address: {}] for sending commands to device", resourceIdentifier);
            sendConnectedTtdEvent(tenantId, resourceId, device, span.context());
            return commandConsumer;
        }).recover(th -> {
            return Future.failedFuture(new ServerErrorException(503, "cannot create command consumer"));
        });
    }

    private Future<Void> closeCommandConsumer(CommandConsumer commandConsumer, String str, String str2, Device device, Span span) {
        return commandConsumer.close(span.context()).recover(th -> {
            TracingHelper.logError(span, th);
            if (ServiceInvocationException.extractStatusCode(th) != 412) {
                return Future.succeededFuture();
            }
            this.log.debug("command consumer wasn't active anymore - skip sending disconnected event [tenant: {}, device-id: {}]", str, str2);
            span.log("command consumer wasn't active anymore - skip sending disconnected event");
            return Future.failedFuture(th);
        }).compose(r11 -> {
            return sendDisconnectedTtdEvent(str, str2, device, span.context());
        }).mapEmpty();
    }

    private Future<CommandConsumer> createCommandConsumer(ProtonSender protonSender, ResourceIdentifier resourceIdentifier, Device device, Span span) {
        Handler handler = commandContext -> {
            Timer.Sample startTimer = this.metrics.startTimer();
            addMicrometerSample(commandContext, startTimer);
            Tags.COMPONENT.set(commandContext.getTracingSpan(), getTypeName());
            Command command = commandContext.getCommand();
            Future tenantConfiguration = getTenantConfiguration(resourceIdentifier.getTenantId(), commandContext.getTracingContext());
            tenantConfiguration.compose(tenantObject -> {
                return !command.isValid() ? Future.failedFuture(new ClientErrorException(400, "malformed command message")) : !protonSender.isOpen() ? Future.failedFuture(new ServerErrorException(503, "sender link is not open")) : checkMessageLimit(tenantObject, command.getPayloadSize(), commandContext.getTracingContext());
            }).compose(r10 -> {
                return (device == null || device.getDeviceId().equals(resourceIdentifier.getResourceId())) ? Future.succeededFuture() : getRegistrationAssertion(device.getTenantId(), resourceIdentifier.getResourceId(), device, commandContext.getTracingContext());
            }).compose(registrationAssertion -> {
                onCommandReceived((TenantObject) tenantConfiguration.result(), protonSender, commandContext);
                return Future.succeededFuture();
            }).otherwise(th -> {
                if (th instanceof ClientErrorException) {
                    commandContext.reject(getErrorCondition(th).getDescription());
                } else {
                    TracingHelper.logError(commandContext.getTracingSpan(), th);
                    commandContext.release();
                }
                this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, resourceIdentifier.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), command.getPayloadSize(), startTimer);
                return null;
            });
        };
        Future future = (Future) Optional.ofNullable(device).map(device2 -> {
            return getRegistrationAssertion(device.getTenantId(), resourceIdentifier.getResourceId(), device, span.context());
        }).orElseGet(Future::succeededFuture);
        return (device == null || device.getDeviceId().equals(resourceIdentifier.getResourceId())) ? future.compose(registrationAssertion -> {
            return getCommandConsumerFactory().createCommandConsumer(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), handler, (Duration) null, span.context());
        }) : future.compose(registrationAssertion2 -> {
            return getCommandConsumerFactory().createCommandConsumer(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), device.getDeviceId(), handler, (Duration) null, span.context());
        });
    }

    protected void onCommandReceived(TenantObject tenantObject, ProtonSender protonSender, CommandContext commandContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(protonSender);
        Objects.requireNonNull(commandContext);
        Command command = commandContext.getCommand();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (protonSender.sendQueueFull()) {
            this.log.debug("cannot send command to device: no credit available [{}]", command);
            TracingHelper.logError(commandContext.getTracingSpan(), new ServerErrorException(503, "no credit available for sending command to device"));
            commandContext.release();
            reportSentCommand(tenantObject, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
            return;
        }
        Message message = ProtonHelper.message();
        message.setAddress(String.format("%s/%s/%s", "command", command.getTenant(), command.getOriginalDeviceId()));
        message.setCorrelationId(command.getCorrelationId());
        message.setSubject(command.getName());
        MessageHelper.setPayload(message, command.getContentType(), command.getPayload());
        if (command.isTargetedAtGateway()) {
            MessageHelper.addDeviceId(message, command.getOriginalDeviceId());
        }
        if (!command.isOneWay()) {
            message.setReplyTo(String.format("%s/%s/%s", "command_response", command.getTenant(), Commands.getDeviceFacingReplyToId(command.getReplyToId(), command.getOriginalDeviceId())));
        }
        Long valueOf = ((AmqpAdapterProperties) getConfig()).getSendMessageToDeviceTimeout() < 1 ? null : Long.valueOf(this.vertx.setTimer(((AmqpAdapterProperties) getConfig()).getSendMessageToDeviceTimeout(), l -> {
            this.log.debug("waiting for delivery update timed out after {}ms [{}]", Long.valueOf(((AmqpAdapterProperties) getConfig()).getSendMessageToDeviceTimeout()), command);
            if (!atomicBoolean.compareAndSet(false, true)) {
                this.log.trace("command is already settled and downstream application was already notified [{}]", command);
                return;
            }
            TracingHelper.logError(commandContext.getTracingSpan(), new ServerErrorException(503, "timeout waiting for delivery update from device"));
            commandContext.release();
            reportSentCommand(tenantObject, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
        }));
        protonSender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.vertx.cancelTimer(valueOf.longValue());
            }
            if (!atomicBoolean.compareAndSet(false, true)) {
                this.log.trace("command is already settled and downstream application was already notified [{}]", command);
                return;
            }
            Rejected remoteState = protonDelivery.getRemoteState();
            MetricsTags.ProcessingOutcome processingOutcome = null;
            if (!protonDelivery.remotelySettled()) {
                this.log.debug("device did not settle command message [remote state: {}, {}]", remoteState, command);
                HashMap hashMap = new HashMap(2);
                hashMap.put("event", "device did not settle command");
                hashMap.put("remote state", remoteState);
                commandContext.getTracingSpan().log(hashMap);
                commandContext.release();
                processingOutcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
            } else if (Accepted.class.isInstance(remoteState)) {
                processingOutcome = MetricsTags.ProcessingOutcome.FORWARDED;
                commandContext.accept();
            } else if (Rejected.class.isInstance(remoteState)) {
                processingOutcome = MetricsTags.ProcessingOutcome.UNPROCESSABLE;
                commandContext.reject((String) Optional.ofNullable(remoteState.getError()).map((v0) -> {
                    return v0.getDescription();
                }).orElse(null));
            } else if (Released.class.isInstance(remoteState)) {
                processingOutcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                commandContext.release();
            } else if (Modified.class.isInstance(remoteState)) {
                Modified modified = (Modified) remoteState;
                processingOutcome = modified.getUndeliverableHere().booleanValue() ? MetricsTags.ProcessingOutcome.UNPROCESSABLE : MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                commandContext.modify(modified.getDeliveryFailed().booleanValue(), modified.getUndeliverableHere().booleanValue());
            }
            reportSentCommand(tenantObject, commandContext, processingOutcome);
        });
        HashMap hashMap = new HashMap(4);
        hashMap.put("event", "command sent to device");
        if (protonSender.getRemoteTarget() != null) {
            hashMap.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), protonSender.getRemoteTarget().getAddress());
        }
        hashMap.put(TracingHelper.TAG_QOS.getKey(), protonSender.getQoS().name());
        hashMap.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(protonSender.getCredit()));
        commandContext.getTracingSpan().log(hashMap);
    }

    private void reportSentCommand(TenantObject tenantObject, CommandContext commandContext, MetricsTags.ProcessingOutcome processingOutcome) {
        this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, commandContext.getCommand().getTenant(), tenantObject, processingOutcome, commandContext.getCommand().getPayloadSize(), getMicrometerSample(commandContext));
    }

    private <T extends ProtonLink<T>> void closeLinkWithError(ProtonLink<T> protonLink, Throwable th, Span span) {
        ErrorCondition errorCondition = getErrorCondition(th);
        this.log.debug("closing link with error condition [symbol: {}, description: {}]", errorCondition.getCondition(), errorCondition.getDescription());
        protonLink.setCondition(errorCondition);
        protonLink.close();
        if (span != null) {
            TracingHelper.logError(span, th);
        }
    }

    private Future<?> uploadMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Span span) {
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[amqpContext.getEndpoint().ordinal()]) {
            case 1:
            case 2:
                Promise promise = Promise.promise();
                if (isPayloadOfIndicatedType(amqpContext.getMessagePayload(), amqpContext.getMessageContentType())) {
                    promise.complete();
                } else {
                    promise.fail(new ClientErrorException(400, "empty notifications must not contain payload"));
                }
                return promise.future().compose(r9 -> {
                    return doUploadMessage(amqpContext, resourceIdentifier, span);
                });
            case 3:
                return doUploadCommandResponseMessage(amqpContext, resourceIdentifier, span);
            default:
                return Future.failedFuture(new ClientErrorException(404, "unknown endpoint"));
        }
    }

    private Future<?> doUploadMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Span span) {
        this.log.trace("forwarding {} message", amqpContext.getEndpoint().getCanonicalName());
        Future registrationAssertion = getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), amqpContext.getAuthenticatedDevice(), span.context());
        Future tenantConfiguration = getTenantConfiguration(resourceIdentifier.getTenantId(), span.context());
        Future compose = tenantConfiguration.compose(tenantObject -> {
            return CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, amqpContext.getPayloadSize(), span.context())).map(compositeFuture -> {
                return tenantObject;
            });
        });
        return CompositeFuture.all(compose, registrationAssertion).compose(compositeFuture -> {
            Map downstreamMessageProperties = getDownstreamMessageProperties(amqpContext);
            return amqpContext.getEndpoint() == MetricsTags.EndpointType.TELEMETRY ? getTelemetrySender().sendTelemetry((TenantObject) compose.result(), (RegistrationAssertion) registrationAssertion.result(), amqpContext.getRequestedQos(), amqpContext.getMessageContentType(), amqpContext.getMessagePayload(), downstreamMessageProperties, span.context()) : getEventSender().sendEvent((TenantObject) compose.result(), (RegistrationAssertion) registrationAssertion.result(), amqpContext.getMessageContentType(), amqpContext.getMessagePayload(), downstreamMessageProperties, span.context());
        }).recover(th -> {
            this.log.debug("cannot process {} message from device [tenant: {}, device-id: {}]", new Object[]{amqpContext.getEndpoint().getCanonicalName(), resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), th});
            this.metrics.reportTelemetry(amqpContext.getEndpoint(), resourceIdentifier.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), amqpContext.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE, amqpContext.getPayloadSize(), amqpContext.getTimer());
            return Future.failedFuture(th);
        }).map(r13 -> {
            this.metrics.reportTelemetry(amqpContext.getEndpoint(), resourceIdentifier.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, amqpContext.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE, amqpContext.getPayloadSize(), amqpContext.getTimer());
            return r13;
        });
    }

    private CommandResponse getCommandResponse(Message message) {
        return CommandResponse.fromCorrelationId(message.getCorrelationId() instanceof String ? (String) message.getCorrelationId() : null, message.getAddress(), MessageHelper.getPayload(message), message.getContentType(), MessageHelper.getStatus(message));
    }

    private Future<Void> doUploadCommandResponseMessage(AmqpContext amqpContext, ResourceIdentifier resourceIdentifier, Span span) {
        Future future = (Future) Optional.ofNullable(getCommandResponse(amqpContext.getMessage())).map((v0) -> {
            return Future.succeededFuture(v0);
        }).orElseGet(() -> {
            TracingHelper.logError(span, String.format("invalid message (correlationId: %s, address: %s, status: %s)", amqpContext.getMessage().getCorrelationId(), amqpContext.getMessage().getAddress(), MessageHelper.getStatus(amqpContext.getMessage())));
            return Future.failedFuture(new ClientErrorException(400, "malformed command response message"));
        });
        Future tenantConfiguration = getTenantConfiguration(resourceIdentifier.getTenantId(), span.context());
        return CompositeFuture.all(tenantConfiguration, future).compose(compositeFuture -> {
            CommandResponse commandResponse = (CommandResponse) future.result();
            this.log.trace("sending command response [device-id: {}, status: {}, correlation-id: {}, reply-to: {}]", new Object[]{resourceIdentifier.getResourceId(), Integer.valueOf(commandResponse.getStatus()), commandResponse.getCorrelationId(), commandResponse.getReplyToId()});
            HashMap hashMap = new HashMap(3);
            hashMap.put("event", "sending command response");
            hashMap.put(TracingHelper.TAG_CORRELATION_ID.getKey(), commandResponse.getCorrelationId());
            hashMap.put("status", Integer.valueOf(commandResponse.getStatus()));
            span.log(hashMap);
            return CompositeFuture.all(CompositeFuture.all(isAdapterEnabled((TenantObject) tenantConfiguration.result()), checkMessageLimit((TenantObject) tenantConfiguration.result(), amqpContext.getPayloadSize(), span.context())).map(compositeFuture -> {
                return (TenantObject) tenantConfiguration.result();
            }), getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), amqpContext.getAuthenticatedDevice(), span.context())).compose(compositeFuture2 -> {
                return sendCommandResponse(commandResponse, span.context());
            });
        }).map(r12 -> {
            this.log.trace("forwarded command response from device [tenant: {}, device-id: {}]", resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId());
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, resourceIdentifier.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, amqpContext.getPayloadSize(), amqpContext.getTimer());
            return r12;
        }).recover(th -> {
            this.log.debug("cannot process command response from device [tenant: {}, device-id: {}]", new Object[]{resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), th});
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, resourceIdentifier.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), amqpContext.getPayloadSize(), amqpContext.getTimer());
            return Future.failedFuture(th);
        });
    }

    private <T extends ProtonLink<T>> void onLinkDetach(ProtonLink<T> protonLink) {
        this.log.debug("closing link [{}]", protonLink.getName());
        protonLink.close();
    }

    Future<AmqpContext> validateEndpoint(AmqpContext amqpContext) {
        Promise promise = Promise.promise();
        if (amqpContext.getAddress() != null) {
            switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[amqpContext.getEndpoint().ordinal()]) {
                case 1:
                case 3:
                    promise.complete(amqpContext);
                    break;
                case 2:
                    if (!amqpContext.isRemotelySettled()) {
                        promise.complete(amqpContext);
                        break;
                    } else {
                        promise.fail(new ClientErrorException(400, "event endpoint accepts unsettled messages only"));
                        break;
                    }
                default:
                    this.log.debug("device wants to send message for unsupported address [{}]", amqpContext.getAddress());
                    promise.fail(new ClientErrorException(404, "unsupported endpoint"));
                    break;
            }
        } else {
            promise.fail(new ClientErrorException(404));
        }
        return promise.future();
    }

    private static Future<ResourceIdentifier> getResourceIdentifier(Source source) {
        return (source == null || Strings.isNullOrEmpty(source.getAddress())) ? Future.failedFuture(new ClientErrorException(404, "no such node")) : Future.succeededFuture(ResourceIdentifier.fromString(source.getAddress()));
    }

    private static void setConnectionLossHandler(ProtonConnection protonConnection, String str, Function<Span, Future<Void>> function) {
        Map map = (Map) Optional.ofNullable((Map) protonConnection.attachments().get(KEY_CONNECTION_LOSS_HANDLERS, Map.class)).orElseGet(HashMap::new);
        map.put(str, function);
        protonConnection.attachments().set(KEY_CONNECTION_LOSS_HANDLERS, Map.class, map);
    }

    private static Collection<Function<Span, Future<Void>>> getConnectionLossHandlers(ProtonConnection protonConnection) {
        Map map = (Map) protonConnection.attachments().get(KEY_CONNECTION_LOSS_HANDLERS, Map.class);
        return map != null ? map.values() : Collections.emptyList();
    }

    private static boolean removeConnectionLossHandler(ProtonConnection protonConnection, String str) {
        Map map = (Map) protonConnection.attachments().get(KEY_CONNECTION_LOSS_HANDLERS, Map.class);
        return (map == null || map.remove(str) == null) ? false : true;
    }

    private static Device getAuthenticatedDevice(ProtonConnection protonConnection) {
        return (Device) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
    }

    private static OptionalInt getTraceSamplingPriority(ProtonConnection protonConnection) {
        return (OptionalInt) Optional.ofNullable((OptionalInt) protonConnection.attachments().get(AmqpAdapterConstants.KEY_TRACE_SAMPLING_PRIORITY, OptionalInt.class)).orElse(OptionalInt.empty());
    }

    private Future<Void> checkConnectionLimitForAdapter() {
        return (getConnectionLimitManager() == null || !getConnectionLimitManager().isLimitExceeded()) ? Future.succeededFuture() : Future.failedFuture(new AdapterConnectionsExceededException((String) null, "connection limit for the adapter exceeded", (Throwable) null));
    }

    public int getPortDefaultValue() {
        return 5671;
    }

    public int getInsecurePortDefaultValue() {
        return 5672;
    }

    protected int getActualPort() {
        if (this.secureServer != null) {
            return this.secureServer.actualPort();
        }
        return -1;
    }

    protected int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }
}
