package org.eclipse.hono.client.notification.pubsub;

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/notification/pubsub/PubSubBasedNotificationReceiver.class */
public class PubSubBasedNotificationReceiver implements NotificationReceiver {
    private static final String TOPIC_ENDPOINT = "notification";
    private static final Logger log = LoggerFactory.getLogger(PubSubBasedNotificationReceiver.class);
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType;
    private final PubSubSubscriberFactory factory;
    private final LifecycleStatus lifecycleStatus;
    private final Set<String> notificationTypes;
    private final MessageReceiver receiver;

    public PubSubBasedNotificationReceiver(PubSubSubscriberFactory pubSubSubscriberFactory) {
        this.handlerPerType = new HashMap();
        this.lifecycleStatus = new LifecycleStatus();
        this.notificationTypes = new HashSet();
        this.factory = (PubSubSubscriberFactory) Objects.requireNonNull(pubSubSubscriberFactory);
        this.receiver = (pubsubMessage, ackReplyConsumer) -> {
            handleMessage(pubsubMessage);
            ackReplyConsumer.ack();
        };
    }

    protected PubSubBasedNotificationReceiver(PubSubSubscriberFactory pubSubSubscriberFactory, MessageReceiver messageReceiver) {
        this.handlerPerType = new HashMap();
        this.lifecycleStatus = new LifecycleStatus();
        this.notificationTypes = new HashSet();
        this.factory = (PubSubSubscriberFactory) Objects.requireNonNull(pubSubSubscriberFactory);
        this.receiver = (MessageReceiver) Objects.requireNonNull(messageReceiver);
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> handler) {
        if (this.notificationTypes.contains(notificationType.getAddress())) {
            log.debug("Notification receiver {} is already registered", notificationType.getAddress());
            return;
        }
        this.notificationTypes.add(notificationType.getAddress());
        this.handlerPerType.put(notificationType.getClazz(), handler);
        this.factory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName(TOPIC_ENDPOINT, notificationType.getAddress()), this.receiver).subscribe(true).onFailure(th -> {
            log.error("Error subscribing for notification {}", notificationType.getAddress(), th);
            throw new IllegalStateException("Error, can not subscribe for notification", th);
        });
    }

    public Future<Void> start() {
        if (!this.lifecycleStatus.isStarting()) {
            return !this.lifecycleStatus.setStarting() ? Future.failedFuture(new IllegalStateException("Consumer is already started/stopping")) : Future.succeededFuture();
        }
        log.debug("Pub/Sub based notification receiver already started");
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        return this.factory.closeAllSubscribers();
    }

    private void handleMessage(PubsubMessage pubsubMessage) {
        try {
            JsonObject jsonObject = Buffer.buffer(PubSubMessageHelper.getPayload(pubsubMessage)).toJsonObject();
            if (log.isTraceEnabled()) {
                log.trace("received notification: {}{}", System.lineSeparator(), jsonObject.encodePrettily());
            }
            AbstractNotification abstractNotification = (AbstractNotification) jsonObject.mapTo(AbstractNotification.class);
            Handler<? extends AbstractNotification> handler = this.handlerPerType.get(abstractNotification.getClass());
            if (handler != null) {
                handler.handle(abstractNotification);
            }
        } catch (RuntimeException e) {
            log.debug("Could not handle Pub/Sub message notification, buffer is empty");
        }
    }
}
