package org.eclipse.hono.adapter.http;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.util.JwtHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

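/**
 * Base class for HTTP protocol adapters that are built on Vert.x.
 * <p>
 * The class binds a secure and/or an insecure HTTP server to a shared router, exposes an
 * optional status resource and offers helpers for uploading telemetry and event messages
 * and for sending common HTTP error responses.
 * <p>
 * Loaded from {@code org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.class}.
 *
 * @param <T> the type of configuration properties used by the adapter.
 */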
public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    /** Media type used for JSON content, e.g. for the response of the status resource. */
    protected static final String CONTENT_TYPE_JSON = "application/json";
    /**
     * JSON media type with an explicit UTF-8 charset.
     * NOTE(review): the constant name misspells "UTF8"; it is protected, so renaming it would break subclasses.
     */
    protected static final String CONTENT_TYPE_JSON_UFT8 = "application/json; charset=utf-8";
    /** Directory that the body handler created by {@link #createRouter()} uses for file uploads. */
    protected static final String DEFAULT_UPLOADS_DIRECTORY = "/tmp";
    /** Name of the HTTP header used to convey a registration assertion in requests and responses. */
    protected static final String HEADER_REGISTRATION_ASSERTION = "Hono-Reg-Assertion";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVertxBasedHttpProtocolAdapter.class);

    // Value of the "spring.profiles.active" property (empty if unset); reported by the status resource.
    @Value("${spring.profiles.active:}")
    private String activeProfiles;
    // Server for the secure port; either injected via setHttpServer or created lazily when binding.
    private HttpServer server;
    // Server for the insecure port; either injected via setInsecureHttpServer or created lazily when binding.
    private HttpServer insecureServer;
    // Receives counts of processed and undeliverable messages.
    private HttpAdapterMetrics metrics;

    /**
     * Sets the metrics object used for counting processed and undeliverable messages.
     *
     * @param httpAdapterMetrics the metrics to report to.
     */
    @Autowired
    public final void setMetrics(HttpAdapterMetrics httpAdapterMetrics) {
        this.metrics = httpAdapterMetrics;
    }

    /**
     * Gets the default port for the secure HTTP server.
     *
     * @return 8443.
     */
    public final int getPortDefaultValue() {
        return 8443;
    }

    /**
     * Gets the default port for the insecure HTTP server.
     *
     * @return 8080.
     */
    public final int getInsecurePortDefaultValue() {
        return 8080;
    }

    /**
     * Gets the port reported by the secure HTTP server.
     *
     * @return the secure server's {@code actualPort()}, or -1 if no secure server
     *         has been set or created yet.
     */
    protected final int getActualPort() {
        return this.server == null ? -1 : this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure HTTP server.
     *
     * @return the insecure server's {@code actualPort()}, or -1 if no insecure server
     *         has been set or created yet.
     */
    protected final int getActualInsecurePort() {
        return this.insecureServer == null ? -1 : this.insecureServer.actualPort();
    }

    /**
     * Sets the server instance to use for the secure port.
     * <p>
     * If no server is set, one is created when the secure port gets bound.
     *
     * @param httpServer the server to use.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports an actual port greater than zero,
     *         i.e. it is listening already.
     */
    @Autowired(required = false)
    public final void setHttpServer(HttpServer httpServer) {
        if (Objects.requireNonNull(httpServer).actualPort() <= 0) {
            this.server = httpServer;
            return;
        }
        throw new IllegalArgumentException("http server must not be started already");
    }

    /**
     * Sets the server instance to use for the insecure port.
     * <p>
     * If no server is set, one is created when the insecure port gets bound.
     *
     * @param httpServer the server to use.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalArgumentException if the server reports an actual port greater than zero,
     *         i.e. it is listening already.
     */
    @Autowired(required = false)
    public final void setInsecureHttpServer(HttpServer httpServer) {
        if (Objects.requireNonNull(httpServer).actualPort() <= 0) {
            this.insecureServer = httpServer;
            return;
        }
        throw new IllegalArgumentException("http server must not be started already");
    }

    /**
     * Starts the adapter.
     * <p>
     * The steps run in sequence, each one only if the previous one succeeded:
     * <ol>
     * <li>check the port configuration,</li>
     * <li>invoke {@link #preStartup()},</li>
     * <li>create the router, register the routes and bind the secure and insecure servers,</li>
     * <li>trigger the connections to messaging and device registration and invoke
     * {@link #onStartupSuccess()}.</li>
     * </ol>
     *
     * @param future the future to complete once startup has succeeded, or to fail with the
     *               cause of the first step that failed.
     */
    public final void doStart(Future<Void> future) {
        checkPortConfiguration()
            .compose(portsChecked -> preStartup())
            .compose(preStartupDone -> {
                final Router router = createRouter();
                if (router != null) {
                    addRoutes(router);
                    return CompositeFuture.all(bindSecureHttpServer(router), bindInsecureHttpServer(router));
                }
                return Future.failedFuture("no router configured");
            })
            .compose(serversBound -> {
                // no handler is passed in for the connection outcome
                connectToMessaging(null);
                connectToDeviceRegistration(null);
                try {
                    onStartupSuccess();
                    future.complete();
                } catch (Exception startupHookFailure) {
                    LOG.error("error in onStartupSuccess", startupHookFailure);
                    future.fail(startupHookFailure);
                }
            }, future);
    }

    /**
     * Hook invoked by {@link #doStart(Future)} after the port configuration has been checked
     * and before the router is created and the servers are bound.
     * <p>
     * This default implementation does nothing and succeeds immediately.
     *
     * @return a future; startup only continues if it succeeds.
     */
    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    /**
     * Hook invoked by {@link #doStart(Future)} once the servers have been bound.
     * <p>
     * An exception thrown from this method fails the startup. This default implementation
     * does nothing.
     */
    protected void onStartupSuccess() {
    }

    /**
     * Creates the router that handles the requests of both HTTP servers.
     * <p>
     * The router gets a body handler that limits the request body to the configured maximum
     * payload size and stores uploads in {@link #DEFAULT_UPLOADS_DIRECTORY}. If
     * {@link #getStatusResourcePath()} returns a non-null path, a GET route for the status
     * resource is registered as well.
     *
     * @return the router; startup fails if a subclass returns {@code null} here.
     */
    protected Router createRouter() {
        final Router router = Router.router(this.vertx);
        final HttpProtocolAdapterProperties config = (HttpProtocolAdapterProperties) getConfig();

        LOG.info("limiting size of inbound request body to {} bytes", config.getMaxPayloadSize());
        final BodyHandler bodyHandler = BodyHandler.create()
                .setBodyLimit(config.getMaxPayloadSize())
                .setUploadsDirectory(DEFAULT_UPLOADS_DIRECTORY);
        router.route().handler(bodyHandler);

        final String statusPath = getStatusResourcePath();
        if (statusPath != null) {
            router.route(HttpMethod.GET, statusPath).handler(this::doGetStatus);
        }
        return router;
    }

    /**
     * Gets the path under which the status resource is exposed.
     * <p>
     * {@link #createRouter()} skips registering the status resource if this method
     * returns {@code null}.
     *
     * @return "/status" by default.
     */
    protected String getStatusResourcePath() {
        return "/status";
    }

    /**
     * Registers the adapter specific routes.
     * <p>
     * Invoked by {@link #doStart(Future)} with the router returned by {@link #createRouter()},
     * before the HTTP servers are bound.
     *
     * @param router the router to add the routes to.
     */
    protected abstract void addRoutes(Router router);

    /**
     * Creates the options for the secure HTTP server.
     * <p>
     * Host and port are taken from the configuration (the port falls back to
     * {@link #getPortDefaultValue()}), the maximum chunk size is 4096 and the TLS key/cert
     * and trust options are added.
     *
     * @return the options.
     */
    protected HttpServerOptions getHttpServerOptions() {
        final HttpProtocolAdapterProperties config = (HttpProtocolAdapterProperties) getConfig();
        final HttpServerOptions options = new HttpServerOptions();
        options.setHost(config.getBindAddress());
        options.setPort(config.getPort(getPortDefaultValue()));
        options.setMaxChunkSize(4096);
        addTlsKeyCertOptions(options);
        addTlsTrustOptions(options);
        return options;
    }

    /**
     * Creates the options for the insecure HTTP server.
     * <p>
     * Host and port are taken from the configuration (the port falls back to
     * {@link #getInsecurePortDefaultValue()}) and the maximum chunk size is 4096.
     * No TLS options are added.
     *
     * @return the options.
     */
    protected HttpServerOptions getInsecureHttpServerOptions() {
        final HttpProtocolAdapterProperties config = (HttpProtocolAdapterProperties) getConfig();
        final HttpServerOptions options = new HttpServerOptions();
        options.setHost(config.getInsecurePortBindAddress());
        options.setPort(config.getInsecurePort(getInsecurePortDefaultValue()));
        options.setMaxChunkSize(4096);
        return options;
    }

    /**
     * Binds the secure HTTP server, if the secure port is enabled.
     * <p>
     * A server is created from {@link #getHttpServerOptions()} unless one has been set before.
     *
     * @param router the router that handles the server's requests.
     * @return a future that succeeds with the listening server, succeeds with {@code null}
     *         if the secure port is disabled, or fails with the cause of the bind failure.
     */
    private Future<HttpServer> bindSecureHttpServer(Router router) {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Future<HttpServer> bindTracker = Future.future();
        final String bindAddress;
        if (this.server == null) {
            bindAddress = ((HttpProtocolAdapterProperties) getConfig()).getBindAddress();
            this.server = this.vertx.createHttpServer(getHttpServerOptions());
        } else {
            // the address of a server that has been set from outside is not known here
            bindAddress = "?";
        }
        this.server.requestHandler(router::accept).listen(bindAttempt -> {
            if (bindAttempt.failed()) {
                LOG.error("error while starting up secure http server", bindAttempt.cause());
                bindTracker.fail(bindAttempt.cause());
            } else {
                LOG.info("secure http server listening on {}:{}", bindAddress, this.server.actualPort());
                bindTracker.complete(bindAttempt.result());
            }
        });
        return bindTracker;
    }

    /**
     * Binds the insecure HTTP server, if the insecure port is enabled.
     * <p>
     * A server is created from {@link #getInsecureHttpServerOptions()} unless one has been
     * set before.
     *
     * @param router the router that handles the server's requests.
     * @return a future that succeeds with the listening server, succeeds with {@code null}
     *         if the insecure port is disabled, or fails with the cause of the bind failure.
     */
    private Future<HttpServer> bindInsecureHttpServer(Router router) {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Future<HttpServer> bindTracker = Future.future();
        final String bindAddress;
        if (this.insecureServer == null) {
            bindAddress = ((HttpProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress();
            this.insecureServer = this.vertx.createHttpServer(getInsecureHttpServerOptions());
        } else {
            // the address of a server that has been set from outside is not known here
            bindAddress = "?";
        }
        this.insecureServer.requestHandler(router::accept).listen(bindAttempt -> {
            if (bindAttempt.failed()) {
                LOG.error("error while starting up insecure http server", bindAttempt.cause());
                bindTracker.fail(bindAttempt.cause());
            } else {
                LOG.info("insecure http server listening on {}:{}", bindAddress, this.insecureServer.actualPort());
                bindTracker.complete(bindAttempt.result());
            }
        });
        return bindTracker;
    }

    /**
     * Stops the adapter.
     * <p>
     * {@link #preShutdown()} is invoked first; an exception thrown by it is logged but does
     * not abort the shutdown. Then both HTTP servers (if present) are closed, the clients
     * are closed and finally {@link #postShutdown()} is invoked.
     *
     * @param future the future to complete once shutdown has succeeded, or to fail with the
     *               cause of the first step that failed.
     */
    public final void doStop(Future<Void> future) {
        try {
            preShutdown();
        } catch (Exception e) {
            // best effort: a failing hook must not prevent the servers from being closed
            LOG.error("error in preShutdown", e);
        }

        final Future<Void> shutdownTracker = Future.future();
        shutdownTracker.setHandler(shutdownAttempt -> {
            if (shutdownAttempt.succeeded()) {
                LOG.info("HTTP adapter has been shut down successfully");
                future.complete();
            } else {
                // a failed shutdown is an error like the other failures reported by this class
                LOG.error("error while shutting down adapter", shutdownAttempt.cause());
                future.fail(shutdownAttempt.cause());
            }
        });

        final Future<Void> serverStopTracker = Future.future();
        if (this.server != null) {
            this.server.close(serverStopTracker.completer());
        } else {
            serverStopTracker.complete();
        }

        final Future<Void> insecureServerStopTracker = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(insecureServerStopTracker.completer());
        } else {
            insecureServerStopTracker.complete();
        }

        CompositeFuture.all(serverStopTracker, insecureServerStopTracker).compose(serversClosed -> {
            final Future<Void> clientsStopTracker = Future.future();
            closeClients(clientsStopTracker.completer());
            return clientsStopTracker;
        }).compose(clientsClosed -> {
            return postShutdown();
        }).compose(postShutdownDone -> {
            shutdownTracker.complete();
        }, shutdownTracker);
    }

    /**
     * Hook invoked by {@link #doStop(Future)} before the HTTP servers are closed.
     * <p>
     * An exception thrown from this method is logged and otherwise ignored. This default
     * implementation does nothing.
     */
    protected void preShutdown() {
    }

    /**
     * Hook invoked by {@link #doStop(Future)} after the HTTP servers and the clients
     * have been closed.
     * <p>
     * This default implementation does nothing and succeeds immediately.
     *
     * @return a future; shutdown is reported as failed if it fails.
     */
    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    /**
     * Ends a response with status 400, using the default content type for the message.
     *
     * @param httpServerResponse the response to end.
     * @param str the message to send in the response body (may be {@code null}).
     */
    protected static void badRequest(HttpServerResponse httpServerResponse, String str) {
        badRequest(httpServerResponse, str, null);
    }

    /**
     * Ends a response with status 400.
     *
     * @param httpServerResponse the response to end.
     * @param str the message to send in the response body (may be {@code null}).
     * @param str2 the content type of the message, or {@code null} for "text/plain".
     */
    protected static void badRequest(HttpServerResponse httpServerResponse, String str, String str2) {
        LOG.debug("Bad request: {}", str);
        endWithStatus(httpServerResponse, 400, null, str, str2);
    }

    /**
     * Ends a response with status 500.
     *
     * @param httpServerResponse the response to end.
     * @param str the message to send as "text/plain" in the response body (may be {@code null}).
     */
    protected static void internalServerError(HttpServerResponse httpServerResponse, String str) {
        LOG.debug("Internal server error: {}", str);
        endWithStatus(httpServerResponse, 500, null, str, null);
    }

    /**
     * Ends a response with status 503 and no body.
     *
     * @param httpServerResponse the response to end.
     * @param i the value to send in the Retry-After header.
     */
    protected static void serviceUnavailable(HttpServerResponse httpServerResponse, int i) {
        serviceUnavailable(httpServerResponse, i, null, null);
    }

    /**
     * Ends a response with status 503.
     * <p>
     * The response always carries a Retry-After header and a Content-Type header, even if
     * no message is given.
     *
     * @param httpServerResponse the response to end.
     * @param i the value to send in the Retry-After header.
     * @param str the message to send in the response body (may be {@code null}).
     * @param str2 the content type of the message, or {@code null} for "text/plain".
     */
    protected static void serviceUnavailable(HttpServerResponse httpServerResponse, int i, String str, String str2) {
        LOG.debug("Service unavailable: {}", str);
        // typed map instead of a raw HashMap so that the call below is checked by the compiler
        final Map<CharSequence, CharSequence> headers = new HashMap<>(2);
        headers.put(HttpHeaders.CONTENT_TYPE, str2 != null ? str2 : "text/plain");
        headers.put(HttpHeaders.RETRY_AFTER, String.valueOf(i));
        endWithStatus(httpServerResponse, 503, headers, str, str2);
    }

    /**
     * Ends a response with status 401 and a WWW-Authenticate header, without a body.
     *
     * @param httpServerResponse the response to end.
     * @param str the value to send in the WWW-Authenticate header.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected static void unauthorized(HttpServerResponse httpServerResponse, String str) {
        Objects.requireNonNull(httpServerResponse);
        Objects.requireNonNull(str);
        LOG.debug("client is required to authenticate [{}]", str);
        // typed map instead of a raw HashMap so that the call below is checked by the compiler
        final Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("WWW-Authenticate", str);
        endWithStatus(httpServerResponse, 401, headers, null, null);
    }

    /**
     * Ends a response with a given status code, optional headers and an optional message.
     * <p>
     * A Content-Type header is set from {@code str2} (or "text/plain") only if a message
     * is given; it then replaces a Content-Type contained in the given headers.
     *
     * @param httpServerResponse the response to end.
     * @param i the HTTP status code.
     * @param map the headers to add to the response (may be {@code null}).
     * @param str the message to send in the response body (may be {@code null}).
     * @param str2 the content type of the message, or {@code null} for "text/plain".
     * @throws NullPointerException if the response is {@code null}.
     */
    protected static void endWithStatus(HttpServerResponse httpServerResponse, int i, Map<CharSequence, CharSequence> map, String str, String str2) {
        Objects.requireNonNull(httpServerResponse);
        httpServerResponse.setStatusCode(i);
        if (map != null) {
            map.forEach((name, value) -> httpServerResponse.putHeader(name, value));
        }
        if (str != null) {
            final String contentType = str2 != null ? str2 : "text/plain";
            httpServerResponse.putHeader(HttpHeaders.CONTENT_TYPE, contentType);
            httpServerResponse.end(str);
        } else {
            httpServerResponse.end();
        }
    }

    /**
     * Handles a request for the status resource.
     * <p>
     * The JSON response contains the connection status of the messaging client, the active
     * profiles, the status of the senders and whatever
     * {@link #adaptStatusResource(JsonObject)} adds.
     *
     * @param routingContext the context of the request to respond to.
     */
    private void doGetStatus(RoutingContext routingContext) {
        final JsonObject status = new JsonObject(getHonoMessagingClient().getConnectionStatus());
        status.put("active profiles", this.activeProfiles);
        status.put("senders", getHonoMessagingClient().getSenderStatus());
        adaptStatusResource(status);

        final HttpServerResponse response = routingContext.response();
        response.putHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
        response.end(status.encodePrettily());
    }

    /**
     * Hook for adding adapter specific properties to the status resource before it is
     * sent to the client.
     * <p>
     * This default implementation does nothing.
     *
     * @param jsonObject the status object to adapt.
     */
    protected void adaptStatusResource(JsonObject jsonObject) {
    }

    /**
     * Gets the value of the Content-Type header of a request.
     *
     * @param routingContext the context of the request.
     * @return the header value as returned by the request's {@code getHeader}.
     * @throws NullPointerException if the context is {@code null}.
     */
    protected static String getContentType(RoutingContext routingContext) {
        Objects.requireNonNull(routingContext);
        return routingContext.request().getHeader(HttpHeaders.CONTENT_TYPE);
    }

    /**
     * Uploads the body of a request as a telemetry message, using the request's
     * Content-Type header as the content type.
     *
     * @param routingContext the context of the request to take the body and content type from.
     * @param str the tenant of the device that the message originates from.
     * @param str2 the identifier of the device that the message originates from.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Buffer payload = routingContext.getBody();
        final String contentType = getContentType(routingContext);
        uploadTelemetryMessage(routingContext, str, str2, payload, contentType);
    }

    /**
     * Uploads a telemetry message using the telemetry sender of the given tenant.
     * <p>
     * A missing content type or an empty payload results in a 400 response.
     *
     * @param routingContext the context of the request to respond to.
     * @param str the tenant of the device that the message originates from.
     * @param str2 the identifier of the device that the message originates from.
     * @param buffer the message payload.
     * @param str3 the content type of the payload.
     * @throws NullPointerException if context, tenant or device identifier is {@code null}.
     */
    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Future<MessageSender> senderTracker = getTelemetrySender(str);
        doUploadMessage(routingContext, str, str2, buffer, str3, senderTracker, "telemetry");
    }

    /**
     * Uploads the body of a request as an event message, using the request's
     * Content-Type header as the content type.
     *
     * @param routingContext the context of the request to take the body and content type from.
     * @param str the tenant of the device that the message originates from.
     * @param str2 the identifier of the device that the message originates from.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Buffer payload = routingContext.getBody();
        final String contentType = getContentType(routingContext);
        uploadEventMessage(routingContext, str, str2, payload, contentType);
    }

    /**
     * Uploads an event message using the event sender of the given tenant.
     * <p>
     * A missing content type or an empty payload results in a 400 response.
     *
     * @param routingContext the context of the request to respond to.
     * @param str the tenant of the device that the message originates from.
     * @param str2 the identifier of the device that the message originates from.
     * @param buffer the message payload.
     * @param str3 the content type of the payload.
     * @throws NullPointerException if context, tenant or device identifier is {@code null}.
     */
    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3) {
        Objects.requireNonNull(routingContext);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final Future<MessageSender> senderTracker = getEventSender(str);
        doUploadMessage(routingContext, str, str2, buffer, str3, senderTracker, "event");
    }

    /**
     * Validates an upload request and forwards the message once both the registration
     * assertion and the sender are available.
     * <p>
     * Responses on failure: 400 if the content type is missing or the payload is empty,
     * 403 if the registration assertion could not be obtained, 503 (retry after 5) if only
     * the sender could not be obtained. Every failure is counted as an undeliverable message.
     *
     * @param routingContext the context of the request to respond to.
     * @param str the tenant of the device.
     * @param str2 the identifier of the device.
     * @param buffer the message payload.
     * @param str3 the content type of the payload.
     * @param future the future providing the sender to use.
     * @param str4 the name of the endpoint ("telemetry" or "event") used for the metrics.
     */
    private void doUploadMessage(RoutingContext routingContext, String str, String str2, Buffer buffer, String str3, Future<MessageSender> future, String str4) {
        if (str3 == null) {
            badRequest(routingContext.response(), String.format("%s header is missing", HttpHeaders.CONTENT_TYPE));
            this.metrics.incrementUndeliverableHttpMessages(str4, str);
            return;
        }
        if (buffer == null || buffer.length() == 0) {
            badRequest(routingContext.response(), "missing body");
            this.metrics.incrementUndeliverableHttpMessages(str4, str);
            return;
        }

        final Future<String> assertionTracker = getRegistrationAssertionHeader(routingContext, str, str2);
        CompositeFuture.all(assertionTracker, future).setHandler(attempt -> {
            if (attempt.failed()) {
                if (assertionTracker.failed()) {
                    // the trailing throwable is logged as the exception, not as a placeholder value
                    LOG.debug("could not get registration assertion [tenant: {}, device: {}]", str, str2, attempt.cause());
                    endWithStatus(routingContext.response(), 403, null, null, null);
                } else {
                    serviceUnavailable(routingContext.response(), 5);
                }
                this.metrics.incrementUndeliverableHttpMessages(str4, str);
                return;
            }
            sendToHono(routingContext.response(), str2, buffer, str3, assertionTracker.result(), future.result(), str, str4);
        });
    }

    /**
     * Hands a message over to the sender and reports the outcome to the client.
     * <p>
     * The response is 202 if the sender accepted the message, otherwise 503 (retry after 2).
     * The outcome is counted as a processed or an undeliverable message respectively.
     *
     * @param httpServerResponse the response to end.
     * @param str the identifier of the device.
     * @param buffer the message payload.
     * @param str2 the content type of the payload.
     * @param str3 the registration assertion to send along with the message.
     * @param messageSender the sender to use.
     * @param str4 the tenant of the device, used for the metrics.
     * @param str5 the name of the endpoint, used for the metrics.
     */
    private void sendToHono(HttpServerResponse httpServerResponse, String str, Buffer buffer, String str2, String str3, MessageSender messageSender, String str4, String str5) {
        final boolean accepted = messageSender.send(str, buffer.getBytes(), str2, str3);
        if (!accepted) {
            serviceUnavailable(httpServerResponse, 2, "resource limit exceeded, please try again later", "text/plain");
            this.metrics.incrementUndeliverableHttpMessages(str5, str4);
            return;
        }
        httpServerResponse.setStatusCode(202).end();
        this.metrics.incrementProcessedHttpMessages(str5, str4);
    }

    /**
     * Gets the registration assertion to use for a request.
     * <p>
     * The assertion from the request's {@link #HEADER_REGISTRATION_ASSERTION} header is used
     * if present and not reported as expired by {@code JwtHelper.isExpired(header, 5)}.
     * Otherwise a fresh assertion is requested; it is also put into the response header if
     * the configuration has registration assertions enabled.
     *
     * @param routingContext the context of the request.
     * @param str the tenant of the device.
     * @param str2 the identifier of the device.
     * @return a future that succeeds with the assertion, or fails if no fresh assertion
     *         could be obtained.
     */
    protected final Future<String> getRegistrationAssertionHeader(RoutingContext routingContext, String str, String str2) {
        final String assertion = routingContext.request().getHeader(HEADER_REGISTRATION_ASSERTION);
        // NOTE(review): 5 is presumably a tolerance in seconds - confirm against JwtHelper
        if (assertion != null && !JwtHelper.isExpired(assertion, 5)) {
            return Future.succeededFuture(assertion);
        }
        return getRegistrationAssertion(str, str2).compose(freshAssertion -> {
            if (((HttpProtocolAdapterProperties) getConfig()).isRegAssertionEnabled()) {
                routingContext.response().putHeader(HEADER_REGISTRATION_ASSERTION, freshAssertion);
            }
            return Future.succeededFuture(freshAssertion);
        });
    }
}
