package org.eclipse.hono.commandrouter.quarkus;

import com.github.benmanes.caffeine.cache.Cache;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.opentracing.Tracer;
import io.smallrye.config.ConfigMapping;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.kafka.CommonKafkaClientOptions;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaAdminClientOptions;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerOptions;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.metrics.KafkaMetricsOptions;
import org.eclipse.hono.client.kafka.metrics.MicrometerKafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.metrics.NoopKafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.amqp.ProtonBasedNotificationReceiver;
import org.eclipse.hono.client.notification.kafka.KafkaBasedNotificationReceiver;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedDeviceRegistrationClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedTenantClient;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterAmqpServer;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.CommandRouterServiceImpl;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.commandrouter.impl.kafka.InternalKafkaTopicCleanupService;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.config.quarkus.ClientOptions;
import org.eclipse.hono.config.quarkus.RequestResponseClientOptions;
import org.eclipse.hono.config.quarkus.ServiceOptions;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.notification.NotificationConstants;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.amqp.AmqpEndpoint;
import org.eclipse.hono.service.auth.AuthenticationService;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.service.commandrouter.DelegatingCommandRouterAmqpEndpoint;
import org.eclipse.hono.service.quarkus.AbstractServiceApplication;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.eclipse.hono.util.WrappedLifecycleComponentVerticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/eclipse/hono/commandrouter/quarkus/Application.class */
public class Application extends AbstractServiceApplication {
    private static final String COMPONENT_NAME = "Hono Command Router";
    private static final Logger LOG;

    @Inject
    Tracer tracer;

    @Inject
    DeviceConnectionInfo deviceConnectionInfo;

    @Inject
    ProtonSaslAuthenticatorFactory saslAuthenticatorFactory;

    @Inject
    AuthenticationService authenticationService;

    @Inject
    AdapterInstanceStatusService adapterInstanceStatusService;

    @Inject
    CommandRouterMetrics metrics;
    private ServiceConfigProperties amqpServerProperties;
    private ClientConfigProperties commandConsumerConnectionConfig;
    private RequestResponseClientConfigProperties deviceRegistrationClientConfig;
    private RequestResponseClientConfigProperties tenantClientConfig;
    private KafkaClientMetricsSupport kafkaClientMetricsSupport;
    private MessagingKafkaProducerConfigProperties commandInternalKafkaProducerConfig;
    private MessagingKafkaProducerConfigProperties commandResponseKafkaProducerConfig;
    private MessagingKafkaConsumerConfigProperties kafkaConsumerConfig;
    private KafkaAdminClientConfigProperties kafkaAdminClientConfig;
    private NotificationKafkaConsumerConfigProperties kafkaNotificationConfig;
    private InternalKafkaTopicCleanupService internalKafkaTopicCleanupService;
    private Cache<Object, RegistrationResult> registrationResponseCache;
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache;

    @Inject
    void setAmqpServerOptions(@ConfigMapping(prefix = "hono.commandRouter.amqp") ServiceOptions serviceOptions) {
        this.amqpServerProperties = new ServiceConfigProperties(serviceOptions);
    }

    @Inject
    void setCommandClientOptions(@ConfigMapping(prefix = "hono.command") ClientOptions clientOptions) {
        ClientConfigProperties clientConfigProperties = new ClientConfigProperties(clientOptions);
        clientConfigProperties.setServerRoleIfUnknown("Command & Control");
        clientConfigProperties.setNameIfNotSet(getComponentName());
        this.commandConsumerConnectionConfig = clientConfigProperties;
    }

    @Inject
    void setTenantServiceClientConfig(@ConfigMapping(prefix = "hono.tenant") RequestResponseClientOptions requestResponseClientOptions) {
        RequestResponseClientConfigProperties requestResponseClientConfigProperties = new RequestResponseClientConfigProperties(requestResponseClientOptions);
        requestResponseClientConfigProperties.setServerRoleIfUnknown("Tenant");
        requestResponseClientConfigProperties.setNameIfNotSet(getComponentName());
        this.tenantClientConfig = requestResponseClientConfigProperties;
    }

    @Inject
    void setDeviceRegistrationClientConfig(@ConfigMapping(prefix = "hono.registration") RequestResponseClientOptions requestResponseClientOptions) {
        RequestResponseClientConfigProperties requestResponseClientConfigProperties = new RequestResponseClientConfigProperties(requestResponseClientOptions);
        requestResponseClientConfigProperties.setServerRoleIfUnknown("Device Registration");
        requestResponseClientConfigProperties.setNameIfNotSet(getComponentName());
        this.deviceRegistrationClientConfig = requestResponseClientConfigProperties;
    }

    @Inject
    void setKafkaClientMetricsSupport(KafkaMetricsOptions kafkaMetricsOptions) {
        this.kafkaClientMetricsSupport = kafkaMetricsOptions.enabled() ? new MicrometerKafkaClientMetricsSupport(this.meterRegistry, kafkaMetricsOptions.useDefaultMetrics(), (List) kafkaMetricsOptions.metricsPrefixes().orElse(List.of())) : NoopKafkaClientMetricsSupport.INSTANCE;
    }

