package org.eclipse.hono.client.notification.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/notification/kafka/KafkaBasedNotificationReceiver.class */
/**
 * A notification receiver that consumes notifications from Kafka topics and
 * dispatches each one to the handler registered for its concrete type.
 * <p>
 * Handlers have to be registered using
 * {@link #registerConsumer(NotificationType, Handler)} while the receiver is
 * stopped; every registration also adds the corresponding notification topic
 * to the set of topics being consumed.
 */
public class KafkaBasedNotificationReceiver extends HonoKafkaConsumer<JsonObject> implements NotificationReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedNotificationReceiver.class);
    private static final String NAME = "notification";

    /**
     * The registered handlers, keyed by the notification class they accept.
     * <p>
     * Invariant (established by {@code registerConsumer}): the handler stored
     * under key {@code Class<T>} is a {@code Handler<T>}.
     */
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType =
            new HashMap<>();

    /**
     * Creates a receiver that initially is not subscribed to any topic.
     *
     * @param vertx The Vert.x instance to use.
     * @param notificationKafkaConsumerConfigProperties The configuration from which the Kafka consumer
     *            properties and the poll timeout are taken.
     * @throws IllegalArgumentException if the given properties report that no Kafka configuration is available.
     */
    public KafkaBasedNotificationReceiver(Vertx vertx, NotificationKafkaConsumerConfigProperties notificationKafkaConsumerConfigProperties) {
        // the (Pattern) cast selects the topic-pattern constructor overload; topics get added on registration
        super(vertx, Set.of(), (Pattern) null, notificationKafkaConsumerConfigProperties.getConsumerConfig(NAME));
        if (!notificationKafkaConsumerConfigProperties.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        setPollTimeout(Duration.ofMillis(notificationKafkaConsumerConfigProperties.getPollTimeout()));
        setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        setRecordHandler(this::handleRecord);
    }

    /**
     * Registers the handler to invoke for notifications of the given type and
     * adds the type's topic to the topics to consume from.
     * <p>
     * A handler registered earlier for the same notification class is replaced.
     *
     * @param notificationType The type of notifications to handle.
     * @param handler The handler to invoke with each received notification of that type.
     * @param <T> The class of the notifications.
     * @throws IllegalStateException if the receiver is not in the stopped state.
     */
    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> handler) {
        if (!this.lifecycleStatus.isStopped()) {
            throw new IllegalStateException("consumers cannot be added when receiver is already started");
        }
        addTopic(NotificationTopicHelper.getTopicName(notificationType));
        this.handlerPerType.put(notificationType.getClazz(), handler);
    }

    /**
     * Maps the record's JSON payload to a notification object and passes it to
     * the handler registered for the notification's class, if there is one.
     * Records without a payload are skipped.
     *
     * @param kafkaConsumerRecord The record that has been received.
     */
    private void handleRecord(KafkaConsumerRecord<String, JsonObject> kafkaConsumerRecord) {
        final JsonObject payload = kafkaConsumerRecord.value();
        if (payload == null) {
            // nothing to deserialize (e.g. a tombstone record); avoid the NullPointerException below
            LOG.debug("ignoring notification record without payload");
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("received notification:{}{}", System.lineSeparator(), payload.encodePrettily());
        }
        final AbstractNotification notification = payload.mapTo(AbstractNotification.class);
        // Safe: registerConsumer only ever stores a Handler<T> under the key Class<T>,
        // so the handler looked up by the notification's class accepts that notification.
        @SuppressWarnings("unchecked")
        final Handler<AbstractNotification> handler =
                (Handler<AbstractNotification>) this.handlerPerType.get(notification.getClass());
        if (handler != null) {
            handler.handle(notification);
        }
    }

    /**
     * Stops the underlying consumer and afterwards removes all registered
     * handlers, regardless of whether stopping succeeded or failed.
     *
     * @return A future reflecting the outcome of stopping the underlying consumer.
     */
    public Future<Void> stop() {
        return super.stop()
                .onComplete(stopAttempt -> this.handlerPerType.clear())
                .mapEmpty();
    }
}
