package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.tracing.TracingHelper;

/* loaded from: input_file:org/eclipse/hono/client/command/amqp/ProtonBasedInternalCommandConsumer.class */
public class ProtonBasedInternalCommandConsumer extends AbstractServiceClient implements InternalCommandConsumer {
    private static final int RECREATE_CONSUMER_DELAY = 20;
    private final String adapterInstanceId;
    private final CommandHandlers commandHandlers;
    private final AtomicBoolean recreatingConsumer;
    private final AtomicBoolean tryAgainRecreatingConsumer;
    private final Tracer tracer;
    private ProtonReceiver adapterSpecificConsumer;

    public ProtonBasedInternalCommandConsumer(HonoConnection honoConnection, String str, CommandHandlers commandHandlers) {
        super(honoConnection, SendMessageSampler.Factory.noop());
        this.recreatingConsumer = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumer = new AtomicBoolean(false);
        this.commandHandlers = (CommandHandlers) Objects.requireNonNull(commandHandlers);
        this.adapterInstanceId = (String) Objects.requireNonNull(str);
        this.tracer = honoConnection.getTracer();
    }

    public Future<Void> start() {
        return super.start().onComplete(asyncResult -> {
            this.connection.addReconnectListener(honoConnection -> {
                recreateConsumer();
            });
            recreateConsumer();
        });
    }

    protected void onDisconnect() {
        this.adapterSpecificConsumer = null;
    }

    private Future<ProtonReceiver> createAdapterSpecificConsumer() {
        this.log.trace("creating new adapter instance command consumer");
        return this.connection.createReceiver("command_internal/" + this.adapterInstanceId, ProtonQoS.AT_LEAST_ONCE, this::handleCommandMessage, this.connection.getConfig().getInitialCredits(), false, str -> {
            this.log.debug("command receiver link closed remotely");
            invokeRecreateConsumerWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created adapter specific command consumer");
            this.adapterSpecificConsumer = protonReceiver;
            return protonReceiver;
        }).recover(th -> {
            this.log.error("failed to create adapter specific command consumer", th);
            return Future.failedFuture(th);
        });
    }

    void handleCommandMessage(ProtonDelivery protonDelivery, Message message) {
        try {
            ProtonBasedCommand fromRoutedCommandMessage = ProtonBasedCommand.fromRoutedCommandMessage(message);
            CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(fromRoutedCommandMessage.getTenant(), fromRoutedCommandMessage.getGatewayOrDeviceId());
            if (commandHandler != null && commandHandler.getGatewayId() != null) {
                fromRoutedCommandMessage.setGatewayId(commandHandler.getGatewayId());
            }
            Span createSpan = CommandContext.createSpan(this.tracer, fromRoutedCommandMessage, AmqpUtils.extractSpanContext(this.tracer, message), commandHandler != null ? commandHandler.getConsumerCreationSpanContext() : null, getClass().getSimpleName());
            TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(createSpan, this.adapterInstanceId);
            ProtonBasedCommandContext protonBasedCommandContext = new ProtonBasedCommandContext(fromRoutedCommandMessage, protonDelivery, createSpan);
            if (commandHandler != null) {
                this.log.trace("using [{}] for received command [{}]", commandHandler, fromRoutedCommandMessage);
                commandHandler.handleCommand(protonBasedCommandContext);
            } else {
                this.log.info("no command handler found for command [{}]", fromRoutedCommandMessage);
                protonBasedCommandContext.release(new NoConsumerException("no command handler found for command"));
            }
        } catch (IllegalArgumentException e) {
            this.log.debug("address of command message is invalid: {}", message.getAddress());
            Rejected rejected = new Rejected();
            rejected.setError(new ErrorCondition(AmqpUtils.AMQP_BAD_REQUEST, "invalid command target address"));
            protonDelivery.disposition(rejected, true);
        }
    }

    private void recreateConsumer() {
        if (this.recreatingConsumer.compareAndSet(false, true)) {
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r4 -> {
                if (HonoProtonHelper.isLinkOpenAndConnected(this.adapterSpecificConsumer)) {
                    return Future.succeededFuture();
                }
                this.log.debug("recreate adapter specific command consumer link");
                return createAdapterSpecificConsumer();
            }).onComplete(asyncResult -> {
                this.recreatingConsumer.set(false);
                if (this.tryAgainRecreatingConsumer.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumer();
                    } else {
                        invokeRecreateConsumerWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumer");
            this.tryAgainRecreatingConsumer.set(true);
        }
    }

    private void invokeRecreateConsumerWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumer();
        });
    }
}