    @Inject
    void setCommandInternalKafkaClientOptions(@ConfigMapping(prefix = "hono.kafka") CommonKafkaClientOptions commonKafkaClientOptions, @ConfigMapping(prefix = "hono.kafka.commandInternal") KafkaProducerOptions kafkaProducerOptions) {
        this.commandInternalKafkaProducerConfig = new MessagingKafkaProducerConfigProperties(commonKafkaClientOptions, kafkaProducerOptions);
    }

    @Inject
    void setCommandResponseKafkaClientOptions(@ConfigMapping(prefix = "hono.kafka") CommonKafkaClientOptions commonKafkaClientOptions, @ConfigMapping(prefix = "hono.kafka.commandResponse") KafkaProducerOptions kafkaProducerOptions) {
        this.commandResponseKafkaProducerConfig = new MessagingKafkaProducerConfigProperties(commonKafkaClientOptions, kafkaProducerOptions);
    }

    @Inject
    void setCommandConsumerKafkaClientOptions(@ConfigMapping(prefix = "hono.kafka") CommonKafkaClientOptions commonKafkaClientOptions, @ConfigMapping(prefix = "hono.kafka.command") KafkaConsumerOptions kafkaConsumerOptions) {
        this.kafkaConsumerConfig = new MessagingKafkaConsumerConfigProperties(commonKafkaClientOptions, kafkaConsumerOptions);
    }

    @Inject
    void setAdminKafkaClientOptions(@ConfigMapping(prefix = "hono.kafka") CommonKafkaClientOptions commonKafkaClientOptions, @ConfigMapping(prefix = "hono.kafka.cleanup") KafkaAdminClientOptions kafkaAdminClientOptions) {
        this.kafkaAdminClientConfig = new KafkaAdminClientConfigProperties(commonKafkaClientOptions, kafkaAdminClientOptions);
    }

    @Inject
    void setNotificationKafkaClientOptions(@ConfigMapping(prefix = "hono.kafka") CommonKafkaClientOptions commonKafkaClientOptions, @ConfigMapping(prefix = "hono.kafka.notification") KafkaConsumerOptions kafkaConsumerOptions) {
        this.kafkaNotificationConfig = new NotificationKafkaConsumerConfigProperties(commonKafkaClientOptions, kafkaConsumerOptions);
    }

    public String getComponentName() {
        return COMPONENT_NAME;
    }

    protected void doStart() {
        if (!(this.authenticationService instanceof Verticle)) {
            throw new IllegalStateException("Authentication service must be a vert.x Verticle");
        }
        LOG.info("deploying {} {} instances ...", Integer.valueOf(this.appConfig.getMaxInstances()), getComponentName());
        CompletableFuture completableFuture = new CompletableFuture();
        Future onSuccess = CompositeFuture.all(this.vertx.deployVerticle(this.authenticationService).onSuccess(str -> {
            registerHealthCheckProvider(this.authenticationService);
        }), this.vertx.deployVerticle(this::amqpServer, new DeploymentOptions().setInstances(this.appConfig.getMaxInstances())), this.vertx.deployVerticle(new WrappedLifecycleComponentVerticle(notificationReceiver()))).compose(compositeFuture -> {
            return this.healthCheckServer.start();
        }).onSuccess(r4 -> {
            completableFuture.complete(null);
        });
        Objects.requireNonNull(completableFuture);
        onSuccess.onFailure(completableFuture::completeExceptionally);
        completableFuture.join();
    }

    private CommandRouterAmqpServer amqpServer() {
        CommandRouterAmqpServer commandRouterAmqpServer = new CommandRouterAmqpServer();
        commandRouterAmqpServer.setConfig(this.amqpServerProperties);
        commandRouterAmqpServer.setHealthCheckServer(this.healthCheckServer);
        commandRouterAmqpServer.setSaslAuthenticatorFactory(this.saslAuthenticatorFactory);
        commandRouterAmqpServer.setTracer(this.tracer);
        commandRouterAmqpServer.addEndpoint(commandRouterAmqpEndpoint());
        return commandRouterAmqpServer;
    }

