package org.eclipse.hono.commandrouter.impl.amqp;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonQoS;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.adapter.client.command.CommandConsumer;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.util.AddressHelper;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/amqp/ProtonBasedCommandConsumerFactoryImpl.class */
public class ProtonBasedCommandConsumerFactoryImpl extends AbstractServiceClient implements CommandConsumerFactory {
    private static final int RECREATE_CONSUMERS_DELAY = 20;
    private final CachingClientFactory<CommandConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final AtomicBoolean recreatingConsumers;
    private final AtomicBoolean tryAgainRecreatingConsumers;
    private final ProtonBasedMappingAndDelegatingCommandHandler mappingAndDelegatingCommandHandler;
    private final Set<String> consumerLinkTenants;

    public ProtonBasedCommandConsumerFactoryImpl(HonoConnection honoConnection, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, SendMessageSampler.Factory factory) {
        super(honoConnection, factory);
        this.recreatingConsumers = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumers = new AtomicBoolean(false);
        this.consumerLinkTenants = new HashSet();
        Objects.requireNonNull(tenantClient);
        Objects.requireNonNull(commandTargetMapper);
        this.mappingAndDelegatingCommandHandler = new ProtonBasedMappingAndDelegatingCommandHandler(tenantClient, honoConnection, commandTargetMapper);
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory<>(honoConnection.getVertx(), commandConsumer -> {
            return true;
        });
    }

    public Future<Void> start() {
        return this.connection.connect().onSuccess(honoConnection -> {
            this.log.info("connection to {} endpoint has been established", this.connection.getConfig().getServerRole());
        }).onFailure(th -> {
            this.log.warn("failed to establish connection to {} endpoint", this.connection.getConfig().getServerRole(), th);
        }).map(honoConnection2 -> {
            this.connection.addReconnectListener(honoConnection2 -> {
                recreateConsumers();
            });
            recreateConsumers();
            return null;
        });
    }

    protected void onDisconnect() {
        this.mappingAndDelegatingCommandConsumerFactory.clearState();
        this.consumerLinkTenants.clear();
    }

    @Override // org.eclipse.hono.commandrouter.CommandConsumerFactory
    public final Future<Void> createCommandConsumer(String str, SpanContext spanContext) {
        Objects.requireNonNull(str);
        return this.connection.executeOnContext(promise -> {
            getOrCreateMappingAndDelegatingCommandConsumer(str).map((Void) null).onComplete(promise);
        });
    }

    private Future<CommandConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(str, () -> {
                    return newMappingAndDelegatingCommandConsumer(str);
                }, promise);
            });
        }).recover(th -> {
            this.log.debug("failed to create mappingAndDelegatingCommandConsumer for tenant {}", str, th);
            return Future.failedFuture(th);
        });
    }

    private Future<CommandConsumer> newMappingAndDelegatingCommandConsumer(String str) {
        this.log.trace("creating new MappingAndDelegatingCommandConsumer [tenant-id: {}]", str);
        String targetAddress = AddressHelper.getTargetAddress("command", str, (String) null, this.connection.getConfig());
        return this.connection.createReceiver(targetAddress, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            this.mappingAndDelegatingCommandHandler.mapAndDelegateIncomingCommandMessage(str, protonDelivery, message);
        }, this.connection.getConfig().getInitialCredits(), false, str2 -> {
            this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed remotely", str);
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
            invokeRecreateConsumersWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created MappingAndDelegatingCommandConsumer [{}]", targetAddress);
            this.consumerLinkTenants.add(str);
            return new CommandConsumer() { // from class: org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedCommandConsumerFactoryImpl.1
                public Future<Void> close(SpanContext spanContext) {
                    ProtonBasedCommandConsumerFactoryImpl.this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed locally", str);
                    ProtonBasedCommandConsumerFactoryImpl.this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
                    ProtonBasedCommandConsumerFactoryImpl.this.consumerLinkTenants.remove(str);
                    Promise promise = Promise.promise();
                    ProtonBasedCommandConsumerFactoryImpl.this.connection.closeAndFree(protonReceiver, r3 -> {
                        promise.complete();
                    });
                    return promise.future();
                }
            };
        }).recover(th -> {
            this.log.debug("failed to create MappingAndDelegatingCommandConsumer [tenant-id: {}]", str, th);
            return Future.failedFuture(th);
        });
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate command consumer links");
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r5 -> {
                ArrayList arrayList = new ArrayList();
                this.consumerLinkTenants.forEach(str -> {
                    this.log.debug("recreate command consumer link for tenant {}", str);
                    arrayList.add(getOrCreateMappingAndDelegatingCommandConsumer(str));
                });
                return CompositeFuture.join(arrayList);
            }).onComplete(asyncResult -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumers();
                    } else {
                        invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumers();
        });
    }
}
