package org.eclipse.hono.commandrouter.impl.amqp;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.proton.ProtonQoS;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.adapter.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.CommandTargetMapper;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.client.impl.CommandConsumer;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterServiceConfigProperties;
import org.eclipse.hono.util.AddressHelper;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/amqp/ProtonBasedCommandConsumerFactoryImpl.class */
public class ProtonBasedCommandConsumerFactoryImpl extends AbstractServiceClient implements CommandConsumerFactory {
    private static final int RECREATE_CONSUMERS_DELAY = 20;
    private CachingClientFactory<MessageConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final AtomicBoolean recreatingConsumers;
    private final AtomicBoolean tryAgainRecreatingConsumers;
    private MappingAndDelegatingCommandHandler mappingAndDelegatingCommandHandler;
    private final AtomicBoolean initialized;
    private final Set<String> consumerLinkTenants;

    public ProtonBasedCommandConsumerFactoryImpl(HonoConnection honoConnection, SendMessageSampler.Factory factory, CommandRouterServiceConfigProperties commandRouterServiceConfigProperties) {
        super(honoConnection, factory, commandRouterServiceConfigProperties);
        this.recreatingConsumers = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumers = new AtomicBoolean(false);
        this.initialized = new AtomicBoolean(false);
        this.consumerLinkTenants = new HashSet();
    }

    @Override // org.eclipse.hono.commandrouter.CommandConsumerFactory
    public void initialize(CommandTargetMapper commandTargetMapper) {
        Objects.requireNonNull(commandTargetMapper);
        this.mappingAndDelegatingCommandHandler = new MappingAndDelegatingCommandHandler(this.connection, commandTargetMapper, this.samplerFactory.create("command"));
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory<>(this.connection.getVertx(), messageConsumer -> {
            return true;
        });
        this.connection.addReconnectListener(honoConnection -> {
            recreateConsumers();
        });
        recreateConsumers();
        this.initialized.set(true);
    }

    protected void onDisconnect() {
        this.mappingAndDelegatingCommandConsumerFactory.clearState();
        this.consumerLinkTenants.clear();
    }

    @Override // org.eclipse.hono.commandrouter.CommandConsumerFactory
    public final Future<Void> createCommandConsumer(String str, SpanContext spanContext) {
        Objects.requireNonNull(str);
        if (this.initialized.get()) {
            return this.connection.executeOnContext(promise -> {
                getOrCreateMappingAndDelegatingCommandConsumer(str).map((Void) null).onComplete(promise);
            });
        }
        this.log.error("not initialized");
        return Future.failedFuture(new ServerErrorException(500));
    }

    private Future<MessageConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(str, () -> {
                    return newMappingAndDelegatingCommandConsumer(str);
                }, promise);
            });
        }).recover(th -> {
            this.log.debug("failed to create mappingAndDelegatingCommandConsumer for tenant {}", str, th);
            return Future.failedFuture(th);
        });
    }

    private Future<MessageConsumer> newMappingAndDelegatingCommandConsumer(String str) {
        this.log.trace("creating new MappingAndDelegatingCommandConsumer [tenant-id: {}]", str);
        String targetAddress = AddressHelper.getTargetAddress("command", str, (String) null, this.connection.getConfig());
        return this.connection.createReceiver(targetAddress, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            this.mappingAndDelegatingCommandHandler.mapAndDelegateIncomingCommandMessage(str, protonDelivery, message);
        }, this.connection.getConfig().getInitialCredits(), false, str2 -> {
            this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed remotely", str);
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
            invokeRecreateConsumersWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created MappingAndDelegatingCommandConsumer [{}]", targetAddress);
            this.consumerLinkTenants.add(str);
            CommandConsumer commandConsumer = new CommandConsumer(this.connection, protonReceiver);
            commandConsumer.setLocalCloseHandler(str3 -> {
                this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed locally", str);
                this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
                this.consumerLinkTenants.remove(str);
            });
            return commandConsumer;
        }).recover(th -> {
            this.log.debug("failed to create MappingAndDelegatingCommandConsumer [tenant-id: {}]", str, th);
            return Future.failedFuture(th);
        });
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate command consumer links");
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r5 -> {
                ArrayList arrayList = new ArrayList();
                this.consumerLinkTenants.forEach(str -> {
                    this.log.debug("recreate command consumer link for tenant {}", str);
                    arrayList.add(getOrCreateMappingAndDelegatingCommandConsumer(str));
                });
                return CompositeFuture.join(arrayList);
            }).onComplete(asyncResult -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumers();
                    } else {
                        invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumers();
        });
    }
}
