package org.eclipse.hono.adapter.coap;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.OptionSet;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.eclipse.californium.elements.auth.ExtensiblePrincipal;
import org.eclipse.californium.scandium.DTLSConnector;
import org.eclipse.californium.scandium.auth.ApplicationLevelInfoSupplier;
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.eclipse.californium.scandium.dtls.CertificateType;
import org.eclipse.californium.scandium.dtls.pskstore.AdvancedPskStore;
import org.eclipse.hono.adapter.client.command.Command;
import org.eclipse.hono.adapter.client.command.CommandConsumer;
import org.eclipse.hono.adapter.client.command.CommandContext;
import org.eclipse.hono.adapter.client.command.CommandResponse;
import org.eclipse.hono.adapter.coap.CoapAdapterProperties;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.config.KeyLoader;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.class */
public abstract class AbstractVertxBasedCoapAdapter<T extends CoapAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    private static final String KEY_TIMER_ID = "timerId";
    private static final int MINIMAL_MEMORY = 100000000;
    private static final int MEMORY_PER_CONNECTION = 10000;
    private CoapServer server;
    private ApplicationLevelInfoSupplier honoDeviceResolver;
    private AdvancedPskStore pskStore;
    private volatile Endpoint secureEndpoint;
    private volatile Endpoint insecureEndpoint;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final Set<Resource> resourcesToAdd = new HashSet();
    private CoapAdapterMetrics metrics = CoapAdapterMetrics.NOOP;

    public final void setHonoDeviceResolver(ApplicationLevelInfoSupplier applicationLevelInfoSupplier) {
        this.honoDeviceResolver = (ApplicationLevelInfoSupplier) Objects.requireNonNull(applicationLevelInfoSupplier);
    }

    public final void setPskStore(AdvancedPskStore advancedPskStore) {
        this.pskStore = (AdvancedPskStore) Objects.requireNonNull(advancedPskStore);
    }

    public final void setMetrics(CoapAdapterMetrics coapAdapterMetrics) {
        Optional.ofNullable(coapAdapterMetrics).ifPresent(coapAdapterMetrics2 -> {
            this.log.info("reporting metrics using [{}]", coapAdapterMetrics.getClass().getName());
        });
        this.metrics = coapAdapterMetrics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final CoapAdapterMetrics getMetrics() {
        return this.metrics;
    }

    public final void setResources(Set<Resource> set) {
        this.resourcesToAdd.addAll((Collection) Objects.requireNonNull(set));
    }

    public final int getPortDefaultValue() {
        return 5684;
    }

    public final int getInsecurePortDefaultValue() {
        return 5683;
    }

    protected final int getActualPort() {
        int i = -1;
        Endpoint endpoint = this.secureEndpoint;
        if (endpoint != null) {
            i = endpoint.getAddress().getPort();
        }
        return i;
    }

    protected final int getActualInsecurePort() {
        int i = -1;
        Endpoint endpoint = this.insecureEndpoint;
        if (endpoint != null) {
            i = endpoint.getAddress().getPort();
        }
        return i;
    }

    public final void setCoapServer(CoapServer coapServer) {
        Objects.requireNonNull(coapServer);
        this.server = coapServer;
    }

    public final void doStart(Promise<Void> promise) {
        ((Future) Optional.ofNullable(this.server).map(coapServer -> {
            return Future.succeededFuture(coapServer);
        }).orElseGet(this::createServer)).compose(coapServer2 -> {
            return preStartup().map(coapServer2);
        }).map(coapServer3 -> {
            addResources(coapServer3);
            return coapServer3;
        }).compose(coapServer4 -> {
            return Futures.executeBlocking(this.vertx, () -> {
                coapServer4.start();
                return coapServer4;
            });
        }).compose(coapServer5 -> {
            try {
                onStartupSuccess();
                return Future.succeededFuture((Void) null);
            } catch (Exception e) {
                this.log.error("error executing onStartupSuccess", e);
                return Future.failedFuture(e);
            }
        }).onComplete(promise);
    }

    private Future<CoapServer> createServer() {
        return checkCoapPortConfiguration().compose(r6 -> {
            Future succeededFuture;
            Future succeededFuture2;
            this.log.info("creating new CoAP server");
            CoapServer coapServer = new CoapServer(NetworkConfig.createStandardWithoutFile(), new int[0]);
            if (isSecurePortEnabled()) {
                succeededFuture = createSecureEndpoint().map(endpoint -> {
                    coapServer.addEndpoint(endpoint);
                    this.secureEndpoint = endpoint;
                    return endpoint;
                });
            } else {
                this.log.info("neither key/cert nor secure port are configured, won't start secure endpoint");
                succeededFuture = Future.succeededFuture();
            }
            if (!isInsecurePortEnabled()) {
                this.log.info("insecure port is not configured, won't start insecure endpoint");
                succeededFuture2 = Future.succeededFuture();
            } else if (((CoapAdapterProperties) getConfig()).isAuthenticationRequired()) {
                this.log.warn("skipping start up of insecure endpoint, configuration requires authentication of devices");
                succeededFuture2 = Future.succeededFuture();
            } else {
                succeededFuture2 = createInsecureEndpoint().map(endpoint2 -> {
                    coapServer.addEndpoint(endpoint2);
                    this.insecureEndpoint = endpoint2;
                    return endpoint2;
                });
            }
            return CompositeFuture.all(succeededFuture2, succeededFuture).map(compositeFuture -> {
                this.server = coapServer;
                return coapServer;
            });
        });
    }

    protected boolean isSecurePortEnabled() {
        return ((CoapAdapterProperties) getConfig()).isSecurePortEnabled() || ((CoapAdapterProperties) getConfig()).getPort() > -1;
    }

    private Future<Void> checkCoapPortConfiguration() {
        Promise promise = Promise.promise();
        boolean isSecurePortEnabled = isSecurePortEnabled();
        int port = isSecurePortEnabled ? ((CoapAdapterProperties) getConfig()).getPort(getPortDefaultValue()) : -1;
        if (isSecurePortEnabled) {
            if (((CoapAdapterProperties) getConfig()).isInsecurePortEnabled() && port == ((CoapAdapterProperties) getConfig()).getInsecurePort(getInsecurePortDefaultValue())) {
                this.log.error("secure and insecure ports must be configured to bind to different port numbers");
                promise.fail("secure and insecure ports configured to bind to same port number");
            } else {
                if (((CoapAdapterProperties) getConfig()).getKeyCertOptions() == null) {
                    this.log.warn("secure port configured, but no certificate/key is set. Will only enable ciphers that do not require server certificate!");
                }
                promise.complete();
            }
        } else if (((CoapAdapterProperties) getConfig()).isInsecurePortEnabled()) {
            promise.complete();
        } else {
            this.log.error("configuration must have at least one of secure or insecure port set to start up");
            promise.fail("no ports configured");
        }
        return promise.future();
    }

    private void addResources(CoapServer coapServer) {
        this.resourcesToAdd.forEach(resource -> {
            this.log.info("adding resource to CoAP server [name: {}]", resource.getName());
            coapServer.add(new Resource[]{new VertxCoapResource(resource, this.context)});
        });
        this.resourcesToAdd.clear();
    }

    private void addIdentity(DtlsConnectorConfig.Builder builder) {
        KeyLoader fromFiles = KeyLoader.fromFiles(this.vertx, ((CoapAdapterProperties) getConfig()).getKeyPath(), ((CoapAdapterProperties) getConfig()).getCertPath());
        PrivateKey privateKey = fromFiles.getPrivateKey();
        Certificate[] certificateChain = fromFiles.getCertificateChain();
        if (privateKey == null || certificateChain == null) {
            return;
        }
        if (!privateKey.getAlgorithm().equals("EC")) {
            this.log.warn("configured key is not ECC based, certificate based cipher suites will be disabled");
        } else {
            this.log.info("using private key [{}] and certificate [{}] as server identity", ((CoapAdapterProperties) getConfig()).getKeyPath(), ((CoapAdapterProperties) getConfig()).getCertPath());
            builder.setIdentity(privateKey, certificateChain, new CertificateType[0]);
        }
    }

    private Future<Endpoint> createSecureEndpoint() {
        return getSecureNetworkConfig().compose(networkConfig -> {
            return createSecureEndpoint(networkConfig);
        });
    }

    private Future<Endpoint> createSecureEndpoint(NetworkConfig networkConfig) {
        ApplicationLevelInfoSupplier applicationLevelInfoSupplier = (ApplicationLevelInfoSupplier) Optional.ofNullable(this.honoDeviceResolver).orElseGet(() -> {
            return new DefaultDeviceResolver(this.context, this.tracer, getTypeName(), (CoapAdapterProperties) getConfig(), getCredentialsClient(), getTenantClient());
        });
        AdvancedPskStore advancedPskStore = (AdvancedPskStore) Optional.ofNullable(this.pskStore).orElseGet(() -> {
            return applicationLevelInfoSupplier instanceof AdvancedPskStore ? (AdvancedPskStore) applicationLevelInfoSupplier : new DefaultDeviceResolver(this.context, this.tracer, getTypeName(), (CoapAdapterProperties) getConfig(), getCredentialsClient(), getTenantClient());
        });
        DtlsConnectorConfig.Builder builder = new DtlsConnectorConfig.Builder();
        builder.setServerOnly(true);
        builder.setRecommendedCipherSuitesOnly(true);
        builder.setClientAuthenticationRequired(((CoapAdapterProperties) getConfig()).isAuthenticationRequired());
        builder.setAddress(new InetSocketAddress(((CoapAdapterProperties) getConfig()).getBindAddress(), ((CoapAdapterProperties) getConfig()).getPort(getPortDefaultValue())));
        builder.setApplicationLevelInfoSupplier(applicationLevelInfoSupplier);
        builder.setAdvancedPskStore(advancedPskStore);
        builder.setRetransmissionTimeout(((CoapAdapterProperties) getConfig()).getDtlsRetransmissionTimeout());
        builder.setMaxConnections(networkConfig.getInt("MAX_ACTIVE_PEERS"));
        addIdentity(builder);
        try {
            DtlsConnectorConfig build = builder.build();
            if (this.log.isInfoEnabled()) {
                this.log.info("creating secure endpoint supporting ciphers: {}", (String) build.getSupportedCipherSuites().stream().map(cipherSuite -> {
                    return cipherSuite.name();
                }).collect(Collectors.joining(", ")));
            }
            DTLSConnector dTLSConnector = new DTLSConnector(build);
            CoapEndpoint.Builder builder2 = new CoapEndpoint.Builder();
            builder2.setNetworkConfig(networkConfig);
            builder2.setConnector(dTLSConnector);
            return Future.succeededFuture(builder2.build());
        } catch (IllegalStateException e) {
            this.log.warn("failed to create secure endpoint", e);
            return Future.failedFuture(e);
        }
    }

    private Future<Endpoint> createInsecureEndpoint() {
        this.log.info("creating insecure endpoint");
        return getInsecureNetworkConfig().map(networkConfig -> {
            CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
            builder.setNetworkConfig(networkConfig);
            builder.setInetSocketAddress(new InetSocketAddress(((CoapAdapterProperties) getConfig()).getInsecurePortBindAddress(), ((CoapAdapterProperties) getConfig()).getInsecurePort(getInsecurePortDefaultValue())));
            return builder.build();
        });
    }

    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    protected void onStartupSuccess() {
    }

    private NetworkConfig newDefaultNetworkConfig() {
        NetworkConfig networkConfig = new NetworkConfig();
        networkConfig.setInt("PROTOCOL_STAGE_THREAD_COUNT", ((CoapAdapterProperties) getConfig()).getCoapThreads());
        networkConfig.setInt("NETWORK_STAGE_RECEIVER_THREAD_COUNT", ((CoapAdapterProperties) getConfig()).getConnectorThreads());
        networkConfig.setInt("NETWORK_STAGE_SENDER_THREAD_COUNT", ((CoapAdapterProperties) getConfig()).getConnectorThreads());
        networkConfig.setInt("MAX_RESOURCE_BODY_SIZE", ((CoapAdapterProperties) getConfig()).getMaxPayloadSize());
        networkConfig.setInt("EXCHANGE_LIFETIME", ((CoapAdapterProperties) getConfig()).getExchangeLifetime());
        networkConfig.setBoolean("USE_MESSAGE_OFFLOADING", ((CoapAdapterProperties) getConfig()).isMessageOffloadingEnabled());
        networkConfig.setString("DEDUPLICATOR", "DEDUPLICATOR_PEERS_MARK_AND_SWEEP");
        int maxConnections = ((CoapAdapterProperties) getConfig()).getMaxConnections();
        if (maxConnections == 0) {
            networkConfig.setInt("MAX_ACTIVE_PEERS", new MemoryBasedConnectionLimitStrategy(100000000L, 10000L).getRecommendedLimit());
        } else {
            networkConfig.setInt("MAX_ACTIVE_PEERS", maxConnections);
        }
        return networkConfig;
    }

    protected Future<NetworkConfig> getSecureNetworkConfig() {
        NetworkConfig newDefaultNetworkConfig = newDefaultNetworkConfig();
        newDefaultNetworkConfig.setInt("NETWORK_STAGE_SENDER_THREAD_COUNT", ((CoapAdapterProperties) getConfig()).getDtlsThreads());
        return loadNetworkConfig(((CoapAdapterProperties) getConfig()).getNetworkConfig(), newDefaultNetworkConfig).compose(networkConfig -> {
            return loadNetworkConfig(((CoapAdapterProperties) getConfig()).getSecureNetworkConfig(), networkConfig);
        });
    }

    protected Future<NetworkConfig> getInsecureNetworkConfig() {
        return loadNetworkConfig(((CoapAdapterProperties) getConfig()).getNetworkConfig(), newDefaultNetworkConfig()).compose(networkConfig -> {
            return loadNetworkConfig(((CoapAdapterProperties) getConfig()).getInsecureNetworkConfig(), networkConfig);
        });
    }

    protected Future<NetworkConfig> loadNetworkConfig(String str, NetworkConfig networkConfig) {
        if (str != null && !str.isEmpty()) {
            getVertx().fileSystem().readFile(str, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    this.log.warn("error reading NetworkConfig file [{}]", str, asyncResult.cause());
                    return;
                }
                try {
                    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(((Buffer) asyncResult.result()).getBytes());
                    try {
                        networkConfig.load(byteArrayInputStream);
                        byteArrayInputStream.close();
                    } finally {
                    }
                } catch (IOException e) {
                    this.log.warn("skipping malformed NetworkConfig properties [{}]", str);
                }
            });
        }
        return Future.succeededFuture(networkConfig);
    }

    protected void customizeDownstreamMessageProperties(Map<String, Object> map, CoapContext coapContext) {
    }

    public final void doStop(Promise<Void> promise) {
        try {
            preShutdown();
        } catch (Exception e) {
            this.log.error("error in preShutdown", e);
        }
        Futures.executeBlocking(this.vertx, () -> {
            if (this.server != null) {
                this.server.stop();
            }
            return (Void) null;
        }).compose(r3 -> {
            return postShutdown();
        }).onComplete(promise);
    }

    protected void preShutdown() {
    }

    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Future<Device> getAuthenticatedDevice(CoapExchange coapExchange) {
        Promise promise = Promise.promise();
        ExtensiblePrincipal peerIdentity = coapExchange.advanced().getRequest().getSourceContext().getPeerIdentity();
        if (peerIdentity instanceof ExtensiblePrincipal) {
            Device device = (Device) peerIdentity.getExtendedInfo().get(DefaultDeviceResolver.EXT_INFO_KEY_HONO_DEVICE, Device.class);
            if (device != null) {
                promise.complete(device);
            } else {
                promise.fail(new ClientErrorException(401, "DTLS session does not contain authenticated Device"));
            }
        } else {
            promise.fail(new ClientErrorException(401, "DTLS session does not contain ExtensiblePrincipal"));
        }
        return promise.future();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getAuthId(CoapExchange coapExchange) {
        ExtensiblePrincipal peerIdentity = coapExchange.advanced().getRequest().getSourceContext().getPeerIdentity();
        if (peerIdentity instanceof ExtensiblePrincipal) {
            return (String) peerIdentity.getExtendedInfo().get(DefaultDeviceResolver.EXT_INFO_KEY_HONO_AUTH_ID, String.class);
        }
        return null;
    }

    public final Future<CoAP.ResponseCode> uploadTelemetryMessage(CoapContext coapContext) {
        return doUploadMessage(coapContext, MetricsTags.EndpointType.TELEMETRY);
    }

    public final Future<CoAP.ResponseCode> uploadEventMessage(CoapContext coapContext) {
        Objects.requireNonNull(coapContext);
        if (coapContext.isConfirmable()) {
            return doUploadMessage(coapContext, MetricsTags.EndpointType.EVENT);
        }
        coapContext.respondWithCode(CoAP.ResponseCode.BAD_REQUEST, "event endpoint supports confirmable request messages only");
        return Future.succeededFuture(CoAP.ResponseCode.BAD_REQUEST);
    }

    private Future<CoAP.ResponseCode> doUploadMessage(CoapContext coapContext, MetricsTags.EndpointType endpointType) {
        Objects.requireNonNull(coapContext);
        Objects.requireNonNull(endpointType);
        String contentType = coapContext.getContentType();
        Buffer payload = coapContext.getPayload();
        if (contentType == null) {
            coapContext.respondWithCode(CoAP.ResponseCode.BAD_REQUEST, "request message must contain content-format option");
            return Future.succeededFuture(CoAP.ResponseCode.BAD_REQUEST);
        }
        if (payload.length() == 0 && !coapContext.isEmptyNotification()) {
            coapContext.respondWithCode(CoAP.ResponseCode.BAD_REQUEST, "request contains no body but is not marked as empty notification");
            return Future.succeededFuture(CoAP.ResponseCode.BAD_REQUEST);
        }
        String gatewayId = coapContext.getGatewayId();
        String tenantId = coapContext.getOriginDevice().getTenantId();
        String deviceId = coapContext.getOriginDevice().getDeviceId();
        MetricsTags.QoS qoS = coapContext.isConfirmable() ? MetricsTags.QoS.AT_LEAST_ONCE : MetricsTags.QoS.AT_MOST_ONCE;
        Span start = TracingHelper.buildChildSpan(this.tracer, coapContext.getTracingContext(), "upload " + endpointType.getCanonicalName(), getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, tenantId).withTag(TracingHelper.TAG_DEVICE_ID, deviceId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), coapContext.isDeviceAuthenticated()).withTag("QoS-Level", qoS.asTag().getValue()).start();
        Promise promise = Promise.promise();
        Future registrationAssertion = getRegistrationAssertion(tenantId, deviceId, coapContext.getAuthenticatedDevice(), start.context());
        Future tenantConfiguration = getTenantConfiguration(tenantId, start.context());
        Future compose = CompositeFuture.all(tenantConfiguration.compose(tenantObject -> {
            return CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, payload.length(), start.context())).map(tenantObject);
        }), registrationAssertion).compose(compositeFuture -> {
            return getTimeUntilDisconnect((TenantObject) tenantConfiguration.result(), coapContext.getTimeUntilDisconnect()).map(num -> {
                if (num != null) {
                    start.setTag("ttd", num);
                }
                return num;
            });
        });
        Future compose2 = compose.compose(num -> {
            return createCommandConsumer(num, (TenantObject) tenantConfiguration.result(), deviceId, gatewayId, coapContext, promise, start);
        });
        return compose2.compose(commandConsumer -> {
            Map<String, Object> downstreamMessageProperties = getDownstreamMessageProperties(coapContext);
            Optional.ofNullable((CommandConsumer) compose2.result()).map(commandConsumer -> {
                return (Integer) compose.result();
            }).ifPresent(num2 -> {
                downstreamMessageProperties.put("ttd", num2);
            });
            customizeDownstreamMessageProperties(downstreamMessageProperties, coapContext);
            if (coapContext.isConfirmable()) {
                coapContext.startAcceptTimer(this.vertx, (TenantObject) tenantConfiguration.result(), ((CoapAdapterProperties) getConfig()).getTimeoutToAck());
            }
            return CompositeFuture.all(endpointType == MetricsTags.EndpointType.EVENT ? getEventSender().sendEvent((TenantObject) tenantConfiguration.result(), (RegistrationAssertion) registrationAssertion.result(), contentType, payload, downstreamMessageProperties, start.context()) : getTelemetrySender().sendTelemetry((TenantObject) tenantConfiguration.result(), (RegistrationAssertion) registrationAssertion.result(), coapContext.getRequestedQos(), contentType, payload, downstreamMessageProperties, start.context()), promise.future()).mapEmpty();
        }).map(obj -> {
            Future onFailure = compose2.result() != null ? ((CommandConsumer) compose2.result()).close(start.context()).onFailure(th -> {
                TracingHelper.logError(start, th);
            }) : Future.succeededFuture();
            CommandContext commandContext = (CommandContext) coapContext.get("command-context");
            Response response = new Response(CoAP.ResponseCode.CHANGED);
            if (commandContext != null) {
                addCommandToResponse(response, commandContext, start);
                commandContext.accept();
                this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantId, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, commandContext.getCommand().getPayloadSize(), coapContext.getTimer());
            }
            this.log.trace("successfully processed message for device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{tenantId, deviceId, endpointType.getCanonicalName()});
            this.metrics.reportTelemetry(endpointType, tenantId, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, qoS, payload.length(), getTtdStatus(coapContext), coapContext.getTimer());
            coapContext.getExchange().respond(response);
            onFailure.onComplete(asyncResult -> {
                start.finish();
            });
            return response.getCode();
        }).recover(th -> {
            this.log.debug("cannot process message from device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{tenantId, deviceId, endpointType.getCanonicalName(), th});
            Future onFailure = compose2.result() != null ? ((CommandConsumer) compose2.result()).close(start.context()).onFailure(th -> {
                TracingHelper.logError(start, th);
            }) : Future.succeededFuture();
            CommandContext commandContext = (CommandContext) coapContext.get("command-context");
            if (commandContext != null) {
                TracingHelper.logError(commandContext.getTracingSpan(), "command won't be forwarded to device in CoAP response, CoAP request handling failed", th);
                commandContext.release();
                start.log("released command for device");
            }
            this.metrics.reportTelemetry(endpointType, tenantId, (TenantObject) tenantConfiguration.result(), ClientErrorException.class.isInstance(th) ? MetricsTags.ProcessingOutcome.UNPROCESSABLE : MetricsTags.ProcessingOutcome.UNDELIVERABLE, qoS, payload.length(), getTtdStatus(coapContext), coapContext.getTimer());
            TracingHelper.logError(start, th);
            CoAP.ResponseCode respond = coapContext.respond(CoapErrorResponse.respond(th, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
            onFailure.onComplete(asyncResult -> {
                start.finish();
            });
            return Future.succeededFuture(respond);
        });
    }

    protected void addCommandToResponse(Response response, CommandContext commandContext, Span span) {
        Command command = commandContext.getCommand();
        OptionSet options = response.getOptions();
        options.addLocationQuery("hono-command=" + command.getName());
        if (command.isOneWay()) {
            options.setLocationPath("command");
        } else {
            options.setLocationPath("command_response");
        }
        span.setTag("hono-command", command.getName());
        this.log.debug("adding command [name: {}, request-id: {}] to response for device [tenant-id: {}, device-id: {}]", new Object[]{command.getName(), command.getRequestId(), command.getTenant(), command.getGatewayOrDeviceId()});
        commandContext.getTracingSpan().log("forwarding command to device in CoAP response");
        if (command.isTargetedAtGateway()) {
            options.addLocationPath(command.getTenant());
            options.addLocationPath(command.getDeviceId());
            span.setTag("hono-cmd-target-device", command.getDeviceId());
        }
        if (!command.isOneWay()) {
            options.addLocationPath(command.getRequestId());
            span.setTag("hono-cmd-req-id", command.getRequestId());
        }
        int parse = MediaTypeRegistry.parse(command.getContentType());
        if (parse != -1) {
            options.setContentFormat(parse);
        } else {
            span.log("ignoring unknown content type [" + command.getContentType() + "] of command");
        }
        Optional.ofNullable(command.getPayload()).ifPresent(buffer -> {
            response.setPayload(buffer.getBytes());
        });
    }

    protected final Future<CommandConsumer> createCommandConsumer(Integer num, TenantObject tenantObject, String str, String str2, CoapContext coapContext, Handler<AsyncResult<Void>> handler, Span span) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(str);
        Objects.requireNonNull(coapContext);
        Objects.requireNonNull(handler);
        Objects.requireNonNull(span);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (num == null || num.intValue() <= 0) {
            if (atomicBoolean.compareAndSet(false, true)) {
                handler.handle(Future.succeededFuture());
            }
            return Future.succeededFuture();
        }
        span.setTag("ttd", num);
        Span start = TracingHelper.buildChildSpan(this.tracer, span.context(), "wait for command", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId()).withTag(TracingHelper.TAG_DEVICE_ID, str).start();
        Handler handler2 = commandContext -> {
            Tags.COMPONENT.set(commandContext.getTracingSpan(), getTypeName());
            commandContext.logCommandToSpan(start);
            Command command = commandContext.getCommand();
            Timer.Sample startTimer = getMetrics().startTimer();
            if (!isCommandValid(command, start)) {
                getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNPROCESSABLE, command.getPayloadSize(), startTimer);
                this.log.debug("command message is invalid: {}", command);
                commandContext.reject("malformed command message");
            } else {
                if (atomicBoolean.compareAndSet(false, true)) {
                    checkMessageLimit(tenantObject, command.getPayloadSize(), start.context()).onComplete(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            addMicrometerSample(commandContext, startTimer);
                            coapContext.put("command-context", commandContext);
                        } else {
                            commandContext.reject(asyncResult.cause().getMessage());
                            TracingHelper.logError(start, "rejected command for device", asyncResult.cause());
                            this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.from(asyncResult.cause()), command.getPayloadSize(), startTimer);
                        }
                        cancelCommandReceptionTimer(coapContext);
                        setTtdStatus(coapContext, MetricsTags.TtdStatus.COMMAND);
                        handler.handle(Future.succeededFuture());
                    });
                    return;
                }
                this.log.debug("waiting time for command has elapsed or another command has already been processed [tenantId: {}, deviceId: {}]", tenantObject.getTenantId(), str);
                getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNDELIVERABLE, command.getPayloadSize(), startTimer);
                TracingHelper.logError(commandContext.getTracingSpan(), "waiting time for command has elapsed or another command has already been processed");
                commandContext.release();
            }
        };
        return (str2 != null ? getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), str, str2, handler2, Duration.ofSeconds(num.intValue()), start.context()) : getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), str, handler2, Duration.ofSeconds(num.intValue()), start.context())).map(commandConsumer -> {
            if (!atomicBoolean.get()) {
                addCommandReceptionTimer(coapContext, atomicBoolean, handler, num.intValue(), start);
                coapContext.startAcceptTimer(this.vertx, tenantObject, ((CoapAdapterProperties) getConfig()).getTimeoutToAck());
            }
            return new CommandConsumer() { // from class: org.eclipse.hono.adapter.coap.AbstractVertxBasedCoapAdapter.1
                public Future<Void> close(SpanContext spanContext) {
                    Future close = commandConsumer.close(start.context());
                    Span span2 = start;
                    Future onFailure = close.onFailure(th -> {
                        TracingHelper.logError(span2, th);
                    });
                    Span span3 = start;
                    return onFailure.onComplete(asyncResult -> {
                        span3.finish();
                    });
                }
            };
        });
    }

    protected boolean isCommandValid(Command command, Span span) {
        return command.isValid();
    }

    private void addCommandReceptionTimer(CoapContext coapContext, AtomicBoolean atomicBoolean, Handler<AsyncResult<Void>> handler, long j, Span span) {
        Long valueOf = Long.valueOf(this.vertx.setTimer(j * 1000, l -> {
            this.log.trace("time to wait [{}s] for command expired [timer id: {}]", Long.valueOf(j), l);
            if (!atomicBoolean.compareAndSet(false, true)) {
                this.log.trace("response already sent, nothing to do ...");
                return;
            }
            setTtdStatus(coapContext, MetricsTags.TtdStatus.EXPIRED);
            span.log(String.format("time to wait for command expired (%ds)", Long.valueOf(j)));
            handler.handle(Future.succeededFuture());
        }));
        this.log.trace("adding command reception timer [id: {}]", valueOf);
        coapContext.put(KEY_TIMER_ID, valueOf);
    }

    private void cancelCommandReceptionTimer(CoapContext coapContext) {
        Long l = (Long) coapContext.get(KEY_TIMER_ID);
        if (l == null || l.longValue() < 0) {
            return;
        }
        if (this.vertx.cancelTimer(l.longValue())) {
            this.log.trace("Cancelled timer id {}", l);
        } else {
            this.log.debug("Could not cancel timer id {}", l);
        }
    }

    private void setTtdStatus(CoapContext coapContext, MetricsTags.TtdStatus ttdStatus) {
        coapContext.put(MetricsTags.TtdStatus.class.getName(), ttdStatus);
    }

    private MetricsTags.TtdStatus getTtdStatus(CoapContext coapContext) {
        return (MetricsTags.TtdStatus) Optional.ofNullable((MetricsTags.TtdStatus) coapContext.get(MetricsTags.TtdStatus.class.getName())).orElse(MetricsTags.TtdStatus.NONE);
    }

    public final Future<CoAP.ResponseCode> uploadCommandResponseMessage(CoapContext coapContext) {
        Objects.requireNonNull(coapContext);
        Device originDevice = coapContext.getOriginDevice();
        Device authenticatedDevice = coapContext.getAuthenticatedDevice();
        if (!coapContext.isConfirmable()) {
            coapContext.respondWithCode(CoAP.ResponseCode.BAD_REQUEST, "command response endpoint supports confirmable request messages only");
            return Future.succeededFuture(CoAP.ResponseCode.BAD_REQUEST);
        }
        Buffer payload = coapContext.getPayload();
        String contentType = coapContext.getContentType();
        String commandRequestId = coapContext.getCommandRequestId();
        Integer commandResponseStatus = coapContext.getCommandResponseStatus();
        this.log.debug("processing response to command [tenantId: {}, deviceId: {}, cmd-req-id: {}, status code: {}]", new Object[]{originDevice.getTenantId(), originDevice.getDeviceId(), commandRequestId, commandResponseStatus});
        Span start = TracingHelper.buildChildSpan(this.tracer, coapContext.getTracingContext(), "upload Command response", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, originDevice.getTenantId()).withTag(TracingHelper.TAG_DEVICE_ID, originDevice.getDeviceId()).withTag("hono-cmd-status", commandResponseStatus).withTag("hono-cmd-req-id", commandRequestId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null).start();
        Future tenantConfiguration = getTenantConfiguration(originDevice.getTenantId(), start.context());
        Future future = (Future) Optional.ofNullable(CommandResponse.fromRequestId(commandRequestId, originDevice.getTenantId(), originDevice.getDeviceId(), payload, contentType, commandResponseStatus)).map(commandResponse -> {
            return Future.succeededFuture(commandResponse);
        }).orElseGet(() -> {
            return Future.failedFuture(new ClientErrorException(400, String.format("command-request-id [%s] or status code [%s] is missing/invalid", commandRequestId, commandResponseStatus)));
        });
        return CompositeFuture.all(tenantConfiguration, future).compose(compositeFuture -> {
            return CompositeFuture.all(CompositeFuture.all(isAdapterEnabled((TenantObject) tenantConfiguration.result()), checkMessageLimit((TenantObject) tenantConfiguration.result(), payload.length(), start.context())).mapEmpty(), getRegistrationAssertion(originDevice.getTenantId(), originDevice.getDeviceId(), authenticatedDevice, start.context()));
        }).compose(compositeFuture2 -> {
            return sendCommandResponse((CommandResponse) future.result(), start.context());
        }).map(r15 -> {
            this.log.trace("delivered command response [command-request-id: {}] to application", commandRequestId);
            start.log("delivered command response to application");
            start.finish();
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, originDevice.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, payload.length(), coapContext.getTimer());
            coapContext.respondWithCode(CoAP.ResponseCode.CHANGED);
            return CoAP.ResponseCode.CHANGED;
        }).otherwise(th -> {
            this.log.debug("could not send command response [command-request-id: {}] to application", commandRequestId, th);
            TracingHelper.logError(start, th);
            start.finish();
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, originDevice.getTenantId(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), payload.length(), coapContext.getTimer());
            return coapContext.respond(CoapErrorResponse.respond(th, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
        });
    }
}
