package org.eclipse.hono.client.notification.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;

/* loaded from: input_file:org/eclipse/hono/client/notification/kafka/KafkaBasedNotificationReceiver.class */
public class KafkaBasedNotificationReceiver implements NotificationReceiver {
    private static final String NAME = "notification";
    private final Vertx vertx;
    private final NotificationKafkaConsumerConfigProperties consumerConfig;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private HonoKafkaConsumer honoKafkaConsumer;
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType = new HashMap();
    private final Set<String> topics = new HashSet();
    private boolean started = false;

    public KafkaBasedNotificationReceiver(Vertx vertx, NotificationKafkaConsumerConfigProperties notificationKafkaConsumerConfigProperties) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(notificationKafkaConsumerConfigProperties);
        if (!notificationKafkaConsumerConfigProperties.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = notificationKafkaConsumerConfigProperties;
    }

    void setKafkaConsumerFactory(Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = (Supplier) Objects.requireNonNull(supplier);
    }

    public Future<Void> start() {
        this.honoKafkaConsumer = new HonoKafkaConsumer(this.vertx, this.topics, getRecordHandler(), this.consumerConfig.getConsumerConfig("notification"));
        this.honoKafkaConsumer.setPollTimeout(Duration.ofMillis(this.consumerConfig.getPollTimeout()));
        this.honoKafkaConsumer.setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        Optional ofNullable = Optional.ofNullable(this.kafkaConsumerSupplier);
        HonoKafkaConsumer honoKafkaConsumer = this.honoKafkaConsumer;
        Objects.requireNonNull(honoKafkaConsumer);
        ofNullable.ifPresent(honoKafkaConsumer::setKafkaConsumerSupplier);
        return this.honoKafkaConsumer.start().onSuccess(r4 -> {
            this.started = true;
        });
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> handler) {
        if (this.started) {
            throw new IllegalStateException("consumers cannot be added when receiver is already started.");
        }
        this.topics.add(NotificationTopicHelper.getTopicName(notificationType));
        this.handlerPerType.put(notificationType.getClazz(), handler);
    }

    private Handler<KafkaConsumerRecord<String, Buffer>> getRecordHandler() {
        return kafkaConsumerRecord -> {
            AbstractNotification abstractNotification = (AbstractNotification) Json.decodeValue((Buffer) kafkaConsumerRecord.value(), AbstractNotification.class);
            Handler<? extends AbstractNotification> handler = this.handlerPerType.get(abstractNotification.getClass());
            if (handler != null) {
                handler.handle(abstractNotification);
            }
        };
    }

    public Future<Void> stop() {
        return stopKafkaConsumer().onComplete(asyncResult -> {
            this.topics.clear();
        }).onComplete(asyncResult2 -> {
            this.handlerPerType.clear();
        }).onComplete(asyncResult3 -> {
            this.started = false;
        }).mapEmpty();
    }

    private Future<Void> stopKafkaConsumer() {
        return this.honoKafkaConsumer != null ? this.honoKafkaConsumer.stop() : Future.succeededFuture();
    }
}
