package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.impl.Helper;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessagingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/KafkaBasedCommandConsumerFactoryImpl.class */
public class KafkaBasedCommandConsumerFactoryImpl implements CommandConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedCommandConsumerFactoryImpl.class);
    private static final Pattern COMMANDS_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND.prefix) + ".*");
    private static final String DEFAULT_GROUP_ID = "cmd-router-group";
    private final Vertx vertx;
    private final TenantClient tenantClient;
    private final CommandTargetMapper commandTargetMapper;
    private final KafkaConsumerConfigProperties kafkaConsumerConfig;
    private final Tracer tracer;
    private final KafkaBasedInternalCommandSender internalCommandSender;
    private final KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender;
    private String groupId = DEFAULT_GROUP_ID;
    private KafkaBasedMappingAndDelegatingCommandHandler commandHandler;
    private AsyncHandlingAutoCommitKafkaConsumer kafkaConsumer;

    public KafkaBasedCommandConsumerFactoryImpl(Vertx vertx, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, KafkaProducerConfigProperties kafkaProducerConfigProperties, KafkaConsumerConfigProperties kafkaConsumerConfigProperties, Tracer tracer) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.tenantClient = (TenantClient) Objects.requireNonNull(tenantClient);
        this.commandTargetMapper = (CommandTargetMapper) Objects.requireNonNull(commandTargetMapper);
        Objects.requireNonNull(kafkaProducerFactory);
        Objects.requireNonNull(kafkaProducerConfigProperties);
        this.kafkaConsumerConfig = (KafkaConsumerConfigProperties) Objects.requireNonNull(kafkaConsumerConfigProperties);
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
        this.internalCommandSender = new KafkaBasedInternalCommandSender(kafkaProducerFactory, kafkaProducerConfigProperties, tracer);
        this.kafkaBasedCommandResponseSender = new KafkaBasedCommandResponseSender(kafkaProducerFactory, kafkaProducerConfigProperties, tracer);
    }

    public void setGroupId(String str) {
        Objects.requireNonNull(str);
        if (this.kafkaConsumer != null) {
            throw new IllegalStateException("must be invoked before start()");
        }
        this.groupId = str;
    }

    public MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    public Future<Void> start() {
        Context currentContext = Vertx.currentContext();
        if (currentContext == null) {
            return Future.failedFuture(new IllegalStateException("factory must be started in a Vert.x context"));
        }
        KafkaCommandProcessingQueue kafkaCommandProcessingQueue = new KafkaCommandProcessingQueue(currentContext);
        this.commandHandler = new KafkaBasedMappingAndDelegatingCommandHandler(this.tenantClient, kafkaCommandProcessingQueue, this.commandTargetMapper, this.internalCommandSender, this.kafkaBasedCommandResponseSender, this.tracer);
        Map consumerConfig = this.kafkaConsumerConfig.getConsumerConfig("consumer");
        consumerConfig.put("group.id", this.groupId);
        Vertx vertx = this.vertx;
        Pattern pattern = COMMANDS_TOPIC_PATTERN;
        KafkaBasedMappingAndDelegatingCommandHandler kafkaBasedMappingAndDelegatingCommandHandler = this.commandHandler;
        Objects.requireNonNull(kafkaBasedMappingAndDelegatingCommandHandler);
        this.kafkaConsumer = new AsyncHandlingAutoCommitKafkaConsumer(vertx, pattern, kafkaBasedMappingAndDelegatingCommandHandler::mapAndDelegateIncomingCommandMessage, consumerConfig);
        this.kafkaConsumer.setOnRebalanceDoneHandler(set -> {
            kafkaCommandProcessingQueue.setCurrentlyHandledPartitions(Helper.to(set));
        });
        return CompositeFuture.all(this.commandHandler.start(), this.kafkaConsumer.start()).mapEmpty();
    }

    public Future<Void> stop() {
        return CompositeFuture.join(this.kafkaConsumer.stop(), this.commandHandler.stop(), this.internalCommandSender.stop(), this.kafkaBasedCommandResponseSender.stop()).mapEmpty();
    }

    @Override // org.eclipse.hono.commandrouter.CommandConsumerFactory
    public Future<Void> createCommandConsumer(String str, SpanContext spanContext) {
        String honoTopic = new HonoTopic(HonoTopic.Type.COMMAND, str).toString();
        if (this.kafkaConsumer.isAmongKnownSubscribedTopics(honoTopic)) {
            LOG.debug("createCommandConsumer: topic is already subscribed [{}]", honoTopic);
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: topic not subscribed; check for its existence, triggering auto-creation if enabled [{}]", honoTopic);
        Span start = TracingHelper.buildServerChildSpan(this.tracer, spanContext, "wait for topic subscription update", CommandConsumerFactory.class.getSimpleName()).start();
        TracingHelper.TAG_TENANT_ID.set(start, str);
        Tags.MESSAGE_BUS_DESTINATION.set(start, honoTopic);
        return this.kafkaConsumer.ensureTopicIsAmongSubscribedTopicPatternTopics(honoTopic).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                TracingHelper.logError(start, asyncResult.cause());
            }
            start.finish();
        });
    }
}
