package org.eclipse.hono.client.notification.amqp;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;

/* loaded from: input_file:org/eclipse/hono/client/notification/amqp/ProtonBasedNotificationReceiver.class */
public class ProtonBasedNotificationReceiver extends AbstractServiceClient implements NotificationReceiver {
    private static final int RECREATE_CONSUMERS_DELAY_MILLIS = 20;
    private final CachingClientFactory<ProtonReceiver> receiverFactory;
    private final AtomicBoolean recreatingConsumers;
    private final AtomicBoolean tryAgainRecreatingConsumers;
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType;
    private final Set<String> addresses;
    private final AtomicBoolean startCalled;
    private final AtomicBoolean stopCalled;

    public ProtonBasedNotificationReceiver(HonoConnection honoConnection) {
        super(honoConnection, SendMessageSampler.Factory.noop());
        this.recreatingConsumers = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumers = new AtomicBoolean(false);
        this.handlerPerType = new HashMap();
        this.addresses = new HashSet();
        this.startCalled = new AtomicBoolean();
        this.stopCalled = new AtomicBoolean();
        this.receiverFactory = new CachingClientFactory<>(honoConnection.getVertx(), protonReceiver -> {
            return true;
        });
    }

    public Future<Void> start() {
        return !this.startCalled.compareAndSet(false, true) ? Future.succeededFuture() : connectOnStart().onComplete(asyncResult -> {
            if (this.addresses.isEmpty()) {
                this.log.warn("no notification consumers registered - nothing to do");
            } else {
                this.connection.addReconnectListener(honoConnection -> {
                    recreateConsumers();
                });
                recreateConsumers();
            }
        });
    }

    public Future<Void> stop() {
        if (!this.stopCalled.compareAndSet(false, true)) {
            return Future.succeededFuture();
        }
        this.addresses.clear();
        this.handlerPerType.clear();
        return disconnectOnStop();
    }

    protected void onDisconnect() {
        this.receiverFactory.onDisconnect();
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> handler) {
        if (this.startCalled.get()) {
            throw new IllegalStateException("consumers cannot be added when consumer is already started");
        }
        String address = NotificationAddressHelper.getAddress(notificationType);
        this.addresses.add(address);
        this.handlerPerType.put(notificationType.getClazz(), handler);
        this.log.debug("registered notification receiver [type: {}; address: {}]", notificationType.getClazz().getSimpleName(), address);
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate notification consumer links");
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r5 -> {
                ArrayList arrayList = new ArrayList();
                this.addresses.forEach(str -> {
                    arrayList.add(createNotificationConsumerIfNeeded(str));
                });
                return Future.join(arrayList);
            }).onComplete(asyncResult -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumers();
                    } else {
                        invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumers();
        });
    }

    private Future<ProtonReceiver> createNotificationConsumerIfNeeded(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.receiverFactory.getOrCreateClient(str, () -> {
                    return createProtonReceiver(str);
                }, promise);
            });
        });
    }

    private Future<ProtonReceiver> createProtonReceiver(String str) {
        this.log.debug("creating new notification receiver link [address: {}]", str);
        return this.connection.createReceiver(str, ProtonQoS.AT_LEAST_ONCE, getProtonMessageHandler(str), str2 -> {
            this.log.debug("notification receiver link [address: {}] closed remotely", str);
            this.receiverFactory.removeClient(str);
            invokeRecreateConsumersWithDelay();
        }).onSuccess(protonReceiver -> {
            this.log.debug("successfully created notification receiver link [address: {}]", str);
        }).onFailure(th -> {
            this.log.debug("failed to create notification receiver link [address: {}]", str, th);
        });
    }

    private ProtonMessageHandler getProtonMessageHandler(String str) {
        return (protonDelivery, message) -> {
            Buffer payload = AmqpUtils.getPayload(message);
            if (payload != null) {
                JsonObject jsonObject = payload.toJsonObject();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("received notification:{}{}", System.lineSeparator(), jsonObject.encodePrettily());
                }
                AbstractNotification abstractNotification = (AbstractNotification) jsonObject.mapTo(AbstractNotification.class);
                String address = NotificationAddressHelper.getAddress(abstractNotification.getType());
                if (!str.equals(address)) {
                    this.log.warn("got notification of type [{}] on unexpected address [{}]; expected address is [{}]", new Object[]{abstractNotification.getClass(), str, address});
                }
                Handler<? extends AbstractNotification> handler = this.handlerPerType.get(abstractNotification.getClass());
                if (handler != null) {
                    handler.handle(abstractNotification);
                }
            }
        };
    }
}
