package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/KafkaBasedCommandConsumerFactoryImpl.class */
/**
 * A {@link CommandConsumerFactory} that consumes command messages from Kafka.
 * <p>
 * A single Kafka consumer (consumer group {@code cmd-router-group}) is subscribed to all topics
 * matching the command topic prefix. Received records are handed to a
 * {@link KafkaBasedMappingAndDelegatingCommandHandler}. Creating a command consumer for a tenant
 * means making sure that the tenant's command topic is part of the consumer's current subscription,
 * triggering a re-subscription and waiting for the resulting rebalance if it is not.
 */
public class KafkaBasedCommandConsumerFactoryImpl implements CommandConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedCommandConsumerFactoryImpl.class);
    /** Matches every topic whose name starts with the command topic prefix. */
    private static final Pattern COMMANDS_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND.prefix) + ".*");
    /** Maximum time (in milliseconds) to wait for a rebalance and the subsequent subscription update. */
    private static final long WAIT_FOR_REBALANCE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

    private final Supplier<KafkaConsumer<String, Buffer>> consumerCreator;
    private final KafkaBasedMappingAndDelegatingCommandHandler commandHandler;
    private final Tracer tracer;
    private final Vertx vertx;
    /** The consumer created in {@link #start()}; {@code null} until then. */
    private KafkaConsumer<String, Buffer> kafkaConsumer;
    /**
     * Promise shared by all callers currently waiting for the next update of {@link #subscribedTopics}.
     * Holds {@code null} while nobody is waiting.
     */
    private final AtomicReference<Promise<Void>> onSubscribedTopicsNextUpdated = new AtomicReference<>();
    /** Topics contained in the consumer's subscription as of the last partition assignment. */
    private Set<String> subscribedTopics = new HashSet<>();

    /**
     * Creates a new factory.
     *
     * @param vertx The Vert.x instance used for creating the Kafka consumer and for timers.
     * @param tenantClient The tenant client passed on to the command handler.
     * @param commandTargetMapper The mapper passed on to the command handler.
     * @param kafkaProducerFactory The producer factory used for the internal command sender.
     * @param kafkaProducerConfigProperties The producer configuration used for the internal command sender.
     * @param kafkaConsumerConfigProperties The configuration from which the consumer configuration is derived.
     * @param tracer The tracer used for creating spans.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public KafkaBasedCommandConsumerFactoryImpl(Vertx vertx, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, KafkaProducerConfigProperties kafkaProducerConfigProperties, KafkaConsumerConfigProperties kafkaConsumerConfigProperties, Tracer tracer) {
        this.vertx = Objects.requireNonNull(vertx);
        Objects.requireNonNull(tenantClient);
        Objects.requireNonNull(commandTargetMapper);
        Objects.requireNonNull(kafkaProducerFactory);
        Objects.requireNonNull(kafkaProducerConfigProperties);
        Objects.requireNonNull(kafkaConsumerConfigProperties);
        this.tracer = Objects.requireNonNull(tracer);
        this.commandHandler = new KafkaBasedMappingAndDelegatingCommandHandler(
                tenantClient,
                commandTargetMapper,
                new KafkaBasedInternalCommandSender(kafkaProducerFactory, kafkaProducerConfigProperties, tracer),
                tracer);
        final Map<String, String> consumerConfig = kafkaConsumerConfigProperties.getConsumerConfig("cmd-router");
        consumerConfig.put("group.id", "cmd-router-group");
        // consumer creation is deferred until start()
        this.consumerCreator = () -> KafkaConsumer.create(vertx, consumerConfig, String.class, Buffer.class);
    }

    /**
     * Creates the Kafka consumer, registers its handlers, starts the command handler and subscribes
     * to the command topic pattern.
     *
     * @return A future that succeeds once the command handler has been started, the subscription has
     *         been requested and the set of subscribed topics has been updated after the first
     *         partition assignment. The future fails if any of these steps fails or times out.
     */
    public Future<Void> start() {
        this.kafkaConsumer = this.consumerCreator.get();
        this.kafkaConsumer
                .handler(this.commandHandler::mapAndDelegateIncomingCommandMessage)
                .partitionsAssignedHandler(this::onPartitionsAssigned)
                .partitionsRevokedHandler(this::onPartitionsRevoked)
                .exceptionHandler(error -> LOG.error("consumer error occurred", error));
        return CompositeFuture.all(this.commandHandler.start(), subscribeAndWaitForRebalanceAndTopicsUpdate())
                .map(ok -> {
                    LOG.debug("subscribed to topic pattern [{}], matching {} topics", COMMANDS_TOPIC_PATTERN, this.subscribedTopics.size());
                    return (Void) null;
                });
    }

    /**
     * (Re-)subscribes to the command topic pattern and waits until the set of subscribed topics has
     * been refreshed by the next partition assignment.
     * <p>
     * Concurrent callers share one "topics updated" promise. If no update happens within
     * {@link #WAIT_FOR_REBALANCE_TIMEOUT}, the promise is failed with a 503 error.
     *
     * @return A future that succeeds when both the subscribe request and the topics update have completed.
     */
    private Future<Void> subscribeAndWaitForRebalanceAndTopicsUpdate() {
        final Promise<Void> topicsUpdatedPromise = this.onSubscribedTopicsNextUpdated
                .updateAndGet(current -> current == null ? Promise.promise() : current);
        final Promise<Void> subscribeDonePromise = Promise.promise();
        this.kafkaConsumer.subscribe(COMMANDS_TOPIC_PATTERN, subscribeDonePromise);

        final long timerId = this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT, id -> {
            if (topicsUpdatedPromise.future().isComplete()) {
                return;
            }
            LOG.warn("timed out waiting for rebalance and update of subscribed topics");
            // Drop the timed-out promise from the shared reference first; otherwise every later
            // caller would be handed this already-failed promise and fail immediately.
            this.onSubscribedTopicsNextUpdated.compareAndSet(topicsUpdatedPromise, null);
            topicsUpdatedPromise.tryFail(new ServerErrorException(503, "timed out waiting for rebalance and update of subscribed topics"));
        });
        // no need to keep the timer around once the outcome is known
        topicsUpdatedPromise.future().onComplete(outcome -> this.vertx.cancelTimer(timerId));

        return CompositeFuture.all(subscribeDonePromise.future(), topicsUpdatedPromise.future()).mapEmpty();
    }

    /**
     * Refreshes {@link #subscribedTopics} from the consumer's current subscription and completes
     * (or fails) the promise of callers waiting for that update.
     *
     * @param set The partitions that have been assigned (only used for logging).
     */
    private void onPartitionsAssigned(Set<TopicPartition> set) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions assigned: [{}]", getPartitionsDebugString(set));
        }
        this.kafkaConsumer.subscription(subscriptionResult -> {
            if (subscriptionResult.succeeded()) {
                this.subscribedTopics = new HashSet<>(subscriptionResult.result());
            } else {
                LOG.warn("failed to get subscription", subscriptionResult.cause());
            }
            // take the pending promise (if any) so that the next waiter gets a fresh one
            final Promise<Void> pending = this.onSubscribedTopicsNextUpdated.getAndSet(null);
            if (pending != null) {
                if (subscriptionResult.succeeded()) {
                    pending.tryComplete();
                } else {
                    pending.tryFail(subscriptionResult.cause());
                }
            }
        });
    }

    /**
     * Logs the revoked partitions on debug level.
     *
     * @param set The partitions that have been revoked.
     */
    private void onPartitionsRevoked(Set<TopicPartition> set) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions revoked: [{}]", getPartitionsDebugString(set));
        }
    }

    /**
     * Builds a log-friendly description of the given partitions.
     *
     * @param set The partitions to describe.
     * @return For up to 20 partitions, a map-style string of topic names to their sorted partition
     *         numbers; otherwise just the number of partitions.
     */
    private String getPartitionsDebugString(Set<TopicPartition> set) {
        if (set.size() > 20) {
            return set.size() + " topic partitions";
        }
        return set.stream()
                .collect(Collectors.groupingBy(
                        TopicPartition::getTopic,
                        Collectors.mapping(TopicPartition::getPartition, Collectors.toCollection(TreeSet::new))))
                .toString();
    }

    /**
     * Closes the Kafka consumer and stops the command handler.
     *
     * @return A future that succeeds when both have been stopped; a failed future if
     *         {@link #start()} has not been invoked before.
     */
    public Future<Void> stop() {
        if (this.kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        final Promise<Void> consumerClosedPromise = Promise.promise();
        this.kafkaConsumer.close(consumerClosedPromise);
        return CompositeFuture.all(this.commandHandler.stop(), consumerClosedPromise.future()).mapEmpty();
    }

    /**
     * Makes sure that the command topic derived from the given identifier is part of the
     * consumer's subscription.
     * <p>
     * If the topic is not yet subscribed, its partitions are queried (the log messages indicate that
     * this may trigger auto-creation of the topic, if enabled on the broker) and the consumer is
     * re-subscribed, waiting for the rebalance to pick up the topic.
     *
     * @param str The identifier used to build the command topic name; it is also set as the
     *            tenant id tag on the tracing span.
     * @param spanContext The span context used as parent of the span tracking the subscription update.
     * @return A succeeded future if the topic is (or has become) subscribed. Otherwise the future is
     *         failed with a {@link ServerErrorException}: status 500 if this factory has not been
     *         started, status 503 if the topic's partitions could not be determined, the topic does
     *         not exist or the subscription did not get updated with the topic.
     */
    @Override // org.eclipse.hono.commandrouter.CommandConsumerFactory
    public Future<Void> createCommandConsumer(String str, SpanContext spanContext) {
        if (this.kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        final String honoTopic = new HonoTopic(HonoTopic.Type.COMMAND, str).toString();
        if (this.subscribedTopics.contains(honoTopic)) {
            LOG.debug("createCommandConsumer: topic is already subscribed [{}]", honoTopic);
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: topic not subscribed; check for its existence, triggering auto-creation if enabled [{}]", honoTopic);
        final Span span = TracingHelper.buildServerChildSpan(this.tracer, spanContext, "wait for topic subscription update", CommandConsumerFactory.class.getSimpleName()).start();
        TracingHelper.TAG_TENANT_ID.set(span, str);
        Tags.MESSAGE_BUS_DESTINATION.set(span, honoTopic);

        final Promise<List<PartitionInfo>> partitionsPromise = Promise.promise();
        partitionsFor(honoTopic, partitionsPromise);
        final Future<Void> topicSubscribed = partitionsPromise.future()
                .recover(error -> {
                    LOG.warn("createCommandConsumer: error getting partitions for topic [{}]", honoTopic, error);
                    return Future.failedFuture(new ServerErrorException(503, "error getting topic partitions", error));
                })
                .compose(partitions -> awaitTopicSubscription(honoTopic, partitions, span));
        return topicSubscribed.onComplete(outcome -> {
            if (outcome.failed()) {
                TracingHelper.logError(span, outcome.cause());
            }
            span.finish();
        });
    }

    /**
     * Waits for the given (existing) topic to become part of the consumer's subscription.
     *
     * @param honoTopic The name of the topic.
     * @param partitions The partitions found for the topic; an empty list means the topic does not exist.
     * @param span The span to log progress to.
     * @return A succeeded future if the topic is subscribed, otherwise a future failed with a 503 error.
     */
    private Future<Void> awaitTopicSubscription(String honoTopic, List<PartitionInfo> partitions, Span span) {
        if (partitions.isEmpty()) {
            LOG.warn("createCommandConsumer: topic doesn't exist and didn't get auto-created: {}", honoTopic);
            return Future.failedFuture(new ServerErrorException(503, "topic doesn't exist and didn't get auto-created"));
        }
        // the subscription may have been updated while the partitions were being fetched
        if (this.subscribedTopics.contains(honoTopic)) {
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: verified topic existence, wait for subscription update and rebalance [{}]", honoTopic);
        span.log("verified topic existence, wait for subscription update and rebalance");
        return subscribeAndWaitForRebalanceAndTopicsUpdate().compose(updated -> {
            if (this.subscribedTopics.contains(honoTopic)) {
                LOG.debug("createCommandConsumer: done updating topic subscription");
                return Future.succeededFuture(updated);
            }
            LOG.warn("createCommandConsumer: subscription not updated with topic after rebalance [topic: {}]", honoTopic);
            return Future.failedFuture(new ServerErrorException(503, "subscription not updated with topic after rebalance"));
        });
    }

    /**
     * Gets the partitions of the given topic via the consumer's underlying read stream.
     * <p>
     * In contrast to a plain pass-through of the result, a {@code null} partition list is reported
     * as an empty list.
     *
     * @param str The name of the topic.
     * @param handler The handler to be notified with the partitions or the failure cause.
     * @return The Kafka consumer.
     */
    private KafkaConsumer<String, Buffer> partitionsFor(String str, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        this.kafkaConsumer.asStream().partitionsFor(str, lookupResult -> {
            if (!lookupResult.succeeded()) {
                handler.handle(Future.failedFuture(lookupResult.cause()));
                return;
            }
            if (lookupResult.result() == null) {
                handler.handle(Future.succeededFuture(List.of()));
                return;
            }
            // convert the Kafka client's partition infos to their Vert.x counterparts
            final List<PartitionInfo> partitions = new ArrayList<>();
            for (final org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : lookupResult.result()) {
                final PartitionInfo partitionInfo = new PartitionInfo();
                partitionInfo
                        .setInSyncReplicas(Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
                        .setLeader(Helper.from(kafkaPartitionInfo.leader()))
                        .setPartition(kafkaPartitionInfo.partition())
                        .setReplicas(Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
                        .setTopic(kafkaPartitionInfo.topic());
                partitions.add(partitionInfo);
            }
            handler.handle(Future.succeededFuture(partitions));
        });
        return this.kafkaConsumer;
    }
}
