package org.eclipse.hono.adapter.quarkus;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.opentracing.Tracer;
import io.quarkus.arc.config.ConfigPrefix;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.MessagingClientProviders;
import org.eclipse.hono.adapter.monitoring.ConnectionEventProducer;
import org.eclipse.hono.adapter.monitoring.ConnectionEventProducerConfig;
import org.eclipse.hono.adapter.monitoring.HonoEventConnectionEventProducer;
import org.eclipse.hono.adapter.monitoring.LoggingConnectionEventProducer;
import org.eclipse.hono.adapter.monitoring.quarkus.ConnectionEventProducerConfig;
import org.eclipse.hono.adapter.resourcelimits.ConnectedDevicesAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.ConnectionDurationAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.DataVolumeAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.NoopResourceLimitChecks;
import org.eclipse.hono.adapter.resourcelimits.PrometheusBasedResourceLimitChecks;
import org.eclipse.hono.adapter.resourcelimits.ResourceLimitChecks;
import org.eclipse.hono.adapter.resourcelimits.quarkus.PrometheusBasedResourceLimitChecksConfig;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.command.CommandConsumerFactory;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.command.CommandRouterCommandConsumerFactory;
import org.eclipse.hono.client.command.DeviceConnectionClient;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommandResponseSender;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommandRouterClient;
import org.eclipse.hono.client.command.amqp.ProtonBasedDelegatingCommandConsumerFactory;
import org.eclipse.hono.client.command.amqp.ProtonBasedInternalCommandConsumer;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandConsumer;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.client.quarkus.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.registry.CredentialsClient;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedCredentialsClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedDeviceRegistrationClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedTenantClient;
import org.eclipse.hono.client.telemetry.amqp.ProtonBasedDownstreamSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedEventSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedTelemetrySender;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.config.quarkus.ApplicationConfigProperties;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.service.quarkus.AbstractServiceApplication;
import org.eclipse.hono.util.CredentialsObject;
import org.eclipse.hono.util.CredentialsResult;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/quarkus/AbstractProtocolAdapterApplication.class */
public abstract class AbstractProtocolAdapterApplication<C extends ProtocolAdapterProperties> extends AbstractServiceApplication {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractProtocolAdapterApplication.class);

    @Inject
    protected Tracer tracer;

    @Inject
    protected SendMessageSampler.Factory messageSamplerFactory;

    @Inject
    protected PrometheusBasedResourceLimitChecksConfig resourceLimitChecksConfig;

    @Inject
    protected ConnectionEventProducerConfig connectionEventsConfig;

    @Inject
    protected C protocolAdapterProperties;

    @Inject
    protected ApplicationConfigProperties appConfig;

    @Inject
    protected KafkaProducerConfigProperties kafkaProducerConfig;

    @Inject
    protected KafkaConsumerConfigProperties kafkaConsumerConfig;

    @Inject
    protected KafkaAdminClientConfigProperties kafkaAdminClientConfig;

    @ConfigPrefix("hono.messaging")
    protected RequestResponseClientConfigProperties downstreamSenderConfig;

    @ConfigPrefix("hono.command")
    protected RequestResponseClientConfigProperties commandConsumerConfig;

    @ConfigPrefix("hono.tenant")
    protected RequestResponseClientConfigProperties tenantClientConfig;

    @ConfigPrefix("hono.registration")
    protected RequestResponseClientConfigProperties deviceRegistrationClientConfig;

    @ConfigPrefix("hono.credentials")
    protected RequestResponseClientConfigProperties credentialsClientConfig;

    @ConfigPrefix("hono.commandRouter")
    protected RequestResponseClientConfigProperties commandRouterConfig;
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache;
    private Cache<Object, RegistrationResult> registrationResponseCache;
    private Cache<Object, CredentialsResult<CredentialsObject>> credentialsResponseCache;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.quarkus.AbstractProtocolAdapterApplication$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/quarkus/AbstractProtocolAdapterApplication$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$adapter$monitoring$ConnectionEventProducerConfig$ConnectionEventProducerType = new int[ConnectionEventProducerConfig.ConnectionEventProducerType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$adapter$monitoring$ConnectionEventProducerConfig$ConnectionEventProducerType[ConnectionEventProducerConfig.ConnectionEventProducerType.logging.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$adapter$monitoring$ConnectionEventProducerConfig$ConnectionEventProducerType[ConnectionEventProducerConfig.ConnectionEventProducerType.events.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    protected abstract AbstractProtocolAdapterBase<C> adapter();

    protected void doStart() {
        LOG.info("deploying {} {} instances ...", Integer.valueOf(this.appConfig.getMaxInstances()), getComponentName());
        CompletableFuture completableFuture = new CompletableFuture();
        Promise promise = Promise.promise();
        this.vertx.deployVerticle(() -> {
            return adapter();
        }, new DeploymentOptions().setInstances(this.appConfig.getMaxInstances()), promise);
        promise.future().compose(str -> {
            return this.healthCheckServer.start();
        }).onSuccess(r4 -> {
            completableFuture.complete(null);
        }).onFailure(th -> {
            completableFuture.completeExceptionally(th);
        });
        completableFuture.join();
    }

    protected void setCollaborators(AbstractProtocolAdapterBase<?> abstractProtocolAdapterBase) {
        Objects.requireNonNull(abstractProtocolAdapterBase);
        DeviceRegistrationClient registrationClient = registrationClient();
        MessagingClientProvider messagingClientProvider = new MessagingClientProvider();
        MessagingClientProvider messagingClientProvider2 = new MessagingClientProvider();
        MessagingClientProvider messagingClientProvider3 = new MessagingClientProvider();
        if (this.kafkaProducerConfig.isConfigured()) {
            LOG.info("Kafka Producer is configured, adding Kafka messaging clients");
            Optional ofNullable = Optional.ofNullable(getComponentName());
            KafkaProducerConfigProperties kafkaProducerConfigProperties = this.kafkaProducerConfig;
            Objects.requireNonNull(kafkaProducerConfigProperties);
            ofNullable.ifPresent(kafkaProducerConfigProperties::setDefaultClientIdPrefix);
            LOG.debug("KafkaProducerConfig: " + this.kafkaProducerConfig.getProducerConfig("log"));
            KafkaProducerFactory sharedProducerFactory = KafkaProducerFactory.sharedProducerFactory(this.vertx);
            messagingClientProvider.setClient(new KafkaBasedTelemetrySender(sharedProducerFactory, this.kafkaProducerConfig, this.protocolAdapterProperties.isDefaultsEnabled(), this.tracer));
            messagingClientProvider2.setClient(new KafkaBasedEventSender(sharedProducerFactory, this.kafkaProducerConfig, this.protocolAdapterProperties.isDefaultsEnabled(), this.tracer));
            messagingClientProvider3.setClient(new KafkaBasedCommandResponseSender(sharedProducerFactory, this.kafkaProducerConfig, this.tracer));
        }
        if (this.downstreamSenderConfig.isHostConfigured()) {
            messagingClientProvider.setClient(downstreamSender());
            messagingClientProvider2.setClient(downstreamSender());
            messagingClientProvider3.setClient(new ProtonBasedCommandResponseSender(HonoConnection.newConnection(this.vertx, commandResponseSenderConfig(), this.tracer), this.messageSamplerFactory, this.protocolAdapterProperties.isJmsVendorPropsEnabled()));
        }
        MessagingClientProviders messagingClientProviders = new MessagingClientProviders(messagingClientProvider, messagingClientProvider2, messagingClientProvider3);
        if (!this.commandRouterConfig.isHostConfigured()) {
            LOG.warn("Quarkus based protocol adapters do not support the Device Connection service to be used. Make sure to configure a connection to the Command Router service instead.");
            throw new IllegalStateException("No Command Router connection configured");
        }
        CommandRouterClient commandRouterClient = commandRouterClient();
        abstractProtocolAdapterBase.setCommandRouterClient(commandRouterClient);
        CommandRouterCommandConsumerFactory commandConsumerFactory = commandConsumerFactory(commandRouterClient);
        commandConsumerFactory.registerInternalCommandConsumer((str, commandHandlers) -> {
            return new ProtonBasedInternalCommandConsumer(commandConsumerConnection(), str, commandHandlers);
        });
        CommandResponseSender client = messagingClientProviders.getCommandResponseSenderProvider().getClient(MessagingType.kafka);
        if (this.kafkaAdminClientConfig.isConfigured() && this.kafkaConsumerConfig.isConfigured() && client != null) {
            commandConsumerFactory.registerInternalCommandConsumer((str2, commandHandlers2) -> {
                return new KafkaBasedInternalCommandConsumer(this.vertx, this.kafkaAdminClientConfig, this.kafkaConsumerConfig, client, str2, commandHandlers2, this.tracer);
            });
        }
        abstractProtocolAdapterBase.setCommandConsumerFactory(commandConsumerFactory);
        TenantClient tenantClient = tenantClient();
        abstractProtocolAdapterBase.setMessagingClientProviders(messagingClientProviders);
        Optional ofNullable2 = Optional.ofNullable(connectionEventProducer());
        Objects.requireNonNull(abstractProtocolAdapterBase);
        ofNullable2.ifPresent(abstractProtocolAdapterBase::setConnectionEventProducer);
        abstractProtocolAdapterBase.setCredentialsClient(credentialsClient());
        abstractProtocolAdapterBase.setHealthCheckServer(this.healthCheckServer);
        abstractProtocolAdapterBase.setRegistrationClient(registrationClient);
        abstractProtocolAdapterBase.setResourceLimitChecks(prometheusResourceLimitChecks(this.resourceLimitChecksConfig, tenantClient));
        abstractProtocolAdapterBase.setTenantClient(tenantClient);
        abstractProtocolAdapterBase.setTracer(this.tracer);
    }

    protected ConnectionEventProducer connectionEventProducer() {
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$adapter$monitoring$ConnectionEventProducerConfig$ConnectionEventProducerType[this.connectionEventsConfig.getType().ordinal()]) {
            case 1:
                return new LoggingConnectionEventProducer(this.connectionEventsConfig);
            case 2:
                return new HonoEventConnectionEventProducer();
            default:
                return null;
        }
    }

    private RequestResponseClientConfigProperties tenantServiceClientConfig() {
        this.tenantClientConfig.setServerRoleIfUnknown("Tenant");
        this.tenantClientConfig.setNameIfNotSet(getComponentName());
        return this.tenantClientConfig;
    }

    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache() {
        if (this.tenantResponseCache == null) {
            this.tenantResponseCache = Caches.newCaffeineCache(this.tenantClientConfig);
        }
        return this.tenantResponseCache;
    }

    protected TenantClient tenantClient() {
        return new ProtonBasedTenantClient(HonoConnection.newConnection(this.vertx, tenantServiceClientConfig(), this.tracer), this.messageSamplerFactory, tenantResponseCache());
    }

    private RequestResponseClientConfigProperties registrationServiceClientConfig() {
        this.deviceRegistrationClientConfig.setServerRoleIfUnknown("Device Registration");
        this.deviceRegistrationClientConfig.setNameIfNotSet(getComponentName());
        return this.deviceRegistrationClientConfig;
    }

    private Cache<Object, RegistrationResult> registrationResponseCache() {
        if (this.registrationResponseCache == null) {
            this.registrationResponseCache = Caches.newCaffeineCache(this.deviceRegistrationClientConfig);
        }
        return this.registrationResponseCache;
    }

    protected DeviceRegistrationClient registrationClient() {
        return new ProtonBasedDeviceRegistrationClient(HonoConnection.newConnection(this.vertx, registrationServiceClientConfig(), this.tracer), this.messageSamplerFactory, registrationResponseCache());
    }

    private RequestResponseClientConfigProperties credentialsServiceClientConfig() {
        this.credentialsClientConfig.setServerRoleIfUnknown("Credentials");
        this.credentialsClientConfig.setNameIfNotSet(getComponentName());
        return this.credentialsClientConfig;
    }

    private Cache<Object, CredentialsResult<CredentialsObject>> credentialsResponseCache() {
        if (this.credentialsResponseCache == null) {
            this.credentialsResponseCache = Caches.newCaffeineCache(this.credentialsClientConfig);
        }
        return this.credentialsResponseCache;
    }

    protected CredentialsClient credentialsClient() {
        return new ProtonBasedCredentialsClient(HonoConnection.newConnection(this.vertx, credentialsServiceClientConfig(), this.tracer), this.messageSamplerFactory, credentialsResponseCache());
    }

    private RequestResponseClientConfigProperties commandRouterServiceClientConfig() {
        this.commandRouterConfig.setServerRoleIfUnknown("Command Router");
        this.commandRouterConfig.setNameIfNotSet(getComponentName());
        return this.commandRouterConfig;
    }

    protected CommandRouterClient commandRouterClient() {
        return new ProtonBasedCommandRouterClient(HonoConnection.newConnection(this.vertx, commandRouterServiceClientConfig(), this.tracer), this.messageSamplerFactory);
    }

    private ClientConfigProperties downstreamSenderConfig() {
        ClientConfigProperties clientConfigProperties = new ClientConfigProperties(this.downstreamSenderConfig);
        clientConfigProperties.setServerRoleIfUnknown("Downstream");
        clientConfigProperties.setNameIfNotSet(getComponentName());
        return clientConfigProperties;
    }

    private ProtonBasedDownstreamSender downstreamSender() {
        return new ProtonBasedDownstreamSender(HonoConnection.newConnection(this.vertx, downstreamSenderConfig(), this.tracer), this.messageSamplerFactory, this.protocolAdapterProperties.isDefaultsEnabled(), this.protocolAdapterProperties.isJmsVendorPropsEnabled());
    }

    private ClientConfigProperties commandConsumerConfig() {
        this.commandConsumerConfig.setServerRoleIfUnknown("Command & Control");
        this.commandConsumerConfig.setNameIfNotSet(getComponentName());
        return this.commandConsumerConfig;
    }

    protected HonoConnection commandConsumerConnection() {
        return HonoConnection.newConnection(this.vertx, commandConsumerConfig(), this.tracer);
    }

    protected CommandConsumerFactory commandConsumerFactory(DeviceConnectionClient deviceConnectionClient, DeviceRegistrationClient deviceRegistrationClient) {
        LOG.debug("using Device Connection service client, configuring CommandConsumerFactory [{}]", ProtonBasedDelegatingCommandConsumerFactory.class.getName());
        return new ProtonBasedDelegatingCommandConsumerFactory(commandConsumerConnection(), this.messageSamplerFactory, deviceConnectionClient, deviceRegistrationClient, this.tracer);
    }

    protected CommandRouterCommandConsumerFactory commandConsumerFactory(CommandRouterClient commandRouterClient) {
        LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}]", CommandRouterCommandConsumerFactory.class.getName());
        return new CommandRouterCommandConsumerFactory(commandRouterClient, getComponentName());
    }

    private ClientConfigProperties commandResponseSenderConfig() {
        ClientConfigProperties clientConfigProperties = new ClientConfigProperties(this.downstreamSenderConfig);
        clientConfigProperties.setServerRoleIfUnknown("Command Response");
        clientConfigProperties.setNameIfNotSet(getComponentName());
        return clientConfigProperties;
    }

    protected ResourceLimitChecks prometheusResourceLimitChecks(PrometheusBasedResourceLimitChecksConfig prometheusBasedResourceLimitChecksConfig, TenantClient tenantClient) {
        Objects.requireNonNull(prometheusBasedResourceLimitChecksConfig);
        Objects.requireNonNull(tenantClient);
        if (!prometheusBasedResourceLimitChecksConfig.isHostConfigured()) {
            return new NoopResourceLimitChecks();
        }
        WebClientOptions webClientOptions = new WebClientOptions();
        webClientOptions.setConnectTimeout(prometheusBasedResourceLimitChecksConfig.getConnectTimeout());
        webClientOptions.setDefaultHost(prometheusBasedResourceLimitChecksConfig.getHost());
        webClientOptions.setDefaultPort(prometheusBasedResourceLimitChecksConfig.getPort());
        webClientOptions.setTrustOptions(prometheusBasedResourceLimitChecksConfig.getTrustOptions());
        webClientOptions.setKeyCertOptions(prometheusBasedResourceLimitChecksConfig.getKeyCertOptions());
        webClientOptions.setSsl(prometheusBasedResourceLimitChecksConfig.isTlsEnabled());
        WebClient create = WebClient.create(this.vertx, webClientOptions);
        Duration ofSeconds = Duration.ofSeconds(prometheusBasedResourceLimitChecksConfig.getCacheTimeout());
        Caffeine refreshAfterWrite = Caffeine.newBuilder().executor(Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        })).initialCapacity(prometheusBasedResourceLimitChecksConfig.getCacheMinSize()).maximumSize(prometheusBasedResourceLimitChecksConfig.getCacheMaxSize()).expireAfterWrite(ofSeconds).refreshAfterWrite(ofSeconds.dividedBy(2L));
        return new PrometheusBasedResourceLimitChecks(refreshAfterWrite.buildAsync(new ConnectedDevicesAsyncCacheLoader(create, prometheusBasedResourceLimitChecksConfig, this.tracer)), refreshAfterWrite.buildAsync(new ConnectionDurationAsyncCacheLoader(create, prometheusBasedResourceLimitChecksConfig, this.tracer)), refreshAfterWrite.buildAsync(new DataVolumeAsyncCacheLoader(create, prometheusBasedResourceLimitChecksConfig, this.tracer)), tenantClient, this.tracer);
    }
}
