package org.eclipse.hono.commandrouter.quarkus;

import com.github.benmanes.caffeine.cache.Cache;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentracing.Tracer;
import io.quarkus.arc.config.ConfigPrefix;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.concurrent.CompletableFuture;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.hono.adapter.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.adapter.client.registry.amqp.ProtonBasedDeviceRegistrationClient;
import org.eclipse.hono.adapter.client.registry.amqp.ProtonBasedTenantClient;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.client.quarkus.ClientConfigProperties;
import org.eclipse.hono.client.quarkus.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.util.MessagingClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterAmqpServer;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.CommandRouterServiceImpl;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.config.quarkus.ApplicationConfigProperties;
import org.eclipse.hono.config.quarkus.ServiceConfigProperties;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.HealthCheckServer;
import org.eclipse.hono.service.amqp.AmqpEndpoint;
import org.eclipse.hono.service.auth.AuthenticationService;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.service.commandrouter.DelegatingCommandRouterAmqpEndpoint;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Quarkus application bean that wires up and deploys the Hono Command Router.
 * <p>
 * On start-up the bean deploys the injected authentication service and the
 * configured number of {@link CommandRouterAmqpServer} instances to vert.x and
 * then starts the health check server. On shutdown it stops the health check
 * server and closes the vert.x instance.
 */
@ApplicationScoped
public class Application {

    private static final String COMPONENT_NAME = "Hono Command Router";
    private static final Logger LOG = LoggerFactory.getLogger(Application.class);

    /** The vert.x instance that all verticles are deployed to. */
    @Inject
    Vertx vertx;

    /** The tracer handed to every client, endpoint and server created by this class. */
    @Inject
    Tracer tracer;

    /** General application configuration; supplies the number of server instances to deploy. */
    @Inject
    ApplicationConfigProperties appConfig;

    /** Configuration of the AMQP server, bound to the {@code hono.commandRouter.amqp} prefix. */
    @ConfigPrefix("hono.commandRouter.amqp")
    ServiceConfigProperties amqpServerProperties;

    /** Configuration passed to the Command Router service implementation and its AMQP endpoint. */
    @Inject
    CommandRouterServiceConfigProperties serviceConfig;

    /** Connection properties for the AMQP based command consumer factory ({@code hono.command}). */
    @ConfigPrefix("hono.command")
    ClientConfigProperties commandConsumerFactoryConfig;

    /** Connection properties for the Device Registration client ({@code hono.registration}). */
    @ConfigPrefix("hono.registration")
    RequestResponseClientConfigProperties deviceRegistrationClientConfig;

    /** Connection properties for the Tenant client ({@code hono.tenant}). */
    @ConfigPrefix("hono.tenant")
    RequestResponseClientConfigProperties tenantClientConfig;

    /** Kafka producer configuration; Kafka messaging is only set up if this is configured. */
    @Inject
    KafkaProducerConfigProperties kafkaProducerConfig;

    /** Kafka consumer configuration; Kafka messaging is only set up if this is configured. */
    @Inject
    KafkaConsumerConfigProperties kafkaConsumerConfig;

    /** Device connection information shared by the service and the command target mapper. */
    @Inject
    DeviceConnectionInfo deviceConnectionInfo;

    /** The health check server that is started after deployment and stopped on shutdown. */
    @Inject
    HealthCheckServer healthCheckServer;

    /** The meter registry that the common service tags are added to. */
    @Inject
    MeterRegistry meterRegistry;

    /** Factory for the SASL authenticator used by the AMQP server. */
    @Inject
    ProtonSaslAuthenticatorFactory saslAuthenticatorFactory;

    /** The authentication service; it must also be a vert.x {@link Verticle} so it can be deployed. */
    @Inject
    AuthenticationService authenticationService;

    /** Cache for Device Registration responses, created on first use. */
    private Cache<Object, RegistrationResult> registrationResponseCache;

    /** Cache for Tenant service responses, created on first use. */
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache;

    /**
     * Gets the human readable name of this component.
     *
     * @return The component name.
     */
    String getComponentName() {
        return COMPONENT_NAME;
    }

