package org.eclipse.hono.adapter.http;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.http.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.class */
public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    protected static final String DEFAULT_UPLOADS_DIRECTORY = "/tmp";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVertxBasedHttpProtocolAdapter.class);
    private HttpServer server;
    private HttpServer insecureServer;
    private HttpAdapterMetrics metrics;

    @Autowired
    public final void setMetrics(HttpAdapterMetrics httpAdapterMetrics) {
        this.metrics = httpAdapterMetrics;
    }

    public final int getPortDefaultValue() {
        return 8443;
    }

    public final int getInsecurePortDefaultValue() {
        return 8080;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    @Autowired(required = false)
    public final void setHttpServer(HttpServer httpServer) {
        Objects.requireNonNull(httpServer);
        if (httpServer.actualPort() > 0) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.server = httpServer;
    }

    @Autowired(required = false)
    public final void setInsecureHttpServer(HttpServer httpServer) {
        Objects.requireNonNull(httpServer);
        if (httpServer.actualPort() > 0) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.insecureServer = httpServer;
    }

    public final void doStart(Future<Void> future) {
        checkPortConfiguration().compose(r3 -> {
            return preStartup();
        }).compose(r5 -> {
            Router createRouter = createRouter();
            if (createRouter == null) {
                return Future.failedFuture("no router configured");
            }
            addRoutes(createRouter);
            return CompositeFuture.all(bindSecureHttpServer(createRouter), bindInsecureHttpServer(createRouter));
        }).compose(compositeFuture -> {
            try {
                onStartupSuccess();
                future.complete();
            } catch (Exception e) {
                LOG.error("error in onStartupSuccess", e);
                future.fail(e);
            }
        }, future);
    }

    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    protected void onStartupSuccess() {
    }

    protected Router createRouter() {
        Router router = Router.router(this.vertx);
        LOG.info("limiting size of inbound request body to {} bytes", Integer.valueOf(((HttpProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        router.route().handler(BodyHandler.create(DEFAULT_UPLOADS_DIRECTORY).setBodyLimit(((HttpProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        return router;
    }

    protected abstract void addRoutes(Router router);

    protected HttpServerOptions getHttpServerOptions() {
        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setHost(((HttpProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(((HttpProtocolAdapterProperties) getConfig()).getPort(getPortDefaultValue())).setMaxChunkSize(4096);
        addTlsKeyCertOptions(httpServerOptions);
        addTlsTrustOptions(httpServerOptions);
        return httpServerOptions;
    }

    protected HttpServerOptions getInsecureHttpServerOptions() {
        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setHost(((HttpProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(((HttpProtocolAdapterProperties) getConfig()).getInsecurePort(getInsecurePortDefaultValue())).setMaxChunkSize(4096);
        return httpServerOptions;
    }

    protected void customizeDownstreamMessage(Message message, RoutingContext routingContext) {
    }

    private Future<HttpServer> bindSecureHttpServer(Router router) {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        Future<HttpServer> future = Future.future();
        String bindAddress = this.server == null ? ((HttpProtocolAdapterProperties) getConfig()).getBindAddress() : "?";
        if (this.server == null) {
            this.server = this.vertx.createHttpServer(getHttpServerOptions());
        }
        HttpServer httpServer = this.server;
        router.getClass();
        httpServer.requestHandler(router::accept).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("secure http server listening on {}:{}", bindAddress, Integer.valueOf(this.server.actualPort()));
                future.complete(asyncResult.result());
            } else {
                LOG.error("error while starting up secure http server", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    private Future<HttpServer> bindInsecureHttpServer(Router router) {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        Future<HttpServer> future = Future.future();
        String insecurePortBindAddress = this.insecureServer == null ? ((HttpProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress() : "?";
        if (this.insecureServer == null) {
            this.insecureServer = this.vertx.createHttpServer(getInsecureHttpServerOptions());
        }
        HttpServer httpServer = this.insecureServer;
        router.getClass();
        httpServer.requestHandler(router::accept).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("insecure http server listening on {}:{}", insecurePortBindAddress, Integer.valueOf(this.insecureServer.actualPort()));
                future.complete(asyncResult.result());
            } else {
                LOG.error("error while starting up insecure http server", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    public final void doStop(Future<Void> future) {
        try {
            preShutdown();
        } catch (Exception e) {
            LOG.error("error in preShutdown", e);
        }
        Future future2 = Future.future();
        if (this.server != null) {
            this.server.close(future2.completer());
        } else {
            future2.complete();
        }
        Future future3 = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(future3.completer());
        } else {
            future3.complete();
        }
        CompositeFuture.all(future2, future3).compose(compositeFuture -> {
            return postShutdown();
        }).compose(r3 -> {
            future.complete();
        }, future);
    }

    protected void preShutdown() {
    }

    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2) {
        uploadTelemetryMessage((RoutingContext) Objects.requireNonNull(routingContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), routingContext.getBody(), HttpUtils.getContentType(routingContext));
    }

    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((RoutingContext) Objects.requireNonNull(routingContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, getTelemetrySender(str), "telemetry");
    }

    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2) {
        uploadEventMessage((RoutingContext) Objects.requireNonNull(routingContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), routingContext.getBody(), HttpUtils.getContentType(routingContext));
    }

    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((RoutingContext) Objects.requireNonNull(routingContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, getEventSender(str), "event");
    }

    private void doUploadMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3, Future<MessageSender> future, String str4) {
        if (str3 == null) {
            HttpUtils.badRequest(routingContext, String.format("%s header is missing", HttpHeaders.CONTENT_TYPE));
        } else if (buffer == null || buffer.length() == 0) {
            HttpUtils.badRequest(routingContext, "missing body");
        } else {
            Future registrationAssertion = getRegistrationAssertion(str, str2, (Device) Optional.ofNullable(routingContext.user()).map(user -> {
                if (Device.class.isInstance(user)) {
                    return (Device) user;
                }
                return null;
            }).orElse(null));
            CompositeFuture.all(registrationAssertion, future).compose(compositeFuture -> {
                Message newMessage = newMessage(String.format("%s/%s", str4, str), str2, routingContext.request().uri(), str3, buffer, (JsonObject) registrationAssertion.result());
                customizeDownstreamMessage(newMessage, routingContext);
                return ((MessageSender) future.result()).send(newMessage);
            }).map(protonDelivery -> {
                LOG.trace("successfully processed message for device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{str, str2, str4});
                this.metrics.incrementProcessedHttpMessages(str4, str);
                routingContext.response().setStatusCode(202).end();
                return (Void) null;
            }).otherwise(th -> {
                if (ClientErrorException.class.isInstance(th)) {
                    ClientErrorException clientErrorException = (ClientErrorException) th;
                    LOG.debug("cannot process message for device [tenantId: {}, deviceId: {}, endpoint: {}]: {} - {}", new Object[]{str, str2, str4, Integer.valueOf(clientErrorException.getErrorCode()), clientErrorException.getMessage()});
                    routingContext.fail(clientErrorException.getErrorCode());
                } else {
                    LOG.debug("cannot process message for device [tenantId: {}, deviceId: {}, endpoint: {}]: {}", new Object[]{str, str2, str4, th.getMessage()});
                    this.metrics.incrementUndeliverableHttpMessages(str4, str);
                    HttpUtils.serviceUnavailable(routingContext, 2);
                }
                return (Void) null;
            });
        }
    }
}
