package org.eclipse.hono.commandrouter.impl;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.eclipse.hono.adapter.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.util.MessagingClient;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterServiceConfigProperties;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.commandrouter.CommandRouterResult;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.class */
/**
 * An implementation of the Command Router service API.
 * <p>
 * The service delegates to a tenant client, a device registration client, a device connection
 * info store and a set of command consumer factories. It also contributes readiness and
 * liveness checks for those components.
 */
public class CommandRouterServiceImpl implements CommandRouterService, HealthCheckProvider, Lifecycle {

    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class);

    /** Status code used in results of successfully processed requests. */
    private static final int HTTP_NO_CONTENT = 204;
    /** Status code for which a failed un-registration is not logged. */
    private static final int HTTP_PRECON_FAILED = 412;
    /**
     * Upper bound for an accepted lifespan in seconds: the largest number of whole seconds whose
     * value in nanoseconds still fits into a {@code long} (evaluates to 9223372036).
     */
    private static final long MAX_LIFESPAN_SECONDS = Long.MAX_VALUE / 1_000_000_000L;

    private final CommandRouterServiceConfigProperties config;
    private final DeviceRegistrationClient registrationClient;
    private final TenantClient tenantClient;
    private final DeviceConnectionInfo deviceConnectionInfo;
    private final MessagingClient<CommandConsumerFactory> commandConsumerFactories;
    private final Tracer tracer;
    /** Identifiers of tenants for which a command consumer still needs to be (re-)created. */
    private final Deque<String> tenantsToEnable = new LinkedList<>();
    private Context context;

    /**
     * Creates a new service instance.
     *
     * @param commandRouterServiceConfigProperties The service configuration.
     * @param deviceRegistrationClient The client for the device registration service.
     * @param tenantClient The client for the tenant service.
     * @param deviceConnectionInfo The store for device connection information.
     * @param messagingClient The command consumer factories to use.
     * @param tracer The tracer used for creating spans.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public CommandRouterServiceImpl(CommandRouterServiceConfigProperties commandRouterServiceConfigProperties, DeviceRegistrationClient deviceRegistrationClient, TenantClient tenantClient, DeviceConnectionInfo deviceConnectionInfo, MessagingClient<CommandConsumerFactory> messagingClient, Tracer tracer) {
        this.config = Objects.requireNonNull(commandRouterServiceConfigProperties);
        this.registrationClient = Objects.requireNonNull(deviceRegistrationClient);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.deviceConnectionInfo = Objects.requireNonNull(deviceConnectionInfo);
        this.commandConsumerFactories = Objects.requireNonNull(messagingClient);
        this.tracer = Objects.requireNonNull(tracer);
    }

    /**
     * Sets the Vert.x context that this service uses for processing the tenant queue and for
     * the event loop liveness check.
     *
     * @param context The context.
     * @throws NullPointerException if context is {@code null}.
     */
    void setContext(Context context) {
        this.context = Objects.requireNonNull(context);
    }

    /**
     * Starts the clients used by this service.
     * <p>
     * If no context has been set explicitly, the current Vert.x context is used. The futures
     * returned by the individual clients' {@code start()} methods are not awaited; the returned
     * future only reflects the outcome of the local precondition checks.
     *
     * @return A succeeded future, or a failed future if there is no Vert.x context available or
     *         no command consumer factory has been set.
     */
    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture(new IllegalStateException("Service must be started in a Vert.x context"));
            }
        }
        if (!this.commandConsumerFactories.containsImplementations()) {
            return Future.failedFuture("no command consumer factories set");
        }
        this.registrationClient.start();
        this.tenantClient.start();
        if (this.deviceConnectionInfo instanceof Lifecycle) {
            // the start method is only available via the Lifecycle type
            ((Lifecycle) this.deviceConnectionInfo).start();
        }
        this.commandConsumerFactories.start();
        return Future.succeededFuture();
    }

    /**
     * Stops the clients used by this service.
     *
     * @return A future that succeeds once all clients have been stopped and fails with the
     *         original cause if stopping any of them failed.
     */
    public Future<Void> stop() {
        LOG.info("stopping command router");
        // CompositeFuture.all only accepts a list of the raw Future type
        @SuppressWarnings("rawtypes")
        final List<Future> stopResults = new ArrayList<>();
        stopResults.add(this.registrationClient.stop());
        stopResults.add(this.tenantClient.stop());
        if (this.deviceConnectionInfo instanceof Lifecycle) {
            // the stop method is only available via the Lifecycle type
            stopResults.add(((Lifecycle) this.deviceConnectionInfo).stop());
        }
        stopResults.add(this.commandConsumerFactories.stop());
        return CompositeFuture.all(stopResults)
                .recover(t -> {
                    LOG.info("error while stopping command router", t);
                    return Future.failedFuture(t);
                })
                .map(ok -> {
                    LOG.info("successfully stopped command router");
                    return (Void) null;
                });
    }

    /**
     * Stores the last known gateway of a device by delegating to the device connection info store.
     *
     * @param str Passed on as first argument; presumably the tenant identifier.
     * @param str2 Passed on as second argument; presumably the device identifier.
     * @param str3 Passed on as third argument; presumably the gateway identifier.
     * @param span The span to use for tracing the operation.
     * @return A future that succeeds with a result having status 204 if the store operation
     *         succeeded, and fails with the store's failure otherwise.
     */
    public Future<CommandRouterResult> setLastKnownGatewayForDevice(String str, String str2, String str3, Span span) {
        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(str, str2, str3, span)
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT));
    }

    /**
     * Registers a command consumer for a device.
     * <p>
     * Looks up the tenant, creates a command consumer for the tenant and then stores the adapter
     * instance that handles commands for the device.
     *
     * @param str The identifier of the tenant that the device belongs to.
     * @param str2 The identifier of the device.
     * @param str3 Passed on to the device connection info store; presumably the adapter instance identifier.
     * @param duration The lifespan of the registration; {@code null}, negative or too large
     *                 values are replaced with a duration of -1 seconds.
     * @param span The span to use for tracing the operation.
     * @return A future that always succeeds; the result has status 204 on success and the status
     *         code extracted from the failure otherwise.
     */
    public Future<CommandRouterResult> registerCommandConsumer(String str, String str2, String str3, Duration duration, Span span) {
        return this.tenantClient.get(str, span.context())
                .compose(tenant -> this.commandConsumerFactories.getClient(tenant)
                        .createCommandConsumer(str, span.context()))
                .compose(consumerCreated -> this.deviceConnectionInfo
                        .setCommandHandlingAdapterInstance(str, str2, str3, getSanitizedLifespan(duration), span)
                        .recover(t -> {
                            LOG.info("error setting command handling adapter instance [tenant: {}, device: {}]", str, str2, t);
                            return Future.failedFuture(t);
                        }))
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Replaces unusable lifespan values with a duration of -1 seconds.
     *
     * @param lifespan The lifespan to check, may be {@code null}.
     * @return The given lifespan, or a duration of -1 seconds if the lifespan is {@code null},
     *         negative or longer than {@link #MAX_LIFESPAN_SECONDS} seconds.
     */
    private Duration getSanitizedLifespan(Duration lifespan) {
        // NOTE(review): -1 seconds presumably denotes an unlimited lifespan - confirm against
        // DeviceConnectionInfo
        if (lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > MAX_LIFESPAN_SECONDS) {
            return Duration.ofSeconds(-1L);
        }
        return lifespan;
    }

    /**
     * Removes the command handling adapter instance entry of a device.
     *
     * @param str The identifier of the tenant that the device belongs to.
     * @param str2 The identifier of the device.
     * @param str3 Passed on to the device connection info store; presumably the adapter instance identifier.
     * @param span The span to use for tracing the operation.
     * @return A future that always succeeds; the result has status 204 on success and the status
     *         code extracted from the failure otherwise.
     */
    public Future<CommandRouterResult> unregisterCommandConsumer(String str, String str2, String str3, Span span) {
        return this.deviceConnectionInfo.removeCommandHandlingAdapterInstance(str, str2, str3, span)
                .recover(t -> {
                    // a failed precondition is not logged
                    if (ServiceInvocationException.extractStatusCode(t) != HTTP_PRECON_FAILED) {
                        LOG.info("error removing command handling adapter instance [tenant: {}, device: {}]", str, str2, t);
                    }
                    return Future.failedFuture(t);
                })
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Queues the given tenants for (re-)creation of their command consumers.
     * <p>
     * Processing of the queue is started if the queue has been empty before the given tenants
     * were added. The consumers are created asynchronously; the returned future does not
     * reflect the outcome of creating them.
     *
     * @param list The identifiers of the tenants; {@code null} elements are ignored.
     * @param span The span that the processing span refers to.
     * @return A succeeded future containing a result with status 204.
     * @throws NullPointerException if the list is {@code null}.
     */
    public Future<CommandRouterResult> enableCommandRouting(List<String> list, Span span) {
        Objects.requireNonNull(list);
        final boolean processingRequired = this.tenantsToEnable.isEmpty();
        for (final String tenantId : list) {
            // a null element in the queue would be indistinguishable from an empty queue when
            // polling and would thus end processing prematurely
            if (tenantId != null) {
                this.tenantsToEnable.add(tenantId);
            }
        }
        if (processingRequired) {
            final Span processingSpan = this.tracer.buildSpan("re-enable command routing for tenants")
                    .addReference("follows_from", span.context())
                    .start();
            processTenantQueue(new ConcurrentHashSet<>(), processingSpan);
        }
        return Future.succeededFuture(CommandRouterResult.from(HTTP_NO_CONTENT));
    }

    /**
     * Takes the next tenant from the queue and creates a command consumer for it.
     * <p>
     * Each tenant is handled in a separate task on this service's context. The next tenant is
     * taken from the queue without waiting for the outcome of the current one. The given span is
     * finished once the queue is empty.
     *
     * @param tenantsAlreadyProcessed The tenants for which a consumer has been created already.
     * @param processingSpan The span covering the processing of the whole queue.
     */
    private void processTenantQueue(Set<String> tenantsAlreadyProcessed, Span processingSpan) {
        final String tenantId = this.tenantsToEnable.poll();
        if (tenantId == null) {
            processingSpan.finish();
            return;
        }
        this.context.runOnContext(go -> {
            if (tenantsAlreadyProcessed.contains(tenantId)) {
                processingSpan.log(Map.of(
                        "message", "skipping tenant, already processed ...",
                        TracingHelper.TAG_TENANT_ID.getKey(), tenantId));
            } else {
                final Span tenantSpan = this.tracer.buildSpan("re-enable command routing for tenant")
                        .addReference("child_of", processingSpan.context())
                        .withTag(TracingHelper.TAG_TENANT_ID, tenantId)
                        .start();
                this.tenantClient.get(tenantId, tenantSpan.context())
                        .map(tenant -> this.commandConsumerFactories.getClient(tenant))
                        // compose (not map) so that the handlers below observe the outcome of
                        // creating the consumer instead of the mere invocation of the factory
                        .compose(factory -> factory.createCommandConsumer(tenantId, tenantSpan.context()))
                        .onSuccess(ok -> {
                            tenantSpan.log("successfully created command consumer");
                            tenantsAlreadyProcessed.add(tenantId);
                        })
                        .onFailure(t -> {
                            TracingHelper.logError(tenantSpan, "failed to create command consumer", t);
                            if (t instanceof ServerErrorException) {
                                // NOTE(review): if the queue has already been drained when this
                                // failure arrives, the re-added tenant is not picked up, and
                                // later enableCommandRouting calls will not start processing
                                // because the queue is no longer empty - needs a design decision
                                tenantSpan.log("marking tenant for later re-try to create command consumer");
                                this.tenantsToEnable.add(tenantId);
                            }
                        })
                        .onComplete(done -> tenantSpan.finish());
            }
            processTenantQueue(tenantsAlreadyProcessed, processingSpan);
        });
    }

    /**
     * Registers the readiness checks of the clients used by this service.
     * <p>
     * The registration client, tenant client and device connection info store only contribute
     * checks if they are {@link ServiceClient}s.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        registerReadinessChecksIfServiceClient(this.registrationClient, healthCheckHandler);
        registerReadinessChecksIfServiceClient(this.tenantClient, healthCheckHandler);
        registerReadinessChecksIfServiceClient(this.deviceConnectionInfo, healthCheckHandler);
        this.commandConsumerFactories.registerReadinessChecks(healthCheckHandler);
    }

    /**
     * Registers the event loop check and the liveness checks of the clients used by this service.
     * <p>
     * The registration client, tenant client and device connection info store only contribute
     * checks if they are {@link ServiceClient}s.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
        registerEventLoopBlockedCheck(healthCheckHandler);
        registerLivenessChecksIfServiceClient(this.registrationClient, healthCheckHandler);
        registerLivenessChecksIfServiceClient(this.tenantClient, healthCheckHandler);
        registerLivenessChecksIfServiceClient(this.deviceConnectionInfo, healthCheckHandler);
        this.commandConsumerFactories.registerLivenessChecks(healthCheckHandler);
    }

    /** Registers the readiness checks of the given component if it is a {@link ServiceClient}. */
    private static void registerReadinessChecksIfServiceClient(Object component, HealthCheckHandler healthCheckHandler) {
        if (component instanceof ServiceClient) {
            ((ServiceClient) component).registerReadinessChecks(healthCheckHandler);
        }
    }

    /** Registers the liveness checks of the given component if it is a {@link ServiceClient}. */
    private static void registerLivenessChecksIfServiceClient(Object component, HealthCheckHandler healthCheckHandler) {
        if (component instanceof ServiceClient) {
            ((ServiceClient) component).registerLivenessChecks(healthCheckHandler);
        }
    }

    /**
     * Registers a liveness check that reports OK once a task scheduled on this service's
     * context has been executed.
     * <p>
     * If the check is invoked on the service's own context already, OK is reported right away.
     * The timeout of the check is taken from the service configuration.
     *
     * @param healthCheckHandler The handler to register the check with.
     */
    protected void registerEventLoopBlockedCheck(HealthCheckHandler healthCheckHandler) {
        healthCheckHandler.register("event-loop-blocked-check", this.config.getEventLoopBlockedCheckTimeout(), procedure -> {
            if (Vertx.currentContext() == this.context) {
                LOG.debug("Command router - HealthCheck Server context match. Assume protocol adapter is alive.");
                procedure.tryComplete(Status.OK());
            } else {
                // the task only gets executed if the service's event loop is not blocked
                this.context.runOnContext(action -> procedure.tryComplete(Status.OK()));
            }
        });
    }
}
