package org.eclipse.hono.adapter.http;

import io.opentracing.Span;
import io.opentracing.contrib.vertx.ext.web.TracingHandler;
import io.opentracing.contrib.vertx.ext.web.WebSpanDecorator;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.command.Command;
import org.eclipse.hono.service.command.CommandResponse;
import org.eclipse.hono.service.command.CommandResponseSender;
import org.eclipse.hono.service.http.DefaultFailureHandler;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.class */
public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    /** Directory passed to the {@code BodyHandler} that {@code createRouter()} installs. */
    protected static final String DEFAULT_UPLOADS_DIRECTORY = "/tmp";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVertxBasedHttpProtocolAdapter.class);
    /** The only numeric value that {@code getQoSLevel} accepts for the {@code QoS-Level} header. */
    private static final int AT_LEAST_ONCE = 1;
    /** Marker that {@code getQoSLevel} returns for a header value it does not accept. */
    private static final int HEADER_QOS_INVALID = -1;
    /** Routing context key under which the id of the command reception timer is stored. */
    private static final String KEY_TIMER_ID = "timerId";
    /** Server for the secure port; either injected via {@code setHttpServer} or created when binding. */
    private HttpServer server;
    /** Server for the insecure port; either injected via {@code setInsecureHttpServer} or created when binding. */
    private HttpServer insecureServer;
    /** Metrics sink; replaced by a default instance in {@code doStart} if none has been injected. */
    private HttpAdapterMetrics metrics;

    /**
     * Sets the object used for reporting metrics.
     * <p>
     * If no instance has been set by the time {@code doStart} runs, a new
     * {@code HttpAdapterMetrics} instance is created there instead.
     *
     * @param httpAdapterMetrics The metrics instance to use.
     */
    @Autowired
    public final void setMetrics(HttpAdapterMetrics httpAdapterMetrics) {
        this.metrics = httpAdapterMetrics;
    }

    /**
     * Gets the default secure port.
     * <p>
     * {@code getHttpServerOptions()} passes this value to the configuration's
     * {@code getPort(int)} as the fallback.
     *
     * @return 8443
     */
    public final int getPortDefaultValue() {
        return 8443;
    }

    /**
     * Gets the default insecure port.
     * <p>
     * {@code getInsecureHttpServerOptions()} passes this value to the configuration's
     * {@code getInsecurePort(int)} as the fallback.
     *
     * @return 8080
     */
    public final int getInsecurePortDefaultValue() {
        return 8080;
    }

    /**
     * Gets the port reported by the secure server.
     *
     * @return The value of the secure server's {@code actualPort()}, or {@code -1}
     *         if no secure server has been set or created yet.
     */
    protected final int getActualPort() {
        if (this.server == null) {
            // explicit port sentinel; intentionally not tied to the QoS header constant
            return -1;
        }
        return this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure server.
     *
     * @return The value of the insecure server's {@code actualPort()}, or {@code -1}
     *         if no insecure server has been set or created yet.
     */
    protected final int getActualInsecurePort() {
        if (this.insecureServer == null) {
            // explicit port sentinel; intentionally not tied to the QoS header constant
            return -1;
        }
        return this.insecureServer.actualPort();
    }

    /**
     * Sets the server to use for the secure port.
     * <p>
     * When a server has been set, {@code bindSecureHttpServer} uses it instead
     * of creating one from {@code getHttpServerOptions()}.
     *
     * @param httpServer The server, which must not report a positive {@code actualPort()}.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports a port greater than zero.
     */
    @Autowired(required = false)
    public final void setHttpServer(HttpServer httpServer) {
        final HttpServer candidate = Objects.requireNonNull(httpServer);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.server = candidate;
    }

    /**
     * Sets the server to use for the insecure port.
     * <p>
     * When a server has been set, {@code bindInsecureHttpServer} uses it instead
     * of creating one from {@code getInsecureHttpServerOptions()}.
     *
     * @param httpServer The server, which must not report a positive {@code actualPort()}.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports a port greater than zero.
     */
    @Autowired(required = false)
    public final void setInsecureHttpServer(HttpServer httpServer) {
        final HttpServer candidate = Objects.requireNonNull(httpServer);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.insecureServer = candidate;
    }

    /**
     * Starts this adapter.
     * <p>
     * The steps run in sequence, each one only if the previous one succeeded:
     * <ol>
     * <li>{@code checkPortConfiguration()}</li>
     * <li>{@code preStartup()}</li>
     * <li>a default metrics instance is created if none has been set, the router
     * is created and populated via {@code addRoutes}, and both servers are bound</li>
     * <li>{@code onStartupSuccess()}</li>
     * </ol>
     * Startup fails with <em>no router configured</em> if {@code createRouter()}
     * returns {@code null}.
     *
     * @param future The future to complete once startup has succeeded, or to fail otherwise.
     */
    public final void doStart(Future<Void> future) {
        checkPortConfiguration()
            .compose(portsChecked -> preStartup())
            .compose(preStartupDone -> {
                if (this.metrics == null) {
                    this.metrics = new HttpAdapterMetrics();
                }
                final Router router = createRouter();
                if (router != null) {
                    addRoutes(router);
                    return CompositeFuture.all(bindSecureHttpServer(router), bindInsecureHttpServer(router));
                }
                return Future.failedFuture("no router configured");
            })
            .compose(serversBound -> {
                try {
                    onStartupSuccess();
                    future.complete();
                } catch (Exception e) {
                    LOG.error("error in onStartupSuccess", e);
                    future.fail(e);
                }
            }, future);
    }

    /**
     * Registers a tracing handler on a catch-all route of the given router.
     * <p>
     * The handler is registered both as regular handler and as failure handler.
     * Its decorators consist of a {@code ComponentMetaDataDecorator} carrying the
     * component tag (set to {@code getTypeName()}) plus whatever
     * {@code addCustomTags} contributes, followed by the decorators contributed
     * by {@code addCustomSpanDecorators}.
     *
     * @param router The router to add the route to.
     * @param i The order to assign to the route.
     */
    private void addTracingHandler(Router router, int i) {
        final Map<String, String> customTags = new HashMap<>();
        customTags.put(Tags.COMPONENT.getKey(), getTypeName());
        addCustomTags(customTags);

        final List<WebSpanDecorator> decorators = new ArrayList<>();
        decorators.add(new ComponentMetaDataDecorator(customTags));
        addCustomSpanDecorators(decorators);

        final TracingHandler tracingHandler = new TracingHandler(this.tracer, decorators);
        router.route().order(i).handler(tracingHandler).failureHandler(tracingHandler);
    }

    /**
     * Hook for adding entries to the tag map that is handed to the
     * {@code ComponentMetaDataDecorator} of the tracing handler.
     * <p>
     * The map already contains the component tag when this method is invoked.
     * This default implementation does nothing.
     *
     * @param map The (mutable) map to add tags to.
     */
    protected void addCustomTags(Map<String, String> map) {
    }

    /**
     * Hook for adding decorators to the list that is handed to the tracing handler.
     * <p>
     * The list already contains the {@code ComponentMetaDataDecorator} when this
     * method is invoked. This default implementation does nothing.
     *
     * @param list The (mutable) list to add decorators to.
     */
    protected void addCustomSpanDecorators(List<WebSpanDecorator> list) {
    }

    /**
     * Hook invoked by {@code doStart} after the port configuration has been
     * checked and before the router is created.
     * <p>
     * This default implementation returns a succeeded future.
     *
     * @return A future; startup only continues if it succeeds.
     */
    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    /**
     * Hook invoked by {@code doStart} once the servers have been bound.
     * <p>
     * An exception thrown from this method is logged and fails the start future.
     * This default implementation does nothing.
     */
    protected void onStartupSuccess() {
    }

    /**
     * Creates the router that requests are dispatched through.
     * <p>
     * The router is set up with
     * <ul>
     * <li>a body handler using {@link #DEFAULT_UPLOADS_DIRECTORY} whose body limit
     * is the configured max payload size,</li>
     * <li>the tracing handler at order -5 and</li>
     * <li>a {@code DefaultFailureHandler} at order -1.</li>
     * </ul>
     *
     * @return The router. {@code doStart} fails if an overriding implementation
     *         returns {@code null}.
     */
    protected Router createRouter() {
        final Router router = Router.router(this.vertx);
        // read the limit once so that the logged value is the one being applied
        final int maxPayloadSize = getConfig().getMaxPayloadSize();
        LOG.info("limiting size of inbound request body to {} bytes", maxPayloadSize);
        router.route().handler(BodyHandler.create(DEFAULT_UPLOADS_DIRECTORY).setBodyLimit(maxPayloadSize));
        addTracingHandler(router, -5);
        // the order is a plain route order; it is unrelated to the QoS header constant
        router.route().order(-1).failureHandler(new DefaultFailureHandler());
        return router;
    }

    /**
     * Adds the adapter specific routes to the router.
     * <p>
     * Invoked by {@code doStart} with the (non-null) router returned by
     * {@code createRouter()}, before the servers are bound.
     *
     * @param router The router to add the routes to.
     */
    protected abstract void addRoutes(Router router);

    /**
     * Creates the options for the secure server.
     * <p>
     * Host and port are taken from the configuration (falling back to
     * {@code getPortDefaultValue()} for the port), the max chunk size is set to
     * 4096 and the options are then passed to {@code addTlsKeyCertOptions} and
     * {@code addTlsTrustOptions}.
     *
     * @return The options.
     */
    protected HttpServerOptions getHttpServerOptions() {
        final HttpProtocolAdapterProperties config = getConfig();
        final HttpServerOptions options = new HttpServerOptions();
        options.setHost(config.getBindAddress());
        options.setPort(config.getPort(getPortDefaultValue()));
        options.setMaxChunkSize(4096);
        addTlsKeyCertOptions(options);
        addTlsTrustOptions(options);
        return options;
    }

    /**
     * Creates the options for the insecure server.
     * <p>
     * Host and port are taken from the configuration (falling back to
     * {@code getInsecurePortDefaultValue()} for the port) and the max chunk size
     * is set to 4096.
     *
     * @return The options.
     */
    protected HttpServerOptions getInsecureHttpServerOptions() {
        final HttpProtocolAdapterProperties config = getConfig();
        final HttpServerOptions options = new HttpServerOptions();
        options.setHost(config.getInsecurePortBindAddress());
        options.setPort(config.getInsecurePort(getInsecurePortDefaultValue()));
        options.setMaxChunkSize(4096);
        return options;
    }

    /**
     * Hook invoked by {@code doUploadMessage} on the newly created downstream
     * message before it is handed to the sender.
     * <p>
     * This default implementation does nothing.
     *
     * @param message The message that is about to be sent.
     * @param routingContext The routing context of the request the message was created for.
     */
    protected void customizeDownstreamMessage(Message message, RoutingContext routingContext) {
    }

    /**
     * Gets the device attached to the routing context as its user.
     *
     * @param routingContext The routing context to inspect.
     * @return The context's user if it is a {@code Device}, or {@code null} if the
     *         context has no user or the user is of another type.
     */
    protected final Device getAuthenticatedDevice(RoutingContext routingContext) {
        final Object principal = routingContext.user();
        if (principal instanceof Device) {
            return (Device) principal;
        }
        return null;
    }

    /**
     * Binds the secure server, if the secure port is enabled.
     * <p>
     * A server is created from {@code getHttpServerOptions()} unless one has been
     * set before. For a pre-set server the bind address is not known here and is
     * logged as {@code ?}.
     *
     * @param router The router to dispatch requests to.
     * @return A future that succeeds with the bound server, succeeds with a
     *         {@code null} result if the secure port is disabled, or fails with
     *         the cause reported by {@code listen}.
     */
    private Future<HttpServer> bindSecureHttpServer(Router router) {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Future<HttpServer> bindResult = Future.future();
        final String bindAddress;
        if (this.server == null) {
            bindAddress = getConfig().getBindAddress();
            this.server = this.vertx.createHttpServer(getHttpServerOptions());
        } else {
            bindAddress = "?";
        }
        this.server.requestHandler(router::accept).listen(bindAttempt -> {
            if (bindAttempt.failed()) {
                LOG.error("error while starting up secure http server", bindAttempt.cause());
                bindResult.fail(bindAttempt.cause());
            } else {
                LOG.info("secure http server listening on {}:{}", bindAddress, this.server.actualPort());
                bindResult.complete(bindAttempt.result());
            }
        });
        return bindResult;
    }

    /**
     * Binds the insecure server, if the insecure port is enabled.
     * <p>
     * A server is created from {@code getInsecureHttpServerOptions()} unless one
     * has been set before. For a pre-set server the bind address is not known here
     * and is logged as {@code ?}.
     *
     * @param router The router to dispatch requests to.
     * @return A future that succeeds with the bound server, succeeds with a
     *         {@code null} result if the insecure port is disabled, or fails with
     *         the cause reported by {@code listen}.
     */
    private Future<HttpServer> bindInsecureHttpServer(Router router) {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Future<HttpServer> bindResult = Future.future();
        final String bindAddress;
        if (this.insecureServer == null) {
            bindAddress = getConfig().getInsecurePortBindAddress();
            this.insecureServer = this.vertx.createHttpServer(getInsecureHttpServerOptions());
        } else {
            bindAddress = "?";
        }
        this.insecureServer.requestHandler(router::accept).listen(bindAttempt -> {
            if (bindAttempt.failed()) {
                LOG.error("error while starting up insecure http server", bindAttempt.cause());
                bindResult.fail(bindAttempt.cause());
            } else {
                LOG.info("insecure http server listening on {}:{}", bindAddress, this.insecureServer.actualPort());
                bindResult.complete(bindAttempt.result());
            }
        });
        return bindResult;
    }

    /**
     * Stops this adapter.
     * <p>
     * Invokes {@code preShutdown()} (an exception thrown by it is logged and
     * otherwise ignored), closes the secure and the insecure server if present
     * and, once both have been closed successfully, invokes {@code postShutdown()}.
     *
     * @param future The future to complete once {@code postShutdown()} has
     *               succeeded, or to fail if closing a server or the post
     *               shutdown step failed.
     */
    public final void doStop(Future<Void> future) {
        try {
            preShutdown();
        } catch (Exception e) {
            LOG.error("error in preShutdown", e);
        }

        final Future<Void> secureServerClosed = Future.future();
        if (this.server != null) {
            this.server.close(secureServerClosed.completer());
        } else {
            secureServerClosed.complete();
        }

        final Future<Void> insecureServerClosed = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(insecureServerClosed.completer());
        } else {
            insecureServerClosed.complete();
        }

        CompositeFuture.all(secureServerClosed, insecureServerClosed)
            .compose(serversClosed -> postShutdown())
            .compose(shutdownDone -> {
                future.complete();
            }, future);
    }

    /**
     * Hook invoked at the very beginning of {@code doStop}, before the servers
     * are closed.
     * <p>
     * An exception thrown from this method is logged and does not prevent the
     * servers from being closed. This default implementation does nothing.
     */
    protected void preShutdown() {
    }

    /**
     * Hook invoked by {@code doStop} after the servers have been closed
     * successfully.
     * <p>
     * This default implementation returns a succeeded future.
     *
     * @return A future; the stop future is completed with its outcome.
     */
    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    /**
     * Uploads the body of a request as a telemetry message, using the request's
     * body and content type.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device that has sent the data.
     * @param str2 The id of the device that has sent the data.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Buffer payload = routingContext.getBody();
        final String contentType = HttpUtils.getContentType(routingContext);
        uploadTelemetryMessage(routingContext, str, str2, payload, contentType);
    }

    /**
     * Uploads the given payload as a telemetry message.
     * <p>
     * Delegates to {@code doUploadMessage} with the sender obtained from
     * {@code getTelemetrySender} and the endpoint name {@code telemetry}.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device that has sent the data.
     * @param str2 The id of the device that has sent the data.
     * @param buffer The message payload.
     * @param str3 The content type of the payload.
     * @throws NullPointerException if context, tenant or device id is {@code null}.
     */
    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Future<MessageSender> senderTracker = getTelemetrySender(str);
        doUploadMessage(routingContext, str, str2, buffer, str3, senderTracker, "telemetry");
    }

    /**
     * Uploads the body of a request as an event message, using the request's
     * body and content type.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device that has sent the data.
     * @param str2 The id of the device that has sent the data.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Buffer payload = routingContext.getBody();
        final String contentType = HttpUtils.getContentType(routingContext);
        uploadEventMessage(routingContext, str, str2, payload, contentType);
    }

    /**
     * Uploads the given payload as an event message.
     * <p>
     * Delegates to {@code doUploadMessage} with the sender obtained from
     * {@code getEventSender} and the endpoint name {@code event}.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device that has sent the data.
     * @param str2 The id of the device that has sent the data.
     * @param buffer The message payload.
     * @param str3 The content type of the payload.
     * @throws NullPointerException if context, tenant or device id is {@code null}.
     */
    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Future<MessageSender> senderTracker = getEventSender(str);
        doUploadMessage(routingContext, str, str2, buffer, str3, senderTracker, "event");
    }

    /**
     * Forwards a device's payload downstream and writes the HTTP response.
     * <p>
     * The request is rejected with a 400 status if the payload does not match
     * the indicated content type or if the {@code QoS-Level} header carries an
     * unsupported value. Otherwise the registration assertion, the tenant
     * configuration, the sender and (if the request indicates a time til
     * disconnect) a command consumer are obtained, the message is sent and the
     * response is written once both the send operation and the wait for a
     * command have finished.
     * <p>
     * If no {@code QoS-Level} header is present the message is sent using
     * {@code send}, otherwise using {@code sendAndWaitForOutcome}.
     * <p>
     * The tracing span created here is finished on every path that gets past the
     * two initial validations: after the response body has been written, when
     * writing the response fails, when the response turns out to be closed
     * already and when processing fails.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device.
     * @param str2 The id of the device.
     * @param buffer The message payload.
     * @param str3 The content type of the payload.
     * @param future The (pending) sender to use for the message.
     * @param str4 The name of the endpoint ({@code telemetry} or {@code event}).
     */
    private void doUploadMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3, Future<MessageSender> future, String str4) {
        if (!isPayloadOfIndicatedType(buffer, str3)) {
            HttpUtils.badRequest(routingContext, String.format("content type [%s] does not match payload", str3));
            return;
        }
        final Integer qoSLevel = getQoSLevel(routingContext.request().getHeader("QoS-Level"));
        if (qoSLevel != null && qoSLevel.intValue() == HEADER_QOS_INVALID) {
            HttpUtils.badRequest(routingContext, "unsupported QoS-Level header value");
            return;
        }

        // completed as soon as the response may be written, i.e. a command has
        // arrived, the wait for a command has timed out or no wait was requested
        final Future<Void> responseReady = Future.future();
        final Device authenticatedDevice = getAuthenticatedDevice(routingContext);
        final Span currentSpan = this.tracer.buildSpan("upload " + str4)
                .asChildOf(TracingHandler.serverSpanContext(routingContext))
                .ignoreActiveSpan()
                .withTag(Tags.COMPONENT.getKey(), getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "client")
                .withTag("tenant_id", str)
                .withTag("device_id", str2)
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null)
                .start();

        final Future<JsonObject> tokenTracker = getRegistrationAssertion(str, str2, authenticatedDevice);
        final Future<TenantObject> tenantConfigTracker = getTenantConfiguration(str, currentSpan.context());
        final Future<MessageConsumer> commandConsumerTracker = createCommandConsumer(str, str2, routingContext, responseReady, currentSpan);

        CompositeFuture.all(tokenTracker, tenantConfigTracker, future, commandConsumerTracker).compose(prerequisitesMet -> {
            if (!tenantConfigTracker.result().isAdapterEnabled(getTypeName())) {
                return Future.failedFuture(new ClientErrorException(403, "adapter is not enabled for tenant"));
            }
            final MessageSender messageSender = future.result();
            final Message downstreamMessage = newMessage(
                    ResourceIdentifier.from(str4, str, str2),
                    messageSender.isRegistrationAssertionRequired(),
                    routingContext.request().uri(),
                    str3,
                    buffer,
                    tokenTracker.result(),
                    HttpUtils.getTimeTilDisconnect(routingContext));
            customizeDownstreamMessage(downstreamMessage, routingContext);
            addConnectionCloseHandler(routingContext, commandConsumerTracker.result(), str, str2);
            if (qoSLevel == null) {
                return CompositeFuture.all(messageSender.send(downstreamMessage, currentSpan.context()), responseReady);
            }
            return CompositeFuture.all(messageSender.sendAndWaitForOutcome(downstreamMessage, currentSpan.context()), responseReady);
        }).compose(delivered -> {
            if (routingContext.response().closed()) {
                // The device is gone: no body end or exception handler will ever fire for
                // this response, so the span has to be finished here in order not to leak.
                LOG.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed",
                        str4, str, str2);
                final Command undeliveredCommand = Command.get(routingContext);
                if (undeliveredCommand != null) {
                    // same reaction as when writing the response fails
                    sendCommandResponse(str, CommandResponse.from(undeliveredCommand.getRequestId(), str2, 503));
                }
                currentSpan.log("failed to send HTTP response to device: response already closed");
                currentSpan.finish();
            } else {
                final Command command = Command.get(routingContext);
                setResponsePayload(routingContext.response(), command);
                routingContext.addBodyEndHandler(bodyWritten -> {
                    LOG.trace("successfully processed [{}] message for device [tenantId: {}, deviceId: {}]", str4, str, str2);
                    this.metrics.incrementProcessedHttpMessages(str4, str);
                    currentSpan.finish();
                });
                routingContext.response().exceptionHandler(writeError -> {
                    currentSpan.log("failed to send HTTP response to device");
                    LOG.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]",
                            str4, str, str2, writeError);
                    if (command != null) {
                        // the command could not be delivered to the device
                        sendCommandResponse(str, CommandResponse.from(command.getRequestId(), str2, 503));
                    }
                    TracingHelper.logError(currentSpan, writeError);
                    currentSpan.finish();
                });
                routingContext.response().end();
            }
            return Future.succeededFuture();
        }).recover(failure -> {
            LOG.debug("cannot process [{}] message from device [tenantId: {}, deviceId: {}]", str4, str, str2, failure);
            final Command command = Command.get(routingContext);
            if (command != null) {
                sendCommandResponse(str, CommandResponse.from(command.getRequestId(), str2, 503));
            }
            if (failure instanceof ClientErrorException) {
                routingContext.fail(failure);
            } else {
                this.metrics.incrementUndeliverableHttpMessages(str4, str);
                HttpUtils.serviceUnavailable(routingContext, 2, "temporarily unavailable");
            }
            TracingHelper.logError(currentSpan, failure);
            currentSpan.finish();
            return Future.failedFuture(failure);
        });
    }

    /**
     * Registers a close handler on the response which cancels the command
     * reception timer and closes the device's command consumer.
     * <p>
     * Nothing is registered if there is no command consumer or if the response
     * is closed already.
     *
     * @param routingContext The context of the request being processed.
     * @param messageConsumer The command consumer created for the request, may be {@code null}.
     * @param str The tenant of the device.
     * @param str2 The id of the device.
     */
    private void addConnectionCloseHandler(RoutingContext routingContext, MessageConsumer messageConsumer, String str, String str2) {
        if (messageConsumer == null || routingContext.response().closed()) {
            return;
        }
        routingContext.response().closeHandler(connectionClosed -> {
            cancelCommandReceptionTimer(routingContext);
            LOG.debug("Connection was closed before response could be sent - closing command consumer for device [tenantId: {}, deviceId: {}]", str, str2);
            getCommandConnection().closeCommandConsumer(str, str2).setHandler(closeAttempt -> {
                if (closeAttempt.failed()) {
                    LOG.warn("Close command consumer failed", closeAttempt.cause());
                }
            });
        });
    }

    /**
     * Sets status code, headers and body of the response depending on whether
     * a command is to be delivered to the device.
     * <p>
     * Without a command only the status code 202 is set. With a command the
     * status code is 200, the command's name and request id are put into the
     * {@code hono-command} and {@code hono-cmd-req-id} headers and the command's
     * payload becomes the response body.
     *
     * @param httpServerResponse The response to populate.
     * @param command The command to include, or {@code null} if there is none.
     */
    private void setResponsePayload(HttpServerResponse httpServerResponse, Command command) {
        if (command != null) {
            LOG.trace("adding command [name: {}, request-id: {}] to response for device [tenant-id: {}, device-id: {}]",
                    command.getName(), command.getRequestId(), command.getTenant(), command.getDeviceId());
            httpServerResponse.setStatusCode(200);
            httpServerResponse.putHeader("hono-command", command.getName());
            httpServerResponse.putHeader("hono-cmd-req-id", command.getRequestId());
            HttpUtils.setResponseBody(httpServerResponse, command.getPayload());
        } else {
            httpServerResponse.setStatusCode(202);
        }
    }

    /**
     * Creates a consumer for a command to be sent to the device in the response,
     * if the request indicates a time til disconnect (TTD).
     * <p>
     * Without a positive TTD the given future is completed right away and no
     * consumer is created. Otherwise the TTD (in milliseconds) is added to the
     * span as tag {@code ttd} and a consumer is obtained from the command
     * connection. When a command arrives, the consumer is closed and the command
     * is either put into the routing context (completing the given future and
     * cancelling the reception timer) or released if the given future has been
     * completed already. Once the consumer has been obtained, a timer is started
     * that completes the given future when the TTD expires.
     *
     * @param str The tenant of the device.
     * @param str2 The id of the device.
     * @param routingContext The context of the request being processed.
     * @param future The future to complete once the response may be written.
     * @param span The span to tag with the TTD.
     * @return A future that succeeds with the consumer, or with a {@code null}
     *         result if no consumer has been created.
     */
    protected final Future<MessageConsumer> createCommandConsumer(String str, String str2, RoutingContext routingContext, Future<Void> future, Span span) {
        final Integer ttdSeconds = HttpUtils.getTimeTilDisconnect(routingContext);
        // multiply as long: an int product overflows for a TTD above Integer.MAX_VALUE / 1000 seconds
        final long ttdMillis = ttdSeconds == null ? 0L : ttdSeconds.longValue() * 1000L;
        if (ttdMillis <= 0) {
            future.tryComplete();
            return Future.succeededFuture();
        }
        span.setTag("ttd", Long.valueOf(ttdMillis));
        return getCommandConnection().getOrCreateCommandConsumer(str, str2, createCommandMessageConsumer(str, str2, command -> {
            getCommandConnection().closeCommandConsumer(str, str2).setHandler(closeAttempt -> {
                if (future.isComplete()) {
                    // the response is on its way (or gone) already, the command cannot be delivered
                    command.release();
                } else {
                    command.put(routingContext);
                    cancelCommandReceptionTimer(routingContext);
                    future.tryComplete();
                }
                if (closeAttempt.failed()) {
                    LOG.warn("Close command consumer failed", closeAttempt.cause());
                }
            });
        }), remoteClose -> {
            LOG.debug("peer closed command receiver link [tenant-id: {}, device-id: {}]", str, str2);
        }).map(consumer -> {
            // NOTE(review): presumably requests a single command from the peer; the
            // argument is a plain count and is unrelated to the QoS header constant
            consumer.flow(1);
            if (!future.isComplete()) {
                addCommandReceptionTimer(routingContext, str, str2, future, ttdMillis);
            }
            return consumer;
        });
    }

    /**
     * Starts a timer that ends the wait for a command.
     * <p>
     * When the timer fires and the given future has not been completed yet, the
     * future is completed and the device's command consumer is closed. The timer's
     * id is stored in the routing context under {@link #KEY_TIMER_ID}.
     *
     * @param routingContext The context of the request being processed.
     * @param str The tenant of the device.
     * @param str2 The id of the device.
     * @param future The future to complete when the timer fires.
     * @param j The delay in milliseconds after which the timer fires.
     */
    private void addCommandReceptionTimer(RoutingContext routingContext, String str, String str2, Future<Void> future, long j) {
        final long timerId = routingContext.vertx().setTimer(j, firedTimerId -> {
            LOG.trace("Command Reception timer fired, id {}", firedTimerId);
            if (!future.isComplete()) {
                future.tryComplete();
                getCommandConnection().closeCommandConsumer(str, str2).setHandler(closeAttempt -> {
                    if (closeAttempt.failed()) {
                        LOG.warn("Close command consumer failed", closeAttempt.cause());
                    }
                });
            } else {
                LOG.trace("Nothing to close for timer since response was sent already");
            }
        });
        final Long boxedTimerId = Long.valueOf(timerId);
        LOG.trace("Adding command reception timer id {}", boxedTimerId);
        routingContext.put(KEY_TIMER_ID, boxedTimerId);
    }

    /**
     * Cancels the command reception timer whose id is stored in the routing
     * context under {@link #KEY_TIMER_ID}.
     * <p>
     * Does nothing if the context holds no timer id or if the id is negative.
     *
     * @param routingContext The context of the request being processed.
     */
    private void cancelCommandReceptionTimer(RoutingContext routingContext) {
        final Object storedId = routingContext.get(KEY_TIMER_ID);
        if (storedId == null) {
            return;
        }
        final long timerId = ((Long) storedId).longValue();
        if (timerId < 0) {
            return;
        }
        if (routingContext.vertx().cancelTimer(timerId)) {
            LOG.trace("Cancelled timer id {}", storedId);
        } else {
            LOG.debug("Could not cancel timer id {}", storedId);
        }
    }

    /**
     * Forwards a device's response to a command.
     * <p>
     * The request body and content type are wrapped in a {@code CommandResponse}.
     * If no command response can be created from the given request id and status
     * code, the request is rejected with a 400 status. Otherwise the response is
     * sent using a newly created command response sender: on success the request
     * is answered with a 202 status, on failure the routing context is failed
     * with a 503 {@code ServerErrorException}. The sender is closed in either case.
     *
     * @param routingContext The context of the request to process.
     * @param str The tenant of the device.
     * @param str2 The id of the device.
     * @param str3 The id of the command request that the response belongs to.
     * @param num The status code of the command execution.
     * @throws NullPointerException if context, tenant or device id is {@code null}.
     */
    public final void uploadCommandResponseMessage(RoutingContext routingContext, String str, String str2, String str3, Integer num) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Buffer body = routingContext.getBody();
        final String contentType = HttpUtils.getContentType(routingContext);
        LOG.debug("processing response to command [tenantId: {}, deviceId: {}, cmd-req-id: {}, status code: {}]", str, str2, str3, num);

        final CommandResponse commandResponse = CommandResponse.from(str3, str2, body, contentType, num);
        if (commandResponse == null) {
            HttpUtils.badRequest(routingContext, String.format("command-request-id [%s] or status code [%s] is missing/invalid", str3, num));
            return;
        }

        final Future<CommandResponseSender> senderTracker = createCommandResponseSender(str, commandResponse.getReplyToId());
        senderTracker.compose(responseSender -> {
            return responseSender.sendCommandResponse(commandResponse);
        }).map(delivery -> {
            LOG.trace("command response [command-request-id: {}] accepted by application", str3);
            routingContext.response().setStatusCode(202);
            routingContext.response().end();
            return delivery;
        }).otherwise(sendError -> {
            LOG.debug("could not send command response [command-request-id: {}] to application", str3, sendError);
            routingContext.fail(new ServerErrorException(503, sendError));
            return null;
        }).setHandler(sendAttempt -> {
            // the sender is only needed for this one response, regardless of the outcome
            final CommandResponseSender openSender = senderTracker.result();
            if (openSender != null) {
                openSender.close(closeAttempt -> {
                    // outcome of closing the sender is deliberately ignored
                });
            }
        });
    }

    /**
     * Maps the value of the {@code QoS-Level} header to a QoS level.
     *
     * @param str The header value, may be {@code null}.
     * @return {@code null} if no value is given, {@link #AT_LEAST_ONCE} if the
     *         value parses to that number, or {@link #HEADER_QOS_INVALID} if the
     *         value is not a number or is any other number.
     */
    private static Integer getQoSLevel(String str) {
        if (str == null) {
            return null;
        }
        final int requestedLevel;
        try {
            requestedLevel = Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return Integer.valueOf(HEADER_QOS_INVALID);
        }
        if (requestedLevel == AT_LEAST_ONCE) {
            return Integer.valueOf(AT_LEAST_ONCE);
        }
        return Integer.valueOf(HEADER_QOS_INVALID);
    }
}
