package org.eclipse.hono.client.notification.kafka;

import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Objects;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationSender;

/* loaded from: input_file:org/eclipse/hono/client/notification/kafka/KafkaBasedNotificationSender.class */
public class KafkaBasedNotificationSender extends AbstractKafkaBasedMessageSender<JsonObject> implements NotificationSender {
    static final String PRODUCER_NAME = "notification";

    public KafkaBasedNotificationSender(KafkaProducerFactory<String, JsonObject> kafkaProducerFactory, NotificationKafkaProducerConfigProperties notificationKafkaProducerConfigProperties) {
        super(kafkaProducerFactory, PRODUCER_NAME, notificationKafkaProducerConfigProperties, NoopTracerFactory.create());
    }

    public Future<Void> publish(AbstractNotification abstractNotification) {
        Objects.requireNonNull(abstractNotification);
        return !this.lifecycleStatus.isStarted() ? Future.failedFuture(new ServerErrorException(503, "sender not started")) : createProducerRecord(abstractNotification).compose(kafkaProducerRecord -> {
            return getOrCreateProducer().send(kafkaProducerRecord).recover(th -> {
                this.log.debug("error publishing notification [{}]", abstractNotification, th);
                return Future.failedFuture(new ServerErrorException(503, th));
            });
        }).mapEmpty();
    }

    private Future<KafkaProducerRecord<String, JsonObject>> createProducerRecord(AbstractNotification abstractNotification) {
        try {
            return Future.succeededFuture(KafkaProducerRecord.create(NotificationTopicHelper.getTopicName(abstractNotification.getType()), abstractNotification.getKey(), JsonObject.mapFrom(abstractNotification)));
        } catch (RuntimeException e) {
            this.log.error("error creating producer record for notification [{}]", abstractNotification, e);
            return Future.failedFuture(new ServerErrorException(500, e));
        }
    }
}