    /**
     * Deploys the authentication service and the AMQP server instances and
     * afterwards starts the health check server.
     * <p>
     * The method blocks until all of these steps have completed.
     *
     * @param startupEvent The event signalling application start-up.
     * @throws IllegalStateException if the authentication service is not a vert.x verticle.
     * @throws java.util.concurrent.CompletionException if deployment or the health check
     *         server start-up fails.
     */
    void onStart(@Observes StartupEvent startupEvent) {
        if (!(authenticationService instanceof Verticle)) {
            throw new IllegalStateException("Authentication service must be a vert.x Verticle");
        }

        logJvmDetails();

        LOG.info("adding common tags to meter registry");
        meterRegistry.config().commonTags(MetricsTags.forService("hono-command-router"));

        LOG.info("deploying {} {} instances ...", appConfig.getMaxInstances(), getComponentName());

        final CompletableFuture<Void> startup = new CompletableFuture<>();

        // the instanceof check above guarantees that this cast succeeds
        final Promise<String> authServiceDeployment = Promise.promise();
        vertx.deployVerticle((Verticle) authenticationService, authServiceDeployment);

        // a supplier is used so that every deployed instance gets its own server object
        final Promise<String> amqpServerDeployment = Promise.promise();
        final DeploymentOptions serverOptions = new DeploymentOptions()
                .setInstances(appConfig.getMaxInstances());
        vertx.deployVerticle(this::amqpServer, serverOptions, amqpServerDeployment);

        CompositeFuture.all(authServiceDeployment.future(), amqpServerDeployment.future())
                .compose(deployed -> healthCheckServer.start())
                .onSuccess(started -> startup.complete(null))
                .onFailure(startup::completeExceptionally);

        // block the calling thread until the asynchronous start-up has finished
        startup.join();
    }

    /**
     * Stops the health check server and then closes the vert.x instance.
     * <p>
     * vert.x is closed regardless of the outcome of stopping the health check
     * server. The method blocks until vert.x has been closed.
     *
     * @param shutdownEvent The event signalling application shutdown.
     * @throws java.util.concurrent.CompletionException if closing vert.x fails.
     */
    void onStop(@Observes ShutdownEvent shutdownEvent) {
        LOG.info("shutting down {}", getComponentName());
        final CompletableFuture<Void> shutdown = new CompletableFuture<>();
        healthCheckServer.stop().onComplete(healthCheckServerStopped -> {
            vertx.close(closeAttempt -> {
                if (closeAttempt.succeeded()) {
                    shutdown.complete(null);
                } else {
                    shutdown.completeExceptionally(closeAttempt.cause());
                }
            });
        });
        shutdown.join();
    }

    /**
     * Logs version, name and vendor of the JVM together with its maximum
     * memory (in MB) and the number of available processors at INFO level.
     */
    private void logJvmDetails() {
        if (LOG.isInfoEnabled()) {
            LOG.info("running on Java VM [version: {}, name: {}, vendor: {}, max memory: {}MB, processors: {}]",
                    System.getProperty("java.version"),
                    System.getProperty("java.vm.name"),
                    System.getProperty("java.vm.vendor"),
                    // shifting by 20 bits converts bytes to MB
                    Runtime.getRuntime().maxMemory() >> 20,
                    CpuCoreSensor.availableProcessors());
        }
    }

    /**
     * Creates a new AMQP server exposing the Command Router endpoint.
     *
     * @return The newly created server.
     */
    private CommandRouterAmqpServer amqpServer() {
        final CommandRouterAmqpServer server = new CommandRouterAmqpServer();
        server.setConfig(amqpServerProperties);
        server.setHealthCheckServer(healthCheckServer);
        server.setSaslAuthenticatorFactory(saslAuthenticatorFactory);
        server.setTracer(tracer);
        server.addEndpoint(commandRouterAmqpEndpoint());
        return server;
    }

