package org.eclipse.hono.adapter.http;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.proton.ProtonClientOptions;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.config.HonoConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.class */
public abstract class AbstractVertxBasedHttpProtocolAdapter extends AbstractVerticle {
    protected static final String CONTENT_TYPE_JSON = "application/json";
    protected static final String CONTENT_TYPE_JSON_UFT8 = "application/json; charset=utf-8";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVertxBasedHttpProtocolAdapter.class);

    @Value("${spring.profiles.active:}")
    private String activeProfiles;
    private HttpServer server;
    private HonoClient hono;
    private HonoConfigProperties config;
    private BiConsumer<String, Handler<AsyncResult<MessageSender>>> eventSenderSupplier;
    private BiConsumer<String, Handler<AsyncResult<MessageSender>>> telemetrySenderSupplier;

    @Autowired(required = false)
    public final void setHttpServer(HttpServer httpServer) {
        Objects.requireNonNull(httpServer);
        if (httpServer.actualPort() > 0) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.server = httpServer;
    }

    @Autowired
    public final void setHonoClient(HonoClient honoClient) {
        this.hono = (HonoClient) Objects.requireNonNull(honoClient);
    }

    public final HonoClient getHonoClient() {
        return this.hono;
    }

    @Autowired
    public final void setConfig(HonoConfigProperties honoConfigProperties) {
        this.config = (HonoConfigProperties) Objects.requireNonNull(honoConfigProperties);
    }

    public final HonoConfigProperties getConfig() {
        return this.config;
    }

    public final void start(Future<Void> future) throws Exception {
        if (this.hono == null) {
            future.fail("Hono client must be set");
            return;
        }
        this.eventSenderSupplier = (str, handler) -> {
            getHonoClient().getOrCreateEventSender(str, handler);
        };
        this.telemetrySenderSupplier = (str2, handler2) -> {
            getHonoClient().getOrCreateTelemetrySender(str2, handler2);
        };
        Future<Void> future2 = Future.future();
        preStartup(future2);
        future2.compose(r5 -> {
            Future<Void> future3 = Future.future();
            Router createRouter = createRouter();
            if (createRouter == null) {
                future3.fail("no router configured");
            } else {
                addRoutes(createRouter);
                bindHttpServer(createRouter, future3);
                connectToHono(null);
            }
            return future3;
        }).compose(r6 -> {
            try {
                onStartupSuccess();
            } catch (Exception e) {
                LOG.error("error in onStartupSuccess", e);
            }
            future.complete();
        }, future);
    }

    protected void preStartup(Future<Void> future) {
        future.complete();
    }

    protected void onStartupSuccess() {
    }

    protected Router createRouter() {
        Router router = Router.router(this.vertx);
        LOG.info("limiting size of inbound request body to {} bytes", Integer.valueOf(this.config.getMaxPayloadSize()));
        router.route().handler(BodyHandler.create().setBodyLimit(this.config.getMaxPayloadSize()));
        String statusResourcePath = getStatusResourcePath();
        if (statusResourcePath != null) {
            router.route(HttpMethod.GET, statusResourcePath).handler(this::doGetStatus);
        }
        return router;
    }

    protected String getStatusResourcePath() {
        return "/status";
    }

    protected abstract void addRoutes(Router router);

    protected HttpServerOptions getHttpServerOptions() {
        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setHost(this.config.getBindAddress()).setPort(this.config.getPort()).setMaxChunkSize(4096);
        return httpServerOptions;
    }

    private void bindHttpServer(Router router, Future<Void> future) {
        if (this.server == null) {
            this.server = this.vertx.createHttpServer(getHttpServerOptions());
        }
        HttpServer httpServer = this.server;
        router.getClass();
        httpServer.requestHandler(router::accept).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("adapter running on {}:{}", this.config.getBindAddress(), Integer.valueOf(this.server.actualPort()));
                future.complete();
            } else {
                LOG.error("error while starting up adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
    }

    public final void stop(Future<Void> future) throws Exception {
        try {
            preShutdown();
        } catch (Exception e) {
            LOG.error("error in preShutdown", e);
        }
        Future future2 = Future.future();
        future2.setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("adapter has been shut down successfully");
                future.complete();
            } else {
                LOG.info("error while shutting down adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        Future future3 = Future.future();
        if (this.server != null) {
            this.server.close(future3.completer());
        } else {
            future3.complete();
        }
        future3.compose(r4 -> {
            Future future4 = Future.future();
            if (this.hono != null) {
                this.hono.shutdown(future4.completer());
            } else {
                future4.complete();
            }
            return future4;
        }).compose(r5 -> {
            postShutdown(future2);
        }, future2);
    }

    protected void preShutdown() {
    }

    protected void postShutdown(Future<Void> future) {
        future.complete();
    }

    protected static void badRequest(HttpServerResponse httpServerResponse, String str) {
        badRequest(httpServerResponse, str, null);
    }

    protected static void badRequest(HttpServerResponse httpServerResponse, String str, String str2) {
        endWithStatus(httpServerResponse, 400, null, str, str2);
    }

    protected static void internalServerError(HttpServerResponse httpServerResponse, String str) {
        endWithStatus(httpServerResponse, 500, null, str, null);
    }

    protected static void serviceUnavailable(HttpServerResponse httpServerResponse, int i) {
        serviceUnavailable(httpServerResponse, i, null, null);
    }

    protected static void serviceUnavailable(HttpServerResponse httpServerResponse, int i, String str, String str2) {
        endWithStatus(httpServerResponse, 503, Collections.singletonMap(HttpHeaders.CONTENT_TYPE, str2 != null ? str2 : "text/plain"), str, str2);
    }

    protected static void endWithStatus(HttpServerResponse httpServerResponse, int i, Map<CharSequence, CharSequence> map, String str, String str2) {
        Objects.requireNonNull(httpServerResponse);
        httpServerResponse.setStatusCode(i);
        if (map != null) {
            for (Map.Entry<CharSequence, CharSequence> entry : map.entrySet()) {
                httpServerResponse.putHeader(entry.getKey(), entry.getValue());
            }
        }
        if (str == null) {
            httpServerResponse.end();
            return;
        }
        if (str2 != null) {
            httpServerResponse.putHeader(HttpHeaders.CONTENT_TYPE, str2);
        } else {
            httpServerResponse.putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
        }
        httpServerResponse.end(str);
    }

    private void doGetStatus(RoutingContext routingContext) {
        JsonObject jsonObject = new JsonObject(this.hono.getConnectionStatus());
        jsonObject.put("active profiles", this.activeProfiles);
        jsonObject.put("senders", this.hono.getSenderStatus());
        adaptStatusResource(jsonObject);
        routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON).end(jsonObject.encodePrettily());
    }

    protected void adaptStatusResource(JsonObject jsonObject) {
    }

    protected static String getContentType(RoutingContext routingContext) {
        return ((RoutingContext) Objects.requireNonNull(routingContext)).request().getHeader(HttpHeaders.CONTENT_TYPE);
    }

    public final void uploadTelemetryMessage(RoutingContext routingContext, String str, String str2) {
        uploadTelemetryMessage(((RoutingContext) Objects.requireNonNull(routingContext)).response(), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), routingContext.getBody(), getContentType(routingContext));
    }

    public final void uploadTelemetryMessage(HttpServerResponse httpServerResponse, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((HttpServerResponse) Objects.requireNonNull(httpServerResponse), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, this.telemetrySenderSupplier);
    }

    public final void uploadEventMessage(RoutingContext routingContext, String str, String str2) {
        uploadEventMessage(((RoutingContext) Objects.requireNonNull(routingContext)).response(), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), routingContext.getBody(), getContentType(routingContext));
    }

    public final void uploadEventMessage(HttpServerResponse httpServerResponse, String str, String str2, Buffer buffer, String str3) {
        doUploadMessage((HttpServerResponse) Objects.requireNonNull(httpServerResponse), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), buffer, str3, this.eventSenderSupplier);
    }

    private void doUploadMessage(HttpServerResponse httpServerResponse, String str, String str2, Buffer buffer, String str3, BiConsumer<String, Handler<AsyncResult<MessageSender>>> biConsumer) {
        if (str3 == null) {
            badRequest(httpServerResponse, String.format("%s header is missing", HttpHeaders.CONTENT_TYPE));
        } else if (buffer == null || buffer.length() == 0) {
            badRequest(httpServerResponse, "missing body");
        } else {
            biConsumer.accept(str, asyncResult -> {
                if (asyncResult.succeeded()) {
                    sendToHono(httpServerResponse, str2, buffer, str3, (MessageSender) asyncResult.result());
                } else {
                    serviceUnavailable(httpServerResponse, 5);
                }
            });
        }
    }

    private void sendToHono(HttpServerResponse httpServerResponse, String str, Buffer buffer, String str2, MessageSender messageSender) {
        if (messageSender.send(str, buffer.getBytes(), str2)) {
            httpServerResponse.setStatusCode(202).end();
        } else {
            serviceUnavailable(httpServerResponse, 2, "resource limit exceeded, please try again later", "text/plain");
        }
    }

    private void connectToHono(Handler<AsyncResult<HonoClient>> handler) {
        this.hono.connect(new ProtonClientOptions().setReconnectAttempts(-1).setReconnectInterval(200L), asyncResult -> {
            if (handler != null) {
                handler.handle(asyncResult);
            }
        });
    }
}