    private AmqpEndpoint commandRouterAmqpEndpoint() {
        AmqpEndpoint amqpEndpoint = new DelegatingCommandRouterAmqpEndpoint<CommandRouterService>(this.vertx, commandRouterService()) { // from class: org.eclipse.hono.commandrouter.quarkus.Application.2
            public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
                if (this.service instanceof HealthCheckProvider) {
                    ((HealthCheckProvider) this.service).registerLivenessChecks(healthCheckHandler);
                }
            }

            public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
                if (this.service instanceof HealthCheckProvider) {
                    ((HealthCheckProvider) this.service).registerReadinessChecks(healthCheckHandler);
                }
            }
        };
        amqpEndpoint.setConfiguration(this.amqpServerProperties);
        amqpEndpoint.setTracer(this.tracer);
        return amqpEndpoint;
    }

    private CommandRouterService commandRouterService() {
        DeviceRegistrationClient registrationClient = registrationClient();
        TenantClient tenantClient = tenantClient();
        return new CommandRouterServiceImpl(this.amqpServerProperties, registrationClient, tenantClient, this.deviceConnectionInfo, commandConsumerFactoryProvider(tenantClient, CommandTargetMapper.create(registrationClient, this.deviceConnectionInfo, this.tracer)), this.adapterInstanceStatusService, this.tracer);
    }

    private MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider(TenantClient tenantClient, CommandTargetMapper commandTargetMapper) {
        MessagingClientProvider<CommandConsumerFactory> messagingClientProvider = new MessagingClientProvider<>();
        if (this.kafkaConsumerConfig.isConfigured() && this.commandResponseKafkaProducerConfig.isConfigured() && this.commandInternalKafkaProducerConfig.isConfigured()) {
            CachingKafkaProducerFactory sharedFactory = CachingKafkaProducerFactory.sharedFactory(this.vertx);
            sharedFactory.setMetricsSupport(this.kafkaClientMetricsSupport);
            if (this.internalKafkaTopicCleanupService == null && this.commandInternalKafkaProducerConfig.isConfigured() && this.kafkaConsumerConfig.isConfigured() && this.kafkaAdminClientConfig.isConfigured() && !(this.adapterInstanceStatusService instanceof AdapterInstanceStatusService.UnknownStatusProvidingService)) {
                this.internalKafkaTopicCleanupService = new InternalKafkaTopicCleanupService(this.vertx, this.adapterInstanceStatusService, this.kafkaAdminClientConfig);
            }
            messagingClientProvider.setClient(new KafkaBasedCommandConsumerFactoryImpl(this.vertx, tenantClient, commandTargetMapper, sharedFactory, this.commandInternalKafkaProducerConfig, this.commandResponseKafkaProducerConfig, this.kafkaConsumerConfig, this.metrics, this.kafkaClientMetricsSupport, this.tracer, this.internalKafkaTopicCleanupService));
        }
        if (this.commandConsumerConnectionConfig.isHostConfigured()) {
            messagingClientProvider.setClient(new ProtonBasedCommandConsumerFactoryImpl(HonoConnection.newConnection(this.vertx, this.commandConsumerConnectionConfig, this.tracer), tenantClient, commandTargetMapper, this.metrics, SendMessageSampler.Factory.noop()));
        }
        return messagingClientProvider;
    }

    private Cache<Object, RegistrationResult> registrationResponseCache() {
        if (this.registrationResponseCache == null) {
            this.registrationResponseCache = Caches.newCaffeineCache(this.deviceRegistrationClientConfig);
        }
        return this.registrationResponseCache;
    }

    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache() {
        if (this.tenantResponseCache == null) {
            this.tenantResponseCache = Caches.newCaffeineCache(this.tenantClientConfig);
        }
        return this.tenantResponseCache;
    }

    protected DeviceRegistrationClient registrationClient() {
        return new ProtonBasedDeviceRegistrationClient(HonoConnection.newConnection(this.vertx, this.deviceRegistrationClientConfig, this.tracer), SendMessageSampler.Factory.noop(), registrationResponseCache());
    }

    protected TenantClient tenantClient() {
        return new ProtonBasedTenantClient(HonoConnection.newConnection(this.vertx, this.tenantClientConfig, this.tracer), SendMessageSampler.Factory.noop(), tenantResponseCache());
    }

    public NotificationReceiver notificationReceiver() {
        KafkaBasedNotificationReceiver protonBasedNotificationReceiver;
        if (this.kafkaNotificationConfig.isConfigured()) {
            protonBasedNotificationReceiver = new KafkaBasedNotificationReceiver(this.vertx, this.kafkaNotificationConfig);
        } else {
            ClientConfigProperties clientConfigProperties = new ClientConfigProperties(this.commandConsumerConnectionConfig);
            clientConfigProperties.setServerRole("Notification");
            protonBasedNotificationReceiver = new ProtonBasedNotificationReceiver(HonoConnection.newConnection(this.vertx, clientConfigProperties, this.tracer));
        }
        Handler notificationSender = NotificationEventBusSupport.getNotificationSender(this.vertx);
        KafkaBasedNotificationReceiver kafkaBasedNotificationReceiver = protonBasedNotificationReceiver;
        NotificationConstants.DEVICE_REGISTRY_NOTIFICATION_TYPES.forEach(notificationType -> {
            Objects.requireNonNull(notificationSender);
            kafkaBasedNotificationReceiver.registerConsumer(notificationType, (v1) -> {
                r2.handle(v1);
            });
        });
        return protonBasedNotificationReceiver;
    }

    static {
        new Watcher<Pod>() { // from class: org.eclipse.hono.commandrouter.quarkus.Application.1
            public void eventReceived(Watcher.Action action, Pod pod) {
            }

            public void onClose(WatcherException watcherException) {
            }
        };
        LOG = LoggerFactory.getLogger(Application.class);
    }
}
