package org.eclipse.hono.client.command.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.util.Objects;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.class */
/**
 * A consumer of commands that are published to the Pub/Sub based internal command endpoint
 * ({@code command_internal}) of a single adapter instance.
 * <p>
 * On construction, the admin client manager is asked to get or create the topic and then the
 * subscription for the adapter instance. Once both exist, a subscriber for the subscription is
 * requested from the subscriber factory. Failures of these steps are logged only, they do not
 * make the constructor fail.
 */
public class PubSubBasedInternalCommandConsumer implements InternalCommandConsumer {
    private static final Logger log = LoggerFactory.getLogger(PubSubBasedInternalCommandConsumer.class);

    /** Name of the endpoint that the topic and subscription of this consumer belong to. */
    private static final String INTERNAL_COMMAND_ENDPOINT = "command_internal";
    /** Key under which the tenant configuration is stored in the command context. */
    private static final String KEY_TENANT_CONFIG = "tenant-config";

    private final CommandResponseSender commandResponseSender;
    private final String adapterInstanceId;
    private final CommandHandlers commandHandlers;
    private final TenantClient tenantClient;
    private final Tracer tracer;
    private final PubSubSubscriberFactory subscriberFactory;
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final PubSubBasedAdminClientManager adminClientManager;
    private final Vertx vertx;
    private final MessageReceiver receiver;

    /**
     * Creates a consumer that manages its own admin client manager and uses
     * {@link #handleCommandMessage(PubsubMessage, AckReplyConsumer)} as the message receiver.
     *
     * @param commandResponseSender The sender handed to every created command context.
     * @param vertx The Vert.x instance used for running blocking clean-up code on stop.
     * @param adapterInstanceId The identifier of the adapter instance to consume commands for.
     * @param commandHandlers The handlers to look up the handler for a received command in.
     * @param tenantClient The client used for retrieving the tenant configuration of a command.
     * @param tracer The tracer used for creating and extracting spans.
     * @param pubSubSubscriberFactory The factory used for creating and closing the subscriber.
     * @param projectId The first argument for the newly created admin client manager
     *                  (presumably the Google Cloud project identifier - confirm against
     *                  {@code PubSubBasedAdminClientManager}).
     * @param credentialsProvider The credentials provider for the newly created admin client manager.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public PubSubBasedInternalCommandConsumer(CommandResponseSender commandResponseSender, Vertx vertx, String adapterInstanceId, CommandHandlers commandHandlers, TenantClient tenantClient, Tracer tracer, PubSubSubscriberFactory pubSubSubscriberFactory, String projectId, CredentialsProvider credentialsProvider) {
        Objects.requireNonNull(projectId);
        Objects.requireNonNull(credentialsProvider);
        this.vertx = Objects.requireNonNull(vertx);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.tracer = Objects.requireNonNull(tracer);
        this.subscriberFactory = Objects.requireNonNull(pubSubSubscriberFactory);
        this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider);
        this.receiver = this::handleCommandMessage;
        createSubscriber();
    }

    /**
     * Creates a consumer that uses the given admin client manager and message receiver.
     *
     * @param commandResponseSender The sender handed to every created command context.
     * @param vertx The Vert.x instance used for running blocking clean-up code on stop.
     * @param adapterInstanceId The identifier of the adapter instance to consume commands for.
     * @param commandHandlers The handlers to look up the handler for a received command in.
     * @param tenantClient The client used for retrieving the tenant configuration of a command.
     * @param tracer The tracer used for creating and extracting spans.
     * @param pubSubSubscriberFactory The factory used for creating and closing the subscriber.
     * @param pubSubBasedAdminClientManager The manager used for creating topic and subscription.
     * @param messageReceiver The receiver that the subscriber passes received messages to.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public PubSubBasedInternalCommandConsumer(CommandResponseSender commandResponseSender, Vertx vertx, String adapterInstanceId, CommandHandlers commandHandlers, TenantClient tenantClient, Tracer tracer, PubSubSubscriberFactory pubSubSubscriberFactory, PubSubBasedAdminClientManager pubSubBasedAdminClientManager, MessageReceiver messageReceiver) {
        this.vertx = Objects.requireNonNull(vertx);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.tracer = Objects.requireNonNull(tracer);
        this.subscriberFactory = Objects.requireNonNull(pubSubSubscriberFactory);
        this.adminClientManager = Objects.requireNonNull(pubSubBasedAdminClientManager);
        this.receiver = Objects.requireNonNull(messageReceiver);
        createSubscriber();
    }

    /**
     * Gets or creates the topic and subscription of this adapter instance and, if both succeed,
     * requests a subscriber for the subscription from the subscriber factory.
     */
    private void createSubscriber() {
        this.adminClientManager.getOrCreateTopic(INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId)
                .onFailure(topicError -> log.error("Could not create topic for endpoint {} and {}",
                        INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId, topicError))
                // the subscription failure handler is attached to the subscription future itself so
                // that a failed topic creation is not additionally reported as a subscription problem
                .compose(topic -> this.adminClientManager
                        .getOrCreateSubscription(INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId)
                        .onFailure(subscriptionError -> log.error(
                                "Could not create subscription for endpoint {} and {}",
                                INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId, subscriptionError)))
                .onSuccess(subscription -> this.subscriberFactory.getOrCreateSubscriber(subscription, this.receiver));
    }

