package org.eclipse.hono.adapter.http;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.noop.NoopSpan;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.adapter.client.command.Command;
import org.eclipse.hono.adapter.client.command.CommandConsumer;
import org.eclipse.hono.adapter.client.command.CommandContext;
import org.eclipse.hono.adapter.client.command.CommandResponse;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.service.auth.device.DeviceCredentials;
import org.eclipse.hono.service.http.ComponentMetaDataDecorator;
import org.eclipse.hono.service.http.DefaultFailureHandler;
import org.eclipse.hono.service.http.HttpContext;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.http.TracingHandler;
import org.eclipse.hono.service.http.WebSpanDecorator;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.class */
public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    protected static final String DEFAULT_UPLOADS_DIRECTORY = "/tmp";
    private static final String KEY_TIMER_ID = "timerId";
    private HttpServer server;
    private HttpServer insecureServer;
    private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;

    public final void setMetrics(HttpAdapterMetrics httpAdapterMetrics) {
        Optional.ofNullable(httpAdapterMetrics).ifPresent(httpAdapterMetrics2 -> {
            this.log.info("reporting metrics using [{}]", httpAdapterMetrics.getClass().getName());
        });
        this.metrics = httpAdapterMetrics;
    }

    protected final HttpAdapterMetrics getMetrics() {
        return this.metrics;
    }

    public final int getPortDefaultValue() {
        return 8443;
    }

    public final int getInsecurePortDefaultValue() {
        return 8080;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    public final void setHttpServer(HttpServer httpServer) {
        Objects.requireNonNull(httpServer);
        if (httpServer.actualPort() > 0) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.server = httpServer;
    }

    public final void setInsecureHttpServer(HttpServer httpServer) {
        Objects.requireNonNull(httpServer);
        if (httpServer.actualPort() > 0) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.insecureServer = httpServer;
    }

    public final void doStart(Promise<Void> promise) {
        checkPortConfiguration().compose(r3 -> {
            return preStartup();
        }).compose(r5 -> {
            Router createRouter = createRouter();
            if (createRouter == null) {
                return Future.failedFuture("no router configured");
            }
            addRoutes(createRouter);
            return CompositeFuture.all(bindSecureHttpServer(createRouter), bindInsecureHttpServer(createRouter));
        }).compose(compositeFuture -> {
            try {
                onStartupSuccess();
                return Future.succeededFuture((Void) null);
            } catch (Exception e) {
                this.log.error("error in onStartupSuccess", e);
                return Future.failedFuture(e);
            }
        }).onComplete(promise);
    }

    private Timer.Sample getMicrometerSample(RoutingContext routingContext) {
        return (Timer.Sample) routingContext.get("micrometer.sample");
    }

    private void setTtdStatus(RoutingContext routingContext, MetricsTags.TtdStatus ttdStatus) {
        routingContext.put(MetricsTags.TtdStatus.class.getName(), ttdStatus);
    }

    private TracingHandler createTracingHandler() {
        HashMap hashMap = new HashMap();
        hashMap.put(Tags.COMPONENT.getKey(), getTypeName());
        addCustomTags(hashMap);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ComponentMetaDataDecorator(hashMap));
        addCustomSpanDecorators(arrayList);
        return new TracingHandler(this.tracer, arrayList);
    }

    protected void addCustomTags(Map<String, String> map) {
    }

    protected void addCustomSpanDecorators(List<WebSpanDecorator> list) {
    }

    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    protected void onStartupSuccess() {
    }

    protected Router createRouter() {
        Router router = Router.router(this.vertx);
        Route route = router.route();
        route.handler(routingContext -> {
            routingContext.put("micrometer.sample", getMetrics().startTimer());
            routingContext.next();
        });
        TracingHandler createTracingHandler = createTracingHandler();
        route.handler(createTracingHandler).failureHandler(createTracingHandler);
        route.handler(routingContext2 -> {
            if (!routingContext2.response().closed() && !routingContext2.response().ended()) {
                routingContext2.response().closeHandler(r5 -> {
                    logResponseGettingClosedPrematurely(routingContext2);
                });
            }
            routingContext2.next();
        });
        route.failureHandler(new DefaultFailureHandler());
        this.log.info("limiting size of inbound request body to {} bytes", Integer.valueOf(((HttpProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        route.handler(BodyHandler.create(DEFAULT_UPLOADS_DIRECTORY).setBodyLimit(((HttpProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        return router;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<Void> handleBeforeCredentialsValidation(DeviceCredentials deviceCredentials, HttpContext httpContext) {
        String tenantId = deviceCredentials.getTenantId();
        String authId = deviceCredentials.getAuthId();
        if (httpContext.getTracingSpan() == null) {
            this.log.warn("handleBeforeCredentialsValidation: no span context set in httpContext");
        }
        Span span = (Span) Optional.ofNullable(httpContext.getTracingSpan()).orElse(NoopSpan.INSTANCE);
        return getTenantConfiguration(tenantId, span.context()).recover(th -> {
            return Future.failedFuture(CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException(th));
        }).map(tenantObject -> {
            TracingHelper.setDeviceTags(span, tenantId, (String) null, authId);
            TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, authId, span);
            return tenantObject;
        }).compose(tenantObject2 -> {
            return isAdapterEnabled(tenantObject2);
        }).mapEmpty();
    }

    protected abstract void addRoutes(Router router);

    protected HttpServerOptions getHttpServerOptions() {
        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setHost(((HttpProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(((HttpProtocolAdapterProperties) getConfig()).getPort(getPortDefaultValue())).setMaxChunkSize(4096);
        addTlsKeyCertOptions(httpServerOptions);
        addTlsTrustOptions(httpServerOptions);
        return httpServerOptions;
    }

    protected HttpServerOptions getInsecureHttpServerOptions() {
        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setHost(((HttpProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(((HttpProtocolAdapterProperties) getConfig()).getInsecurePort(getInsecurePortDefaultValue())).setMaxChunkSize(4096);
        return httpServerOptions;
    }

    protected void customizeDownstreamMessageProperties(Map<String, Object> map, HttpContext httpContext) {
    }

    private Future<HttpServer> bindSecureHttpServer(Router router) {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        String bindAddress = this.server == null ? ((HttpProtocolAdapterProperties) getConfig()).getBindAddress() : "?";
        if (this.server == null) {
            this.server = this.vertx.createHttpServer(getHttpServerOptions());
        }
        this.server.requestHandler(router).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("secure http server listening on {}:{}", bindAddress, Integer.valueOf(this.server.actualPort()));
                promise.complete((HttpServer) asyncResult.result());
            } else {
                this.log.error("error while starting up secure http server", asyncResult.cause());
                promise.fail(asyncResult.cause());
            }
        });
        return promise.future();
    }

    private Future<HttpServer> bindInsecureHttpServer(Router router) {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        String insecurePortBindAddress = this.insecureServer == null ? ((HttpProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress() : "?";
        if (this.insecureServer == null) {
            this.insecureServer = this.vertx.createHttpServer(getInsecureHttpServerOptions());
        }
        this.insecureServer.requestHandler(router).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("insecure http server listening on {}:{}", insecurePortBindAddress, Integer.valueOf(this.insecureServer.actualPort()));
                promise.complete((HttpServer) asyncResult.result());
            } else {
                this.log.error("error while starting up insecure http server", asyncResult.cause());
                promise.fail(asyncResult.cause());
            }
        });
        return promise.future();
    }

    public final void doStop(Promise<Void> promise) {
        try {
            preShutdown();
        } catch (Exception e) {
            this.log.error("error in preShutdown", e);
        }
        Promise promise2 = Promise.promise();
        if (this.server != null) {
            this.server.close(promise2);
        } else {
            promise2.complete();
        }
        Promise promise3 = Promise.promise();
        if (this.insecureServer != null) {
            this.insecureServer.close(promise3);
        } else {
            promise3.complete();
        }
        CompositeFuture.all(promise2.future(), promise3.future()).compose(compositeFuture -> {
            return postShutdown();
        }).onComplete(promise);
    }

    protected void preShutdown() {
    }

    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    public final void uploadTelemetryMessage(HttpContext httpContext, String str, String str2) {
        uploadTelemetryMessage((HttpContext) Objects.requireNonNull(httpContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), httpContext.getRoutingContext().getBody(), httpContext.getContentType());
    }

    public final void uploadTelemetryMessage(HttpContext httpContext, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((HttpContext) Objects.requireNonNull(httpContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, MetricsTags.EndpointType.TELEMETRY);
    }

    public final void uploadEventMessage(HttpContext httpContext, String str, String str2) {
        uploadEventMessage((HttpContext) Objects.requireNonNull(httpContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), httpContext.getRoutingContext().getBody(), httpContext.getContentType());
    }

    public final void uploadEventMessage(HttpContext httpContext, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((HttpContext) Objects.requireNonNull(httpContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, MetricsTags.EndpointType.EVENT);
    }

    private void doUploadMessage(HttpContext httpContext, String str, String str2, Buffer buffer, String str3, MetricsTags.EndpointType endpointType) {
        if (!httpContext.hasValidQoS()) {
            HttpUtils.badRequest(httpContext.getRoutingContext(), "unsupported QoS-Level header value");
            return;
        }
        if (!isPayloadOfIndicatedType(buffer, str3)) {
            HttpUtils.badRequest(httpContext.getRoutingContext(), String.format("content type [%s] does not match payload", str3));
            return;
        }
        MetricsTags.QoS qoSLevel = getQoSLevel(endpointType, httpContext.getRequestedQos());
        Device authenticatedDevice = httpContext.getAuthenticatedDevice();
        String deviceId = (authenticatedDevice == null || str2.equals(authenticatedDevice.getDeviceId())) ? null : authenticatedDevice.getDeviceId();
        Span start = TracingHelper.buildChildSpan(this.tracer, TracingHandler.serverSpanContext(httpContext.getRoutingContext()), "upload " + endpointType.getCanonicalName(), getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, str).withTag(TracingHelper.TAG_DEVICE_ID, str2).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null).withTag(TracingHelper.TAG_QOS, qoSLevel.name()).start();
        Promise promise = Promise.promise();
        Future registrationAssertion = getRegistrationAssertion(str, str2, authenticatedDevice, start.context());
        int intValue = ((Integer) Optional.ofNullable(buffer).map(buffer2 -> {
            return Integer.valueOf(buffer.length());
        }).orElse(0)).intValue();
        Future tenantConfiguration = getTenantConfiguration(str, start.context());
        Future compose = CompositeFuture.all(tenantConfiguration.compose(tenantObject -> {
            return CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, intValue, start.context())).map(compositeFuture -> {
                return tenantObject;
            });
        }), registrationAssertion).compose(compositeFuture -> {
            return getTimeUntilDisconnect((TenantObject) tenantConfiguration.result(), getTimeUntilDisconnectFromRequest(httpContext)).map(num -> {
                if (num != null) {
                    start.setTag("ttd", num);
                }
                return num;
            });
        });
        Future compose2 = compose.compose(num -> {
            return createCommandConsumer(num, (TenantObject) tenantConfiguration.result(), str2, deviceId, httpContext.getRoutingContext(), promise, start);
        });
        compose2.compose(commandConsumer -> {
            Map<String, Object> downstreamMessageProperties = getDownstreamMessageProperties(httpContext);
            Optional.ofNullable((CommandConsumer) compose2.result()).map(commandConsumer -> {
                return (Integer) compose.result();
            }).ifPresent(num2 -> {
                downstreamMessageProperties.put("ttd", num2);
            });
            downstreamMessageProperties.put("qos", Integer.valueOf(httpContext.getRequestedQos().ordinal()));
            customizeDownstreamMessageProperties(downstreamMessageProperties, httpContext);
            setTtdRequestConnectionCloseHandler(httpContext.getRoutingContext(), (CommandConsumer) compose2.result(), str, str2, start);
            if (!MetricsTags.EndpointType.EVENT.equals(endpointType)) {
                return CompositeFuture.all(getTelemetrySender().sendTelemetry((TenantObject) tenantConfiguration.result(), (RegistrationAssertion) registrationAssertion.result(), httpContext.getRequestedQos(), str3, buffer, downstreamMessageProperties, start.context()), promise.future()).map(compositeFuture2 -> {
                    return (Void) null;
                });
            }
            httpContext.getTimeToLive().ifPresent(duration -> {
                downstreamMessageProperties.put("ttl", Long.valueOf(duration.toSeconds()));
            });
            return CompositeFuture.all(getEventSender().sendEvent((TenantObject) tenantConfiguration.result(), (RegistrationAssertion) registrationAssertion.result(), str3, buffer, downstreamMessageProperties, start.context()), promise.future()).map(compositeFuture3 -> {
                return (Void) null;
            });
        }).map(r23 -> {
            Future onFailure = compose2.result() != null ? ((CommandConsumer) compose2.result()).close(start.context()).onFailure(th -> {
                TracingHelper.logError(start, th);
            }) : Future.succeededFuture();
            if (httpContext.response().closed()) {
                this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed", new Object[]{endpointType, str, str2});
                TracingHelper.logError(start, "failed to send HTTP response to device: response already closed");
                onFailure.onComplete(asyncResult -> {
                    start.finish();
                    httpContext.response().end();
                });
            } else {
                CommandContext commandContext = (CommandContext) httpContext.get("command-context");
                setResponsePayload(httpContext.response(), commandContext, start);
                httpContext.getRoutingContext().addBodyEndHandler(r22 -> {
                    this.log.trace("successfully processed [{}] message for device [tenantId: {}, deviceId: {}]", new Object[]{endpointType, str, str2});
                    if (commandContext != null) {
                        commandContext.getTracingSpan().log("forwarded command to device in HTTP response body");
                        commandContext.accept();
                        this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, str, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, commandContext.getCommand().getPayloadSize(), getMicrometerSample(commandContext));
                    }
                    this.metrics.reportTelemetry(endpointType, str, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, qoSLevel, intValue, httpContext.getTtdStatus(), getMicrometerSample(httpContext.getRoutingContext()));
                    onFailure.onComplete(asyncResult2 -> {
                        start.finish();
                    });
                });
                httpContext.response().exceptionHandler(th2 -> {
                    this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]", new Object[]{endpointType, str, str2, th2});
                    if (commandContext != null) {
                        TracingHelper.logError(commandContext.getTracingSpan(), "failed to forward command to device in HTTP response body", th2);
                        commandContext.release();
                        this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, str, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.UNDELIVERABLE, commandContext.getCommand().getPayloadSize(), getMicrometerSample(commandContext));
                    }
                    start.log("failed to send HTTP response to device");
                    TracingHelper.logError(start, th2);
                    onFailure.onComplete(asyncResult2 -> {
                        start.finish();
                    });
                });
                httpContext.response().end();
            }
            return r23;
        }).recover(th -> {
            MetricsTags.ProcessingOutcome processingOutcome;
            this.log.debug("cannot process [{}] message from device [tenantId: {}, deviceId: {}]", new Object[]{endpointType, str, str2, th});
            boolean closed = httpContext.response().closed();
            Future onFailure = compose2.result() != null ? ((CommandConsumer) compose2.result()).close(start.context()).onFailure(th -> {
                TracingHelper.logError(start, th);
            }) : Future.succeededFuture();
            CommandContext commandContext = (CommandContext) httpContext.get("command-context");
            if (commandContext != null) {
                TracingHelper.logError(commandContext.getTracingSpan(), "command won't be forwarded to device in HTTP response body, HTTP request handling failed", th);
                commandContext.release();
                start.log("released command for device");
            }
            if (ClientErrorException.class.isInstance(th)) {
                processingOutcome = MetricsTags.ProcessingOutcome.UNPROCESSABLE;
                httpContext.fail(th);
            } else {
                processingOutcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                String clientFacingMessage = th instanceof ServerErrorException ? ((ServerErrorException) th).getClientFacingMessage() : null;
                HttpUtils.serviceUnavailable(httpContext.getRoutingContext(), 2, Strings.isNullOrEmpty(clientFacingMessage) ? "temporarily unavailable" : clientFacingMessage);
            }
            if (closed) {
                this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed", new Object[]{endpointType, str, str2});
                TracingHelper.logError(start, "failed to send HTTP response to device: response already closed");
            }
            this.metrics.reportTelemetry(endpointType, str, (TenantObject) tenantConfiguration.result(), processingOutcome, qoSLevel, intValue, httpContext.getTtdStatus(), getMicrometerSample(httpContext.getRoutingContext()));
            TracingHelper.logError(start, th);
            onFailure.onComplete(asyncResult -> {
                start.finish();
            });
            return Future.failedFuture(th);
        });
    }

    private void logResponseGettingClosedPrematurely(RoutingContext routingContext) {
        this.log.trace("connection got closed before response could be sent");
        Optional.ofNullable(getRootSpan(routingContext)).ifPresent(span -> {
            TracingHelper.logError(span, "connection got closed before response could be sent");
        });
    }

    private Span getRootSpan(RoutingContext routingContext) {
        Object obj = routingContext.get(TracingHandler.CURRENT_SPAN);
        if (obj instanceof Span) {
            return (Span) obj;
        }
        return null;
    }

    protected Integer getTimeUntilDisconnectFromRequest(HttpContext httpContext) {
        return httpContext.getTimeTillDisconnect();
    }

    private void setTtdRequestConnectionCloseHandler(RoutingContext routingContext, CommandConsumer commandConsumer, String str, String str2, Span span) {
        if (commandConsumer == null || routingContext.response().closed() || routingContext.response().ended()) {
            return;
        }
        routingContext.response().closeHandler(r11 -> {
            this.log.debug("device [tenant: {}, device-id: {}] closed connection before response could be sent", str, str2);
            span.log("device closed connection, stop waiting for command");
            cancelCommandReceptionTimer(routingContext);
            commandConsumer.close(span.context()).onFailure(th -> {
                TracingHelper.logError(span, th);
            }).onComplete(asyncResult -> {
                span.finish();
                logResponseGettingClosedPrematurely(routingContext);
                routingContext.response().end();
            });
        });
    }

    private void setResponsePayload(HttpServerResponse httpServerResponse, CommandContext commandContext, Span span) {
        if (commandContext == null) {
            setEmptyResponsePayload(httpServerResponse, span);
        } else {
            setNonEmptyResponsePayload(httpServerResponse, commandContext, span);
        }
    }

    protected void setEmptyResponsePayload(HttpServerResponse httpServerResponse, Span span) {
        httpServerResponse.setStatusCode(202);
    }

    protected void setNonEmptyResponsePayload(HttpServerResponse httpServerResponse, CommandContext commandContext, Span span) {
        Command command = commandContext.getCommand();
        httpServerResponse.putHeader("hono-command", command.getName());
        span.setTag("hono-command", command.getName());
        this.log.debug("adding command [name: {}, request-id: {}] to response for device [tenant-id: {}, device-id: {}]", new Object[]{command.getName(), command.getRequestId(), command.getTenant(), command.getDeviceId()});
        if (!command.isOneWay()) {
            httpServerResponse.putHeader("hono-cmd-req-id", command.getRequestId());
            span.setTag("hono-cmd-req-id", command.getRequestId());
        }
        if (command.isTargetedAtGateway()) {
            httpServerResponse.putHeader("hono-cmd-target-device", command.getOriginalDeviceId());
            span.setTag("hono-cmd-target-device", command.getOriginalDeviceId());
        }
        httpServerResponse.setStatusCode(200);
        HttpUtils.setResponseBody(httpServerResponse, command.getPayload(), command.getContentType());
    }

    protected final Future<CommandConsumer> createCommandConsumer(Integer num, TenantObject tenantObject, String str, String str2, RoutingContext routingContext, Handler<AsyncResult<Void>> handler, Span span) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(str);
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(handler);
        Objects.requireNonNull(span);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (num == null || num.intValue() <= 0) {
            if (atomicBoolean.compareAndSet(false, true)) {
                handler.handle(Future.succeededFuture());
            }
            return Future.succeededFuture();
        }
        span.setTag("ttd", num);
        Span start = TracingHelper.buildChildSpan(this.tracer, span.context(), "wait for command", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId()).withTag(TracingHelper.TAG_DEVICE_ID, str).start();
        Handler handler2 = commandContext -> {
            Tags.COMPONENT.set(commandContext.getTracingSpan(), getTypeName());
            commandContext.logCommandToSpan(start);
            Command command = commandContext.getCommand();
            Timer.Sample startTimer = getMetrics().startTimer();
            if (!isCommandValid(command, start)) {
                getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNPROCESSABLE, command.getPayloadSize(), startTimer);
                this.log.debug("command message is invalid: {}", command);
                commandContext.reject("malformed command message");
            } else {
                if (atomicBoolean.compareAndSet(false, true)) {
                    checkMessageLimit(tenantObject, command.getPayloadSize(), start.context()).onComplete(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            addMicrometerSample(commandContext, startTimer);
                            routingContext.put("command-context", commandContext);
                        } else {
                            commandContext.reject(asyncResult.cause().getMessage());
                            TracingHelper.logError(start, "rejected command for device", asyncResult.cause());
                            this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.from(asyncResult.cause()), command.getPayloadSize(), startTimer);
                        }
                        cancelCommandReceptionTimer(routingContext);
                        setTtdStatus(routingContext, MetricsTags.TtdStatus.COMMAND);
                        handler.handle(Future.succeededFuture());
                    });
                    return;
                }
                this.log.debug("waiting time for command has elapsed or another command has already been processed [tenantId: {}, deviceId: {}]", tenantObject.getTenantId(), str);
                getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNDELIVERABLE, command.getPayloadSize(), startTimer);
                TracingHelper.logError(commandContext.getTracingSpan(), "waiting time for command has elapsed or another command has already been processed");
                commandContext.release();
            }
        };
        return (str2 != null ? getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), str, str2, handler2, Duration.ofSeconds(num.intValue()), start.context()) : getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), str, handler2, Duration.ofSeconds(num.intValue()), start.context())).map(commandConsumer -> {
            if (!atomicBoolean.get()) {
                addCommandReceptionTimer(routingContext, atomicBoolean, handler, num.intValue(), start);
            }
            return new CommandConsumer() { // from class: org.eclipse.hono.adapter.http.AbstractVertxBasedHttpProtocolAdapter.1
                public Future<Void> close(SpanContext spanContext) {
                    Future close = commandConsumer.close(start.context());
                    Span span2 = start;
                    Future onFailure = close.onFailure(th -> {
                        TracingHelper.logError(span2, th);
                    });
                    Span span3 = start;
                    return onFailure.onComplete(asyncResult -> {
                        span3.finish();
                    });
                }
            };
        });
    }

    protected boolean isCommandValid(Command command, Span span) {
        return command.isValid();
    }

    private void addCommandReceptionTimer(RoutingContext routingContext, AtomicBoolean atomicBoolean, Handler<AsyncResult<Void>> handler, long j, Span span) {
        Long valueOf = Long.valueOf(routingContext.vertx().setTimer(j * 1000, l -> {
            this.log.trace("time to wait [{}s] for command expired [timer id: {}]", Long.valueOf(j), l);
            if (!atomicBoolean.compareAndSet(false, true)) {
                this.log.trace("response already sent, nothing to do ...");
                return;
            }
            setTtdStatus(routingContext, MetricsTags.TtdStatus.EXPIRED);
            span.log(String.format("time to wait for command expired (%ds)", Long.valueOf(j)));
            handler.handle(Future.succeededFuture());
        }));
        this.log.trace("adding command reception timer [id: {}]", valueOf);
        routingContext.put(KEY_TIMER_ID, valueOf);
    }

    private void cancelCommandReceptionTimer(RoutingContext routingContext) {
        Long l = (Long) routingContext.get(KEY_TIMER_ID);
        if (l == null || l.longValue() < 0) {
            return;
        }
        if (routingContext.vertx().cancelTimer(l.longValue())) {
            this.log.trace("Cancelled timer id {}", l);
        } else {
            this.log.debug("Could not cancel timer id {}", l);
        }
    }

    public final void uploadCommandResponseMessage(HttpContext httpContext, String str, String str2, String str3, Integer num) {
        Objects.requireNonNull(httpContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Buffer body = httpContext.getRoutingContext().getBody();
        String contentType = httpContext.getContentType();
        this.log.debug("processing response to command [tenantId: {}, deviceId: {}, cmd-req-id: {}, status code: {}]", new Object[]{str, str2, str3, num});
        Device authenticatedDevice = httpContext.getAuthenticatedDevice();
        Span start = TracingHelper.buildChildSpan(this.tracer, TracingHandler.serverSpanContext(httpContext.getRoutingContext()), "upload Command response", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, str).withTag(TracingHelper.TAG_DEVICE_ID, str2).withTag("hono-cmd-status", num).withTag("hono-cmd-req-id", str3).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null).start();
        CommandResponse fromRequestId = CommandResponse.fromRequestId(str3, str, str2, body, contentType, num);
        Future tenantConfiguration = getTenantConfiguration(str, start.context());
        Future succeededFuture = fromRequestId != null ? Future.succeededFuture(fromRequestId) : Future.failedFuture(new ClientErrorException(400, String.format("command-request-id [%s] or status code [%s] is missing/invalid", str3, num)));
        int intValue = ((Integer) Optional.ofNullable(body).map((v0) -> {
            return v0.length();
        }).orElse(0)).intValue();
        CompositeFuture.all(tenantConfiguration, succeededFuture).compose(compositeFuture -> {
            return CompositeFuture.all(CompositeFuture.all(isAdapterEnabled((TenantObject) tenantConfiguration.result()), checkMessageLimit((TenantObject) tenantConfiguration.result(), intValue, start.context())).map(compositeFuture -> {
                return null;
            }), getRegistrationAssertion(str, str2, authenticatedDevice, start.context())).compose(compositeFuture2 -> {
                return sendCommandResponse((CommandResponse) succeededFuture.result(), start.context());
            }).map(r16 -> {
                this.log.trace("delivered command response [command-request-id: {}] to application", str3);
                start.log("delivered command response to application");
                start.finish();
                this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, str, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, intValue, getMicrometerSample(httpContext.getRoutingContext()));
                httpContext.response().setStatusCode(202);
                httpContext.response().end();
                return r16;
            });
        }).otherwise(th -> {
            this.log.debug("could not send command response [command-request-id: {}] to application", str3, th);
            TracingHelper.logError(start, th);
            start.finish();
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, str, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), intValue, getMicrometerSample(httpContext.getRoutingContext()));
            httpContext.fail(th);
            return null;
        });
    }

    private static MetricsTags.QoS getQoSLevel(MetricsTags.EndpointType endpointType, QoS qoS) {
        return endpointType == MetricsTags.EndpointType.EVENT ? MetricsTags.QoS.AT_LEAST_ONCE : qoS == null ? MetricsTags.QoS.UNKNOWN : qoS == QoS.AT_MOST_ONCE ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE;
    }
}
