package org.eclipse.hono.commandrouter.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.commandrouter.CommandRouterResult;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.class */
/**
 * An implementation of the Command Router service that keeps track of the protocol adapter
 * instances handling commands for devices and that creates the tenant specific command
 * consumers.
 * <p>
 * The service delegates to a {@link DeviceConnectionInfo} for storing gateway and adapter
 * instance information and to the {@link CommandConsumerFactory} instances made available by
 * a {@link MessagingClientProvider} for creating command consumers.
 * <p>
 * NOTE(review): the tenant bookkeeping collections used for re-enabling command routing are
 * plain, unsynchronized collections. Presumably all access is expected to happen on the
 * service's Vert.x context - confirm against the callers of {@code enableCommandRouting}.
 */
public class CommandRouterServiceImpl implements CommandRouterService, HealthCheckProvider, Lifecycle {

    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class);

    /** Status code reported when an operation succeeded without a response payload. */
    private static final int HTTP_NO_CONTENT = 204;
    /** Status code for which a failure to remove an adapter instance is not logged. */
    private static final int HTTP_PRECON_FAILED = 412;
    /** Status code reported when the service is not running. */
    private static final int HTTP_UNAVAILABLE = 503;

    /**
     * The largest lifespan (in seconds) that is passed on unchanged. The value is equal to
     * {@code Long.MAX_VALUE / 1_000_000_000L}, i.e. the largest number of whole seconds that
     * still fits into a {@code long} when expressed in nanoseconds.
     */
    private static final long MAX_LIFESPAN_SECONDS = Long.MAX_VALUE / 1_000_000_000L;

    /** The highest attempt number for which the retry delay still grows exponentially. */
    private static final int MAX_BACKOFF_ATTEMPT = 6;
    /** The delay (in milliseconds) used for all attempts beyond {@link #MAX_BACKOFF_ATTEMPT}. */
    private static final long MAX_BACKOFF_DELAY_MILLIS = 10000L;

    private final ServiceConfigProperties config;
    private final DeviceRegistrationClient registrationClient;
    private final TenantClient tenantClient;
    private final DeviceConnectionInfo deviceConnectionInfo;
    private final MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider;
    private final AdapterInstanceStatusService adapterInstanceStatusService;
    private final Tracer tracer;
    /** Tenants waiting for their command consumer to be (re-)created, paired with the attempt number. */
    private final Deque<Pair<String, Integer>> tenantsToEnable = new LinkedList<>();
    /** Tenants for which a command consumer has been created during the current processing run. */
    private final Set<String> reenabledTenants = new HashSet<>();
    /** Tenants that have been taken from the queue and whose consumer creation has not completed yet. */
    private final Set<String> tenantsInProcess = new HashSet<>();
    private final AtomicBoolean running = new AtomicBoolean();
    private Context context;

    /**
     * Creates a new service instance.
     *
     * @param serviceConfigProperties The service configuration.
     * @param deviceRegistrationClient The client for the Device Registration service.
     * @param tenantClient The client for the Tenant service.
     * @param deviceConnectionInfo The component used for storing device connection information.
     * @param messagingClientProvider The provider of the command consumer factories.
     * @param adapterInstanceStatusService The adapter instance status service.
     * @param tracer The OpenTracing tracer used for creating spans.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public CommandRouterServiceImpl(
            final ServiceConfigProperties serviceConfigProperties,
            final DeviceRegistrationClient deviceRegistrationClient,
            final TenantClient tenantClient,
            final DeviceConnectionInfo deviceConnectionInfo,
            final MessagingClientProvider<CommandConsumerFactory> messagingClientProvider,
            final AdapterInstanceStatusService adapterInstanceStatusService,
            final Tracer tracer) {

        this.config = Objects.requireNonNull(serviceConfigProperties);
        this.registrationClient = Objects.requireNonNull(deviceRegistrationClient);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.deviceConnectionInfo = Objects.requireNonNull(deviceConnectionInfo);
        this.commandConsumerFactoryProvider = Objects.requireNonNull(messagingClientProvider);
        this.adapterInstanceStatusService = Objects.requireNonNull(adapterInstanceStatusService);
        this.tracer = Objects.requireNonNull(tracer);
    }

    /**
     * Sets the Vert.x context to run on.
     * <p>
     * If no context is set explicitly, {@link #start()} uses the current Vert.x context.
     *
     * @param context The context.
     * @throws NullPointerException if context is {@code null}.
     */
    void setContext(final Context context) {
        this.context = Objects.requireNonNull(context);
    }

    /**
     * Starts the clients and services this service depends on.
     * <p>
     * The dependencies are started only on the transition from <em>not running</em> to
     * <em>running</em>. The outcome of starting the individual dependencies is not awaited.
     *
     * @return A failed future if no Vert.x context is available or if the provider does not
     *         contain any command consumer factory, a succeeded future otherwise.
     */
    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture(new IllegalStateException("Service must be started in a Vert.x context"));
            }
        }
        if (!this.commandConsumerFactoryProvider.containsImplementations()) {
            return Future.failedFuture("no command consumer factory provider set");
        }
        if (this.running.compareAndSet(false, true)) {
            this.registrationClient.start();
            this.tenantClient.start();
            if (this.deviceConnectionInfo instanceof Lifecycle) {
                // the life cycle methods are only reachable via the Lifecycle type
                ((Lifecycle) this.deviceConnectionInfo).start();
            }
            this.commandConsumerFactoryProvider.start();
            this.adapterInstanceStatusService.start();
        }
        return Future.succeededFuture();
    }

    /**
     * Stops the clients and services this service depends on and discards all tenants still
     * waiting for their command consumer to be created.
     *
     * @return A succeeded future if the service was not running or if all dependencies have
     *         been stopped successfully, a failed future if stopping any dependency failed.
     */
    public Future<Void> stop() {
        LOG.info("stopping command router");
        if (!this.running.compareAndSet(true, false)) {
            return Future.succeededFuture();
        }
        // CompositeFuture.join accepts a list of raw futures
        @SuppressWarnings("rawtypes")
        final List<Future> results = new ArrayList<>();
        results.add(this.registrationClient.stop());
        results.add(this.tenantClient.stop());
        if (this.deviceConnectionInfo instanceof Lifecycle) {
            results.add(((Lifecycle) this.deviceConnectionInfo).stop());
        }
        results.add(this.commandConsumerFactoryProvider.stop());
        results.add(this.adapterInstanceStatusService.stop());
        this.tenantsToEnable.clear();
        return CompositeFuture.join(results)
                .onFailure(t -> LOG.info("error while stopping command router", t))
                .map(ok -> {
                    LOG.info("successfully stopped command router");
                    return (Void) null;
                });
    }

    /**
     * Sets the gateway that last acted on behalf of a device.
     * <p>
     * All parameters are passed on unchanged to the {@link DeviceConnectionInfo}.
     *
     * @param tenantId The first identifier (presumably the tenant - confirm against the service API).
     * @param deviceId The second identifier (presumably the device).
     * @param gatewayId The third identifier (presumably the gateway).
     * @param span The tracing span.
     * @return A future that succeeds with a <em>204</em> result if the information has been
     *         stored, and that fails if storing the information failed.
     */
    public Future<CommandRouterResult> setLastKnownGatewayForDevice(
            final String tenantId,
            final String deviceId,
            final String gatewayId,
            final Span span) {

        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(tenantId, deviceId, gatewayId, span)
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT));
    }

    /**
     * Sets the last known gateways for a set of devices.
     * <p>
     * All parameters are passed on unchanged to the {@link DeviceConnectionInfo}.
     *
     * @param tenantId The identifier (presumably the tenant - confirm against the service API).
     * @param deviceIdToGatewayIdMap The map to store (presumably device IDs mapped to gateway IDs).
     * @param span The tracing span.
     * @return A future that succeeds with a <em>204</em> result if the information has been
     *         stored, and that fails if storing the information failed.
     */
    public Future<CommandRouterResult> setLastKnownGatewayForDevice(
            final String tenantId,
            final Map<String, String> deviceIdToGatewayIdMap,
            final Span span) {

        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(tenantId, deviceIdToGatewayIdMap, span)
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT));
    }

    /**
     * Creates the command consumer(s) for a tenant and registers the adapter instance that
     * handles commands for a device.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device to register the adapter instance for.
     * @param adapterInstanceId The identifier passed on to the {@link DeviceConnectionInfo}
     *                          as the command handling adapter instance.
     * @param lifespan The lifespan of the registration. A {@code null}, negative or too large
     *                 value is replaced by a duration of -1 seconds.
     * @param span The tracing span.
     * @return A future that always succeeds. The result contains status <em>204</em> if the
     *         registration succeeded and the status code extracted from the error otherwise.
     */
    public Future<CommandRouterResult> registerCommandConsumer(
            final String tenantId,
            final String deviceId,
            final String adapterInstanceId,
            final Duration lifespan,
            final Span span) {

        return this.tenantClient.get(tenantId, span.context())
                .compose(tenantObject -> createCommandConsumers(
                        tenantId,
                        this.commandConsumerFactoryProvider.getClient(tenantObject),
                        span))
                .compose(ok -> this.deviceConnectionInfo
                        .setCommandHandlingAdapterInstance(
                                tenantId, deviceId, adapterInstanceId, getSanitizedLifespan(lifespan), span)
                        .recover(t -> {
                            LOG.info("error setting command handling adapter instance [tenant: {}, device: {}]",
                                    tenantId, deviceId, t);
                            return Future.failedFuture(t);
                        }))
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Creates the command consumer for a tenant using the given factory and, if that factory
     * is Kafka based and an AMQP based factory is available as well, an additional AMQP based
     * consumer.
     * <p>
     * A failure to create the additional AMQP based consumer is logged to the span only, the
     * overall outcome is always the one of the consumer created by the given factory.
     *
     * @param tenantId The tenant to create the consumer(s) for.
     * @param primaryFactory The factory determined for the tenant.
     * @param span The tracing span.
     * @return The outcome of creating the consumer by means of the given factory.
     */
    private Future<Void> createCommandConsumers(
            final String tenantId,
            final CommandConsumerFactory primaryFactory,
            final Span span) {

        final Future<Void> primaryResult = primaryFactory.createCommandConsumer(tenantId, span.context());
        if (primaryFactory.getMessagingType() != MessagingType.kafka) {
            return primaryResult;
        }
        final CommandConsumerFactory amqpFactory = this.commandConsumerFactoryProvider.getClient(MessagingType.amqp);
        if (amqpFactory == null) {
            return primaryResult;
        }
        span.log("also creating secondary, AMQP-based consumer");
        final Future<Void> secondaryResult = amqpFactory.createCommandConsumer(tenantId, span.context());
        return CompositeFuture.join(primaryResult, secondaryResult)
                .map(ok -> (Void) null)
                .recover(t -> {
                    if (secondaryResult.failed()) {
                        span.log("ignoring failure to create secondary, AMQP-based command consumer");
                    }
                    // only the primary consumer determines the overall outcome
                    return primaryResult;
                });
    }

    /**
     * Replaces a lifespan that is {@code null}, negative or longer than
     * {@link #MAX_LIFESPAN_SECONDS} seconds by a duration of -1 seconds.
     *
     * @param lifespan The lifespan to check.
     * @return The given lifespan or a duration of -1 seconds.
     */
    private Duration getSanitizedLifespan(final Duration lifespan) {
        if (lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > MAX_LIFESPAN_SECONDS) {
            return Duration.ofSeconds(-1L);
        }
        return lifespan;
    }

    /**
     * Removes the registration of the adapter instance that handles commands for a device.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device to remove the adapter instance for.
     * @param adapterInstanceId The identifier passed on to the {@link DeviceConnectionInfo}
     *                          as the command handling adapter instance.
     * @param span The tracing span.
     * @return A future that always succeeds. The result contains status <em>204</em> if the
     *         entry has been removed and the status code extracted from the error otherwise.
     */
    public Future<CommandRouterResult> unregisterCommandConsumer(
            final String tenantId,
            final String deviceId,
            final String adapterInstanceId,
            final Span span) {

        return this.deviceConnectionInfo.removeCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, span)
                .recover(t -> {
                    // a 412 status is not logged
                    if (ServiceInvocationException.extractStatusCode(t) != HTTP_PRECON_FAILED) {
                        LOG.info("error removing command handling adapter instance [tenant: {}, device: {}]",
                                tenantId, deviceId, t);
                    }
                    return Future.failedFuture(t);
                })
                .map(ok -> CommandRouterResult.from(HTTP_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Schedules the (re-)creation of the command consumers of the given tenants.
     * <p>
     * Tenants that have already been re-enabled during the current processing run, that are
     * currently being processed or that are already waiting in the queue are skipped.
     * Processing of the queue is triggered if the queue was empty when this method was
     * invoked.
     *
     * @param list The identifiers of the tenants to enable command routing for.
     * @param span The tracing span.
     * @return A succeeded future containing status <em>503</em> if the service is not
     *         running and status <em>204</em> otherwise.
     * @throws NullPointerException if the service is running and the list is {@code null}.
     */
    public Future<CommandRouterResult> enableCommandRouting(final List<String> list, final Span span) {
        if (!this.running.get()) {
            return Future.succeededFuture(CommandRouterResult.from(HTTP_UNAVAILABLE));
        }
        Objects.requireNonNull(list);
        // needs to be determined before adding the new tenants
        final boolean queueWasEmpty = this.tenantsToEnable.isEmpty();
        for (final String tenantId : list) {
            if (!this.reenabledTenants.contains(tenantId)
                    && !this.tenantsInProcess.contains(tenantId)
                    && !isQueuedForEnabling(tenantId)) {
                this.tenantsToEnable.addLast(Pair.of(tenantId, 1));
            }
        }
        if (queueWasEmpty) {
            LOG.debug("triggering re-enabling of command routing");
            processTenantQueue(span.context());
        }
        return Future.succeededFuture(CommandRouterResult.from(HTTP_NO_CONTENT));
    }

    /**
     * Checks if a tenant is already waiting in the queue of tenants to enable.
     *
     * @param tenantId The tenant to check.
     * @return {@code true} if the queue contains an entry for the tenant.
     */
    private boolean isQueuedForEnabling(final String tenantId) {
        return this.tenantsToEnable.stream().anyMatch(entry -> entry.one().equals(tenantId));
    }

    /**
     * Takes the next tenant from the queue and schedules the creation of its command consumer.
     * <p>
     * If the queue is empty and no tenant is being processed anymore, the set of re-enabled
     * tenants is cleared.
     *
     * @param spanContext The tracing context that the spans created during processing refer to.
     */
    private void processTenantQueue(final SpanContext spanContext) {
        final Pair<String, Integer> attempt = this.tenantsToEnable.pollFirst();
        if (attempt == null) {
            if (this.tenantsInProcess.isEmpty()) {
                this.reenabledTenants.clear();
                LOG.debug("finished re-enabling of command routing");
            }
            return;
        }
        this.tenantsInProcess.add(attempt.one());
        final long delayMillis = calculateDelayMillis(attempt.two());
        if (delayMillis <= 0) {
            this.context.runOnContext(go -> activateCommandRouting(attempt, spanContext));
        } else {
            this.context.owner().setTimer(delayMillis, timerId -> activateCommandRouting(attempt, spanContext));
        }
    }

    /**
     * Determines the time to wait before performing the given attempt.
     * <p>
     * The first attempt is performed without delay, attempts 2 to 6 are delayed by
     * {@code 2^attempt * 100} milliseconds and all further attempts by 10 seconds.
     *
     * @param attempt The number of the attempt, starting with 1.
     * @return The delay in milliseconds.
     */
    private long calculateDelayMillis(final int attempt) {
        if (attempt == 1) {
            return 0L;
        }
        if (attempt > MAX_BACKOFF_ATTEMPT) {
            return MAX_BACKOFF_DELAY_MILLIS;
        }
        return (1 << attempt) * 100;
    }

    /**
     * Creates the command consumer for the tenant of the given attempt and continues
     * processing the queue afterwards.
     * <p>
     * If the attempt fails with a {@link ServerErrorException}, the tenant is put back into
     * the queue with an incremented attempt number. Other failures are logged to the span
     * only. Nothing is done if the service has been stopped in the meantime.
     *
     * @param attempt The tenant identifier and the number of the attempt.
     * @param spanContext The tracing context that the created span refers to.
     */
    private void activateCommandRouting(final Pair<String, Integer> attempt, final SpanContext spanContext) {
        final String tenantId = attempt.one();
        if (!this.running.get()) {
            this.tenantsInProcess.remove(tenantId);
            return;
        }
        final Span span = this.tracer.buildSpan("re-enable command routing for tenant")
                .addReference("follows_from", spanContext)
                .withTag(TracingHelper.TAG_TENANT_ID, tenantId)
                .start();
        final Map<String, Object> logEntries = new HashMap<>(2);
        logEntries.put("attempt#", attempt.two());
        this.tenantClient.get(tenantId, span.context())
                .map(tenantObject -> this.commandConsumerFactoryProvider.getClient(tenantObject))
                // compose (instead of map) is required here so that the outcome of creating
                // the consumer, and not just the tenant lookup, decides between the success
                // and failure handling below
                .compose(factory -> factory.createCommandConsumer(tenantId, span.context()))
                .onSuccess(ok -> {
                    logEntries.put("message", "successfully created command consumer");
                    span.log(logEntries);
                    this.reenabledTenants.add(tenantId);
                })
                .onFailure(t -> {
                    logEntries.put("message", "failed to create command consumer");
                    logEntries.put("error.object", t);
                    TracingHelper.logError(span, logEntries);
                    if (t instanceof ServerErrorException) {
                        LOG.info("failed to create command consumer [attempt#: {}]", attempt.two(), t);
                        span.log("marking tenant for later re-try to create command consumer");
                        this.tenantsToEnable.addLast(Pair.of(tenantId, attempt.two() + 1));
                    }
                })
                .onComplete(outcome -> {
                    span.finish();
                    this.tenantsInProcess.remove(tenantId);
                    processTenantQueue(spanContext);
                });
    }

    /**
     * Registers the readiness checks of the command consumer factory provider and of those
     * clients that are {@link ServiceClient}s.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    public void registerReadinessChecks(final HealthCheckHandler healthCheckHandler) {
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient) this.registrationClient).registerReadinessChecks(healthCheckHandler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient) this.tenantClient).registerReadinessChecks(healthCheckHandler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient) this.deviceConnectionInfo).registerReadinessChecks(healthCheckHandler);
        }
        this.commandConsumerFactoryProvider.registerReadinessChecks(healthCheckHandler);
    }

    /**
     * Registers the event loop check as well as the liveness checks of the command consumer
     * factory provider and of those clients that are {@link ServiceClient}s.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    public void registerLivenessChecks(final HealthCheckHandler healthCheckHandler) {
        registerEventLoopBlockedCheck(healthCheckHandler);
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient) this.registrationClient).registerLivenessChecks(healthCheckHandler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient) this.tenantClient).registerLivenessChecks(healthCheckHandler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient) this.deviceConnectionInfo).registerLivenessChecks(healthCheckHandler);
        }
        this.commandConsumerFactoryProvider.registerLivenessChecks(healthCheckHandler);
    }

    /**
     * Registers a check that succeeds once a task scheduled on this service's Vert.x context
     * has been executed.
     * <p>
     * If the check itself is already running on the service's context, the check succeeds
     * immediately. The timeout of the check is taken from the service configuration.
     *
     * @param healthCheckHandler The handler to register the check with.
     */
    protected void registerEventLoopBlockedCheck(final HealthCheckHandler healthCheckHandler) {
        healthCheckHandler.register(
                "event-loop-blocked-check",
                this.config.getEventLoopBlockedCheckTimeout(),
                procedure -> {
                    if (Vertx.currentContext() == this.context) {
                        LOG.debug("Command router - HealthCheck Server context match. Assume protocol adapter is alive.");
                        procedure.tryComplete(Status.OK());
                    } else {
                        this.context.runOnContext(action -> procedure.tryComplete(Status.OK()));
                    }
                });
    }
}