    /**
     * Registers a readiness check that succeeds once this consumer has been started.
     * <p>
     * While the consumer is still starting, the reported status contains a hint if the subscriber
     * has not been created yet.
     *
     * @param healthCheckHandler The handler to register the check with.
     */
    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        log.trace("registering readiness check using Pub/Sub based internal command consumer [adapter instance id: {}]", this.adapterInstanceId);
        healthCheckHandler.register("internal-command-consumer[%s]-readiness".formatted(this.adapterInstanceId), readiness -> {
            if (this.lifecycleStatus.isStarted()) {
                readiness.tryComplete(Status.OK());
                return;
            }
            final JsonObject details = new JsonObject();
            if (this.lifecycleStatus.isStarting()) {
                if (this.subscriberFactory.getSubscriber(INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId).isEmpty()) {
                    log.debug("readiness check failed, subscriber not created yet");
                    details.put("status", "subscriber not created yet");
                } else {
                    log.debug("readiness check failed");
                }
            }
            readiness.tryComplete(Status.KO(details));
        });
    }

    /**
     * Does nothing, this consumer registers no liveness checks.
     *
     * @param healthCheckHandler The handler (unused).
     */
    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
        // no liveness checks to register
    }

    /**
     * Starts consuming messages by subscribing the subscriber of this adapter instance.
     *
     * @return A succeeded future if the consumer is already starting or the subscriber has been
     *         subscribed successfully. A failed future if the consumer is already started or
     *         stopping ({@link IllegalStateException}) or if subscribing has failed.
     */
    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping"));
        }
        final String subscriptionId = PubSubMessageHelper.getTopicName(INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId);
        return this.subscriberFactory.getOrCreateSubscriber(subscriptionId, this.receiver)
                .subscribe(true)
                .onSuccess(ok -> this.lifecycleStatus.setStarted())
                .onFailure(error -> log.warn("Error starting Internal Command Consumer for adapter {}", this.adapterInstanceId, error));
    }

    /**
     * Processes a command message received from Pub/Sub.
     * <p>
     * The message is acknowledged right away, i.e. before it is parsed and processed. After that,
     * the tenant configuration is retrieved and the command is passed to the matching command
     * handler. The command context is released with an error if the tenant configuration cannot
     * be retrieved or if no command handler exists for the command.
     *
     * @param pubsubMessage The received message.
     * @param ackReplyConsumer The consumer used for acknowledging the message.
     * @return A future indicating the outcome of processing the command. The future is failed if
     *         the message is not a valid command, the tenant configuration cannot be retrieved,
     *         no command handler is available or the handler itself fails.
     */
    Future<Void> handleCommandMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
        ackReplyConsumer.ack();
        // NOTE(review): the catch block below also covers handler lookup and span creation, so an
        // IllegalArgumentException raised there is reported as an invalid command record as well
        try {
            final PubSubBasedCommand command = PubSubBasedCommand.fromRoutedCommandMessage(pubsubMessage);
            final CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(
                    command.getTenant(), command.getGatewayOrDeviceId());
            if (commandHandler != null && commandHandler.getGatewayId() != null) {
                // adopt the gateway that the handler has been registered for
                command.setGatewayId(commandHandler.getGatewayId());
            }
            final SpanContext parentContext = PubSubTracingHelper.extractSpanContext(this.tracer, pubsubMessage);
            final SpanContext followsFromContext = commandHandler != null
                    ? commandHandler.getConsumerCreationSpanContext()
                    : null;
            final Span currentSpan = CommandContext.createSpan(this.tracer, command, parentContext,
                    followsFromContext, getClass().getSimpleName());
            TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, this.adapterInstanceId);
            final PubSubBasedCommandContext commandContext = new PubSubBasedCommandContext(
                    command, this.commandResponseSender, currentSpan);

            return this.tenantClient.get(command.getTenant(), (SpanContext) null)
                    .recover(error -> {
                        // pass the cause as last argument so that it shows up in the log
                        log.warn("error retrieving tenant configuration [{}]", command, error);
                        final ServerErrorException unavailable = new ServerErrorException(
                                command.getTenant(),
                                java.net.HttpURLConnection.HTTP_UNAVAILABLE,
                                "error retrieving tenant configuration",
                                error);
                        commandContext.release(unavailable);
                        return Future.failedFuture(unavailable);
                    })
                    .compose(tenantObject -> {
                        commandContext.put(KEY_TENANT_CONFIG, tenantObject);
                        return dispatchCommand(commandHandler, command, commandContext);
                    });
        } catch (IllegalArgumentException e) {
            log.warn("Command record is invalid [tenant-id: {}, device-id: {}]",
                    PubSubMessageHelper.getTenantId(pubsubMessage.getAttributesMap()).orElse(null),
                    PubSubMessageHelper.getDeviceId(pubsubMessage.getAttributesMap()).orElse(null),
                    e);
            return Future.failedFuture("invalid command message");
        }
    }

    /**
     * Passes the command to the given handler or releases the context if there is no handler.
     */
    private Future<Void> dispatchCommand(CommandHandlerWrapper commandHandler, PubSubBasedCommand command, PubSubBasedCommandContext commandContext) {
        if (commandHandler == null) {
            log.info("no command handler found for command [{}]", command);
            final NoConsumerException noConsumer = new NoConsumerException("no command handler found for command");
            commandContext.release(noConsumer);
            return Future.failedFuture(noConsumer);
        }
        log.debug("using [{}] for received command [{}]", commandHandler, command);
        return commandHandler.handleCommand(commandContext);
    }

    /**
     * Stops this consumer by closing the subscriber and the admin clients.
     * <p>
     * Closing the admin clients is run via {@code executeBlocking}.
     *
     * @return A future indicating the outcome of the stop attempt.
     */
    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(() -> CompositeFuture.all(
                this.subscriberFactory.closeSubscriber(INTERNAL_COMMAND_ENDPOINT, this.adapterInstanceId),
                this.vertx.executeBlocking(closed -> {
                    this.adminClientManager.closeAdminClients();
                    closed.complete();
                })).mapEmpty());
    }
}
