package org.eclipse.hono.adapter.client.command.amqp;

import io.opentracing.SpanContext;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.adapter.client.amqp.AbstractServiceClient;
import org.eclipse.hono.adapter.client.command.CommandConsumer;
import org.eclipse.hono.adapter.client.command.CommandConsumerFactory;
import org.eclipse.hono.adapter.client.command.CommandContext;
import org.eclipse.hono.adapter.client.command.CommandRouterClient;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.impl.AdapterInstanceCommandHandler;
import org.eclipse.hono.client.impl.CommandHandlerWrapper;
import org.eclipse.hono.config.ProtocolAdapterProperties;

/* loaded from: input_file:org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandRouterCommandConsumerFactoryImpl.class */
public class ProtonBasedCommandRouterCommandConsumerFactoryImpl extends AbstractServiceClient implements CommandConsumerFactory {
    private static final int RECREATE_CONSUMER_DELAY = 20;
    private final String adapterInstanceId;
    private final AdapterInstanceCommandHandler adapterInstanceCommandHandler;
    private final AtomicBoolean recreatingConsumer;
    private final AtomicBoolean tryAgainRecreatingConsumer;
    private final CommandRouterClient commandRouterClient;
    private ProtonReceiver adapterSpecificConsumer;

    public ProtonBasedCommandRouterCommandConsumerFactoryImpl(HonoConnection honoConnection, SendMessageSampler.Factory factory, ProtocolAdapterProperties protocolAdapterProperties, CommandRouterClient commandRouterClient) {
        super(honoConnection, factory, protocolAdapterProperties);
        this.recreatingConsumer = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumer = new AtomicBoolean(false);
        this.commandRouterClient = (CommandRouterClient) Objects.requireNonNull(commandRouterClient);
        this.adapterInstanceId = honoConnection.getContainerId();
        this.adapterInstanceCommandHandler = new AdapterInstanceCommandHandler(honoConnection.getTracer(), this.adapterInstanceId);
    }

    @Override // org.eclipse.hono.adapter.client.amqp.AbstractServiceClient
    public Future<Void> start() {
        return super.start().onComplete(asyncResult -> {
            this.connection.addReconnectListener(honoConnection -> {
                recreateConsumer();
            });
            recreateConsumer();
        });
    }

    @Override // org.eclipse.hono.adapter.client.amqp.AbstractServiceClient
    protected void onDisconnect() {
        this.adapterSpecificConsumer = null;
    }

    public final Future<CommandConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, null, handler, duration, spanContext);
    }

    public final Future<CommandConsumer> createCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, str3, handler, duration, spanContext);
    }

    private Future<CommandConsumer> doCreateCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        Duration ofSeconds = (duration == null || duration.isNegative() || duration.getSeconds() > 9223372036L) ? Duration.ofSeconds(-1L) : duration;
        this.log.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", new Object[]{str, str2, str3});
        return this.connection.executeOnContext(promise -> {
            CommandHandlerWrapper commandHandlerWrapper = new CommandHandlerWrapper(str, str2, str3, commandContext -> {
                handler.handle(new ProtonBasedCommandContext(commandContext));
            });
            if (this.adapterInstanceCommandHandler.putDeviceSpecificCommandHandler(commandHandlerWrapper) != null) {
            }
            Instant now = Instant.now();
            registerCommandConsumer(str, str2, ofSeconds, spanContext).map(r11 -> {
                return new CommandConsumer() { // from class: org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandRouterCommandConsumerFactoryImpl.1
                    public Future<Void> close(SpanContext spanContext2) {
                        return ProtonBasedCommandRouterCommandConsumerFactoryImpl.this.removeCommandConsumer(commandHandlerWrapper, ofSeconds, now, spanContext2);
                    }
                };
            }).onComplete(promise);
        });
    }

    private Future<Void> registerCommandConsumer(String str, String str2, Duration duration, SpanContext spanContext) {
        return this.commandRouterClient.registerCommandConsumer(str, str2, this.adapterInstanceId, duration, spanContext).recover(th -> {
            this.log.info("error registering consumer with the command router service [tenant: {}, device: {}]", new Object[]{str, str2, th});
            this.adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(str, str2);
            return Future.failedFuture(th);
        });
    }

    private Future<Void> removeCommandConsumer(CommandHandlerWrapper commandHandlerWrapper, Duration duration, Instant instant, SpanContext spanContext) {
        String tenantId = commandHandlerWrapper.getTenantId();
        String deviceId = commandHandlerWrapper.getDeviceId();
        this.log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId);
        if (this.adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(commandHandlerWrapper)) {
            return this.commandRouterClient.unregisterCommandConsumer(tenantId, deviceId, this.adapterInstanceId, spanContext).recover(th -> {
                if (ServiceInvocationException.extractStatusCode(th) != 412) {
                    this.log.info("error unregistering consumer with the command router service [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, th});
                    return Future.failedFuture(th);
                }
                if (!duration.isNegative() && Instant.now().isAfter(instant.plus((TemporalAmount) duration))) {
                    this.log.trace("ignoring 412 error when unregistering consumer with the command router service; entry may have already expired [tenant: {}, device: {}]", tenantId, deviceId);
                    return Future.succeededFuture();
                }
                this.log.debug("consumer not unregistered - not matched or already removed [tenant: {}, device: {}]", tenantId, deviceId);
                return Future.failedFuture(new ClientErrorException(412, "no matching command consumer mapping found to be removed"));
            });
        }
        this.log.debug("command consumer not removed - handler already replaced or removed [tenant: {}, device: {}]", tenantId, deviceId);
        return Future.failedFuture(new ClientErrorException(412, "local command handler already replaced or removed"));
    }

    private Future<ProtonReceiver> createAdapterSpecificConsumer() {
        this.log.trace("creating new adapter instance command consumer");
        return this.connection.createReceiver("command_internal/" + this.adapterInstanceId, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            this.adapterInstanceCommandHandler.handleCommandMessage(message, protonDelivery);
        }, this.connection.getConfig().getInitialCredits(), false, str -> {
            this.log.debug("command receiver link closed remotely");
            invokeRecreateConsumerWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created adapter specific command consumer");
            this.adapterSpecificConsumer = protonReceiver;
            return protonReceiver;
        }).recover(th -> {
            this.log.error("failed to create adapter specific command consumer", th);
            return Future.failedFuture(th);
        });
    }

    private void recreateConsumer() {
        if (this.recreatingConsumer.compareAndSet(false, true)) {
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r4 -> {
                if (this.adapterSpecificConsumer != null && this.adapterSpecificConsumer.isOpen()) {
                    return Future.succeededFuture();
                }
                this.log.debug("recreate adapter specific command consumer link");
                return createAdapterSpecificConsumer();
            }).onComplete(asyncResult -> {
                this.recreatingConsumer.set(false);
                if (this.tryAgainRecreatingConsumer.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumer();
                    } else {
                        invokeRecreateConsumerWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumer");
            this.tryAgainRecreatingConsumer.set(true);
        }
    }

    private void invokeRecreateConsumerWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumer();
        });
    }
}
