package org.eclipse.hono.commandrouter.impl.kafka;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/InternalKafkaTopicCleanupService.class */
public class InternalKafkaTopicCleanupService implements Lifecycle {
    private static final String CLIENT_NAME = "internal-topic-cleanup";
    private final Vertx vertx;
    private final AdapterInstanceStatusService adapterInstanceStatusService;
    private final Supplier<Future<KafkaAdminClient>> kafkaAdminClientCreator;
    private final Set<String> topicsToDelete = new HashSet();
    private final AtomicReference<Promise<Void>> startResultPromiseRef = new AtomicReference<>();
    private final AtomicBoolean stopCalled = new AtomicBoolean();
    private KafkaAdminClient adminClient;
    private long timerId;
    private static final Logger LOG = LoggerFactory.getLogger(InternalKafkaTopicCleanupService.class);
    private static final long CHECK_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(10);
    private static final Pattern INTERNAL_COMMAND_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND_INTERNAL.prefix) + "(.+)");

    public InternalKafkaTopicCleanupService(Vertx vertx, AdapterInstanceStatusService adapterInstanceStatusService, KafkaAdminClientConfigProperties kafkaAdminClientConfigProperties) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.adapterInstanceStatusService = (AdapterInstanceStatusService) Objects.requireNonNull(adapterInstanceStatusService);
        Objects.requireNonNull(kafkaAdminClientConfigProperties);
        Map adminClientConfig = kafkaAdminClientConfigProperties.getAdminClientConfig(CLIENT_NAME);
        KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(vertx);
        this.kafkaAdminClientCreator = () -> {
            return kafkaClientFactory.createKafkaAdminClientWithRetries(adminClientConfig, KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        };
    }

    InternalKafkaTopicCleanupService(Vertx vertx, AdapterInstanceStatusService adapterInstanceStatusService, KafkaAdminClient kafkaAdminClient) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.adapterInstanceStatusService = (AdapterInstanceStatusService) Objects.requireNonNull(adapterInstanceStatusService);
        Objects.requireNonNull(kafkaAdminClient);
        this.kafkaAdminClientCreator = () -> {
            return Future.succeededFuture(kafkaAdminClient);
        };
    }

    public Future<Void> start() {
        Promise<Void> promise = Promise.promise();
        if (this.startResultPromiseRef.compareAndSet(null, promise)) {
            this.kafkaAdminClientCreator.get().onSuccess(kafkaAdminClient -> {
                this.adminClient = kafkaAdminClient;
                this.timerId = this.vertx.setPeriodic(CHECK_INTERVAL_MILLIS, l -> {
                    performCleanup();
                });
                LOG.info("started InternalKafkaTopicCleanupService");
            }).map((Void) null).onComplete(promise);
            return promise.future();
        }
        this.startResultPromiseRef.get().future().onComplete(Futures.onCurrentContextCompletionHandler(promise));
        LOG.trace("start already called");
        return promise.future();
    }

    protected final void performCleanup() {
        if (this.topicsToDelete.isEmpty()) {
            determineToBeDeletedTopics();
        } else {
            this.adminClient.listTopics().onFailure(th -> {
                LOG.warn("error listing topics", th);
            }).onSuccess(set -> {
                Stream<String> stream = this.topicsToDelete.stream();
                Objects.requireNonNull(set);
                List list = (List) stream.filter((v1) -> {
                    return r1.contains(v1);
                }).collect(Collectors.toList());
                if (!list.isEmpty()) {
                    this.adminClient.deleteTopics(list).onSuccess(r6 -> {
                        LOG.info("triggered deletion of {} topics ({})", Integer.valueOf(list.size()), list);
                    }).onFailure(th2 -> {
                        if (th2 instanceof UnknownTopicOrPartitionException) {
                            LOG.info("triggered deletion of {} topics, some had already been deleted ({})", Integer.valueOf(list.size()), list);
                        } else {
                            LOG.warn("error deleting topics {}", list, th2);
                        }
                    }).onComplete(asyncResult -> {
                        this.topicsToDelete.clear();
                        determineToBeDeletedTopics();
                    });
                } else {
                    this.topicsToDelete.clear();
                    determineToBeDeletedTopics(set);
                }
            });
        }
    }

    private void determineToBeDeletedTopics() {
        this.adminClient.listTopics().onSuccess(this::determineToBeDeletedTopics).onFailure(th -> {
            LOG.warn("error listing topics", th);
        });
    }

    private void determineToBeDeletedTopics(Set<String> set) {
        HashMap hashMap = new HashMap();
        for (String str : set) {
            Matcher matcher = INTERNAL_COMMAND_TOPIC_PATTERN.matcher(str);
            if (matcher.matches()) {
                hashMap.put(matcher.group(1), str);
            }
        }
        this.adapterInstanceStatusService.getDeadAdapterInstances(hashMap.keySet()).onFailure(th -> {
            LOG.warn("error determining dead adapter instances", th);
        }).onSuccess(set2 -> {
            set2.forEach(str2 -> {
                this.topicsToDelete.add((String) hashMap.get(str2));
            });
            if (this.topicsToDelete.isEmpty()) {
                LOG.debug("found no topics to be deleted; no. of checked topics: {}", Integer.valueOf(hashMap.size()));
            } else {
                LOG.info("marking topics as to be deleted on next run {}", this.topicsToDelete);
            }
        });
    }

    public Future<Void> stop() {
        if (!this.stopCalled.compareAndSet(false, true) || this.adminClient == null) {
            return Future.succeededFuture();
        }
        this.vertx.cancelTimer(this.timerId);
        Promise promise = Promise.promise();
        this.adminClient.close(promise);
        return promise.future().recover(th -> {
            LOG.warn("error closing admin client", th);
            return Future.succeededFuture();
        });
    }
}
