package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/kafka/KafkaBasedInternalCommandConsumer.class */
public class KafkaBasedInternalCommandConsumer implements InternalCommandConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedInternalCommandConsumer.class);
    private static final int NUM_PARTITIONS = 1;
    private static final long CREATE_TOPIC_RETRY_INTERVAL = 1000;
    private final Vertx vertx;
    private final Supplier<Future<KafkaConsumer<String, Buffer>>> consumerCreator;
    private final Supplier<Future<Admin>> kafkaAdminClientCreator;
    private final String adapterInstanceId;
    private final String clientId;
    private final CommandHandlers commandHandlers;
    private final Tracer tracer;
    private final CommandResponseSender commandResponseSender;
    private final TenantClient tenantClient;
    private final AtomicBoolean isTopicCreated = new AtomicBoolean(false);
    private final AtomicBoolean retryCreateTopic = new AtomicBoolean(true);
    private final Map<String, Map<Integer, Long>> lastHandledPartitionOffsetsPerTenant = new HashMap();
    private KafkaConsumer<String, Buffer> consumer;
    private Admin adminClient;
    private Context context;
    private KafkaClientMetricsSupport metricsSupport;
    private long retryCreateTopicTimerId;

    public KafkaBasedInternalCommandConsumer(Vertx vertx, KafkaAdminClientConfigProperties kafkaAdminClientConfigProperties, MessagingKafkaConsumerConfigProperties messagingKafkaConsumerConfigProperties, TenantClient tenantClient, CommandResponseSender commandResponseSender, String str, CommandHandlers commandHandlers, Tracer tracer) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        Objects.requireNonNull(kafkaAdminClientConfigProperties);
        Objects.requireNonNull(messagingKafkaConsumerConfigProperties);
        this.tenantClient = (TenantClient) Objects.requireNonNull(tenantClient);
        this.commandResponseSender = (CommandResponseSender) Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = (String) Objects.requireNonNull(str);
        this.commandHandlers = (CommandHandlers) Objects.requireNonNull(commandHandlers);
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
        Map adminClientConfig = kafkaAdminClientConfigProperties.getAdminClientConfig("internal-cmd-admin");
        String str2 = (String) adminClientConfig.get("bootstrap.servers");
        KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(vertx);
        this.kafkaAdminClientCreator = () -> {
            return kafkaClientFactory.createClientWithRetries(() -> {
                return Admin.create(new HashMap(adminClientConfig));
            }, str2, KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        };
        Map consumerConfig = messagingKafkaConsumerConfigProperties.getConsumerConfig("internal-cmd");
        consumerConfig.put("group.id", str);
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        this.clientId = (String) consumerConfig.get("client.id");
        this.consumerCreator = () -> {
            return kafkaClientFactory.createKafkaConsumerWithRetries(consumerConfig, String.class, Buffer.class, KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        };
    }

    KafkaBasedInternalCommandConsumer(Context context, Admin admin, KafkaConsumer<String, Buffer> kafkaConsumer, String str, TenantClient tenantClient, CommandResponseSender commandResponseSender, String str2, CommandHandlers commandHandlers, Tracer tracer) {
        this.context = (Context) Objects.requireNonNull(context);
        Objects.requireNonNull(admin);
        this.consumer = (KafkaConsumer) Objects.requireNonNull(kafkaConsumer);
        this.clientId = (String) Objects.requireNonNull(str);
        this.tenantClient = (TenantClient) Objects.requireNonNull(tenantClient);
        this.commandResponseSender = (CommandResponseSender) Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = (String) Objects.requireNonNull(str2);
        this.commandHandlers = (CommandHandlers) Objects.requireNonNull(commandHandlers);
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
        this.vertx = context.owner();
        this.consumerCreator = () -> {
            return Future.succeededFuture(kafkaConsumer);
        };
        this.kafkaAdminClientCreator = () -> {
            return Future.succeededFuture(admin);
        };
    }

    public final KafkaBasedInternalCommandConsumer setMetricsSupport(KafkaClientMetricsSupport kafkaClientMetricsSupport) {
        this.metricsSupport = kafkaClientMetricsSupport;
        return this;
    }

    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture(new IllegalStateException("Consumer must be started in a Vert.x context"));
            }
        }
        return this.kafkaAdminClientCreator.get().onFailure(th -> {
            LOG.error("admin client creation failed", th);
        }).compose(admin -> {
            this.adminClient = admin;
            return createTopic();
        }).recover(th2 -> {
            return retryCreateTopic();
        }).compose(r4 -> {
            this.isTopicCreated.set(true);
            return this.consumerCreator.get().onFailure(th3 -> {
                LOG.error("consumer creation failed", th3);
            });
        }).compose(kafkaConsumer -> {
            this.consumer = kafkaConsumer;
            Optional.ofNullable(this.metricsSupport).ifPresent(kafkaClientMetricsSupport -> {
                kafkaClientMetricsSupport.registerKafkaConsumer(this.consumer.unwrap());
            });
            return subscribeToTopic();
        });
    }

    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        LOG.trace("registering readiness check using kafka based internal command consumer [adapter instance id: {}]", this.adapterInstanceId);
        healthCheckHandler.register(String.format("internal-command-consumer[%s]-readiness", this.adapterInstanceId), promise -> {
            if (this.isTopicCreated.get()) {
                promise.tryComplete(Status.OK());
            } else {
                LOG.debug("readiness check failed [internal command topic is not created]");
                promise.tryComplete(Status.KO());
            }
        });
    }

    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
    }

    private Future<Void> createTopic() {
        Promise promise = Promise.promise();
        String topicName = getTopicName();
        this.adminClient.createTopics(List.of(new NewTopic(topicName, Optional.of(Integer.valueOf(NUM_PARTITIONS)), Optional.empty()))).all().whenComplete((r6, th) -> {
            this.context.runOnContext(r7 -> {
                Optional filter = Optional.ofNullable(th).filter(th -> {
                    return !(th instanceof TopicExistsException);
                });
                Objects.requireNonNull(promise);
                Consumer consumer = promise::fail;
                Objects.requireNonNull(promise);
                filter.ifPresentOrElse(consumer, promise::complete);
            });
        });
        return promise.future().onSuccess(r5 -> {
            LOG.debug("created topic [{}]", topicName);
        }).onFailure(th2 -> {
            LOG.error("error creating topic [{}]", topicName, th2);
        });
    }

    private Future<Void> retryCreateTopic() {
        Promise promise = Promise.promise();
        this.retryCreateTopicTimerId = this.vertx.setPeriodic(CREATE_TOPIC_RETRY_INTERVAL, l -> {
            if (this.retryCreateTopic.compareAndSet(true, false)) {
                createTopic().onSuccess(r7 -> {
                    this.vertx.cancelTimer(l.longValue());
                    promise.complete();
                }).onFailure(th -> {
                    this.retryCreateTopic.set(true);
                });
            }
        });
        return promise.future();
    }

    private Future<Void> subscribeToTopic() {
        this.consumer.handler(this::handleCommandMessage);
        this.consumer.exceptionHandler(th -> {
            LOG.error("consumer error occurred [adapterInstanceId: {}, clientId: {}]", new Object[]{this.adapterInstanceId, this.clientId, th});
        });
        this.consumer.partitionsRevokedHandler(this::onPartitionsRevoked);
        Promise promise = Promise.promise();
        this.consumer.partitionsAssignedHandler(set -> {
            LOG.debug("partitions assigned: {}", set);
            promise.tryComplete();
        });
        String topicName = getTopicName();
        Promise promise2 = Promise.promise();
        this.consumer.subscribe(topicName, promise2);
        return CompositeFuture.all(promise2.future(), promise.future()).map((Void) null).onComplete(asyncResult -> {
            this.consumer.partitionsAssignedHandler(this::onPartitionsAssigned);
        }).onSuccess(r5 -> {
            LOG.debug("subscribed and got partition assignment for topic [{}]", topicName);
        }).onFailure(th2 -> {
            LOG.error("error subscribing to topic [{}]", topicName, th2);
        });
    }

    private void onPartitionsAssigned(Set<TopicPartition> set) {
        LOG.debug("partitions assigned: {}", set);
    }

    private void onPartitionsRevoked(Set<TopicPartition> set) {
        LOG.debug("partitions revoked: {}", set);
    }

    private String getTopicName() {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, this.adapterInstanceId).toString();
    }

    public Future<Void> stop() {
        this.retryCreateTopic.set(false);
        this.vertx.cancelTimer(this.retryCreateTopicTimerId);
        return this.consumer == null ? Future.failedFuture("not started") : CompositeFuture.all(closeAdminClient(), closeConsumer()).mapEmpty();
    }

    private Future<Void> closeAdminClient() {
        Promise promise = Promise.promise();
        LOG.debug("stop: close admin client");
        this.context.executeBlocking(promise2 -> {
            this.adminClient.close();
            LOG.debug("admin client closed");
            promise2.complete();
        }, promise);
        return promise.future();
    }

    private Future<Void> closeConsumer() {
        Promise promise = Promise.promise();
        LOG.debug("stop: close consumer");
        this.consumer.close(promise);
        promise.future().onComplete(asyncResult -> {
            LOG.debug("consumer closed");
            Optional.ofNullable(this.metricsSupport).ifPresent(kafkaClientMetricsSupport -> {
                kafkaClientMetricsSupport.unregisterKafkaConsumer(this.consumer.unwrap());
            });
        });
        return promise.future();
    }

    void handleCommandMessage(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        Integer num = (Integer) KafkaRecordHelper.getOriginalPartitionHeader(kafkaConsumerRecord.headers()).orElse(null);
        Long l = (Long) KafkaRecordHelper.getOriginalOffsetHeader(kafkaConsumerRecord.headers()).orElse(null);
        if (num == null || l == null) {
            LOG.warn("command record is invalid - missing required original partition/offset headers");
            return;
        }
        try {
            KafkaBasedCommand fromRoutedCommandRecord = KafkaBasedCommand.fromRoutedCommandRecord(kafkaConsumerRecord);
            Map<Integer, Long> computeIfAbsent = this.lastHandledPartitionOffsetsPerTenant.computeIfAbsent(fromRoutedCommandRecord.getTenant(), str -> {
                return new HashMap();
            });
            Long l2 = computeIfAbsent.get(num);
            if (l2 != null && l.longValue() <= l2.longValue()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ignoring command - record partition offset {} <= last handled offset {} [{}]", new Object[]{l, l2, fromRoutedCommandRecord});
                    return;
                }
                return;
            }
            computeIfAbsent.put(num, l);
            CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(fromRoutedCommandRecord.getTenant(), fromRoutedCommandRecord.getGatewayOrDeviceId());
            if (commandHandler != null && commandHandler.getGatewayId() != null) {
                fromRoutedCommandRecord.setGatewayId(commandHandler.getGatewayId());
            }
            SpanContext extractSpanContext = KafkaTracingHelper.extractSpanContext(this.tracer, kafkaConsumerRecord);
            Span createSpan = CommandContext.createSpan(this.tracer, fromRoutedCommandRecord, extractSpanContext, commandHandler != null ? commandHandler.getConsumerCreationSpanContext() : null);
            createSpan.setTag("adapter_instance_id", this.adapterInstanceId);
            KafkaTracingHelper.TAG_OFFSET.set(createSpan, Long.valueOf(kafkaConsumerRecord.offset()));
            KafkaBasedCommandContext kafkaBasedCommandContext = new KafkaBasedCommandContext(fromRoutedCommandRecord, this.commandResponseSender, createSpan);
            this.tenantClient.get(fromRoutedCommandRecord.getTenant(), extractSpanContext).onFailure(th -> {
                if (ServiceInvocationException.extractStatusCode(th) == 404) {
                    kafkaBasedCommandContext.reject((Throwable) new TenantDisabledOrNotRegisteredException(fromRoutedCommandRecord.getTenant(), 404));
                } else {
                    kafkaBasedCommandContext.release(new ServerErrorException(fromRoutedCommandRecord.getTenant(), 503, "error retrieving tenant configuration", th));
                }
            }).onSuccess(tenantObject -> {
                kafkaBasedCommandContext.put("tenant-config", tenantObject);
                if (commandHandler != null) {
                    LOG.trace("using [{}] for received command [{}]", commandHandler, fromRoutedCommandRecord);
                    commandHandler.handleCommand(kafkaBasedCommandContext);
                } else {
                    LOG.info("no command handler found for command [{}]", fromRoutedCommandRecord);
                    kafkaBasedCommandContext.release(new NoConsumerException("no command handler found for command"));
                }
            });
        } catch (IllegalArgumentException e) {
            LOG.warn("command record is invalid [tenant-id: {}, device-id: {}]", new Object[]{KafkaRecordHelper.getTenantId(kafkaConsumerRecord.headers()).orElse(null), KafkaRecordHelper.getDeviceId(kafkaConsumerRecord.headers()).orElse(null), e});
        }
    }
}