    /**
     * Creates a new AMQP endpoint that delegates to a newly created Command
     * Router service.
     * <p>
     * The endpoint forwards registration of liveness and readiness checks to
     * the service if the service is a {@link HealthCheckProvider}.
     *
     * @return The newly created endpoint.
     */
    private AmqpEndpoint commandRouterAmqpEndpoint() {
        // the concrete type is used for the local variable because the setters
        // invoked below are defined by the implementation class
        final DelegatingCommandRouterAmqpEndpoint<CommandRouterService> endpoint =
                new DelegatingCommandRouterAmqpEndpoint<CommandRouterService>(vertx, commandRouterService()) {

                    @Override
                    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
                        if (this.service instanceof HealthCheckProvider) {
                            ((HealthCheckProvider) this.service).registerLivenessChecks(healthCheckHandler);
                        }
                    }

                    @Override
                    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
                        if (this.service instanceof HealthCheckProvider) {
                            ((HealthCheckProvider) this.service).registerReadinessChecks(healthCheckHandler);
                        }
                    }
                };
        endpoint.setConfiguration(serviceConfig);
        endpoint.setTracer(tracer);
        return endpoint;
    }

    /**
     * Creates a new Command Router service together with the clients, the
     * command target mapper and the command consumer factories it requires.
     *
     * @return The newly created service.
     */
    private CommandRouterService commandRouterService() {
        final DeviceRegistrationClient registration = registrationClient();
        final TenantClient tenants = tenantClient();
        final CommandTargetMapper targetMapper =
                CommandTargetMapper.create(registration, deviceConnectionInfo, tracer);
        final MessagingClient<CommandConsumerFactory> consumerFactories =
                commandConsumerFactories(tenants, targetMapper);
        return new CommandRouterServiceImpl(
                serviceConfig,
                registration,
                tenants,
                deviceConnectionInfo,
                consumerFactories,
                tracer);
    }

    /**
     * Gets the command consumer factory configuration after applying the
     * default server role and client name where none have been set.
     *
     * @return The (shared) configuration object.
     */
    private ClientConfigProperties commandConsumerFactoryConfig() {
        commandConsumerFactoryConfig.setServerRoleIfUnknown("Command & Control");
        commandConsumerFactoryConfig.setNameIfNotSet(getComponentName());
        return commandConsumerFactoryConfig;
    }

    /**
     * Creates the command consumer factories for the configured messaging
     * infrastructures.
     * <p>
     * A Kafka based factory is registered if both the Kafka producer and
     * consumer are configured; an AMQP based factory is registered if a host
     * is configured for the command connection.
     *
     * @param tenantClient The Tenant client passed to the factories.
     * @param commandTargetMapper The mapper passed to the factories.
     * @return The messaging client holding the registered factories.
     */
    private MessagingClient<CommandConsumerFactory> commandConsumerFactories(
            TenantClient tenantClient,
            CommandTargetMapper commandTargetMapper) {

        final MessagingClient<CommandConsumerFactory> factories = new MessagingClient<>();

        if (kafkaProducerConfig.isConfigured() && kafkaConsumerConfig.isConfigured()) {
            factories.setClient(
                    MessagingType.kafka,
                    new KafkaBasedCommandConsumerFactoryImpl(
                            vertx,
                            tenantClient,
                            commandTargetMapper,
                            KafkaProducerFactory.sharedProducerFactory(vertx),
                            kafkaProducerConfig,
                            kafkaConsumerConfig,
                            tracer));
        }

        final ClientConfigProperties amqpConfig = commandConsumerFactoryConfig();
        if (amqpConfig.isHostConfigured()) {
            factories.setClient(
                    MessagingType.amqp,
                    new ProtonBasedCommandConsumerFactoryImpl(
                            HonoConnection.newConnection(vertx, amqpConfig, tracer),
                            tenantClient,
                            commandTargetMapper,
                            SendMessageSampler.Factory.noop()));
        }

        return factories;
    }

    /**
     * Gets the Device Registration client configuration after applying the
     * default server role and client name where none have been set.
     *
     * @return The (shared) configuration object.
     */
    private RequestResponseClientConfigProperties registrationServiceClientConfig() {
        deviceRegistrationClientConfig.setServerRoleIfUnknown("Device Registration");
        deviceRegistrationClientConfig.setNameIfNotSet(getComponentName());
        return deviceRegistrationClientConfig;
    }

    /**
     * Gets the cache for Device Registration responses, creating it from the
     * registration client configuration on first invocation.
     *
     * @return The cache.
     */
    private Cache<Object, RegistrationResult> registrationResponseCache() {
        if (registrationResponseCache == null) {
            registrationResponseCache = Caches.newCaffeineCache(deviceRegistrationClientConfig);
        }
        return registrationResponseCache;
    }

    /**
     * Gets the Tenant client configuration after applying the default server
     * role and client name where none have been set.
     *
     * @return The (shared) configuration object.
     */
    private RequestResponseClientConfigProperties tenantServiceClientConfig() {
        tenantClientConfig.setServerRoleIfUnknown("Tenant");
        tenantClientConfig.setNameIfNotSet(getComponentName());
        return tenantClientConfig;
    }

    /**
     * Gets the cache for Tenant service responses, creating it from the
     * tenant client configuration on first invocation.
     *
     * @return The cache.
     */
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache() {
        if (tenantResponseCache == null) {
            tenantResponseCache = Caches.newCaffeineCache(tenantClientConfig);
        }
        return tenantResponseCache;
    }

    /**
     * Creates a new AMQP based Device Registration client using a new
     * connection, a no-op message sampler and the shared response cache.
     *
     * @return The newly created client.
     */
    protected DeviceRegistrationClient registrationClient() {
        final HonoConnection connection =
                HonoConnection.newConnection(vertx, registrationServiceClientConfig(), tracer);
        return new ProtonBasedDeviceRegistrationClient(
                connection,
                SendMessageSampler.Factory.noop(),
                registrationResponseCache());
    }

    /**
     * Creates a new AMQP based Tenant client using a new connection, a no-op
     * message sampler and the shared response cache.
     *
     * @return The newly created client.
     */
    protected TenantClient tenantClient() {
        final HonoConnection connection =
                HonoConnection.newConnection(vertx, tenantServiceClientConfig(), tracer);
        return new ProtonBasedTenantClient(
                connection,
                SendMessageSampler.Factory.noop(),
                tenantResponseCache());
    }
}
