package org.eclipse.hono.client.command.amqp;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.CommandRouterConstants;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/amqp/ProtonBasedCommandRouterClient.class */
/**
 * A client for the Command Router service's {@code cmd_router} request/response endpoint.
 * <p>
 * Requests to set the last known gateway of a device are not sent one by one. They are collected
 * in a work queue which is processed with a delay, sending the queued entries to the service in
 * per-tenant batches (see {@link #setLastKnownGatewayForDevice(String, String, String, SpanContext)}).
 */
public class ProtonBasedCommandRouterClient extends AbstractRequestResponseServiceClient<JsonObject, RequestResponseResult<JsonObject>> implements CommandRouterClient {

    /**
     * The delay (in milliseconds) after which a newly filled work queue gets processed and the
     * target interval between the starts of two consecutive overall processing runs.
     */
    static final long SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS = 400;
    /**
     * The maximum number of device entries put into a single (per-tenant) batch request.
     */
    static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_ENTRIES = 100;
    /**
     * The maximum number of tenants, and thereby requests, handled in one round of requests.
     */
    static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_PARALLEL_REQ = 50;

    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedCommandRouterClient.class);

    /**
     * The queued entries: (tenant id, device id) mapped to gateway id, in insertion order.
     */
    private final Map<Pair<String, String>, String> lastKnownGatewaysWorkQueue = new LinkedHashMap<>();
    /**
     * The id of the timer scheduled for processing the work queue. It is set when the first entry
     * is queued and reset to {@code null} once the queue has been found empty after a run.
     */
    private Long lastKnownGatewaysUpdateTimerId;
    private boolean stopped;
    private Clock clock = Clock.systemUTC();

    /**
     * Creates a new client and registers a consumer for the {@code tenant.timeout} event bus address.
     *
     * @param honoConnection The connection to the Command Router service.
     * @param factory The factory for creating the samplers passed to the request/response clients.
     */
    public ProtonBasedCommandRouterClient(HonoConnection honoConnection, SendMessageSampler.Factory factory) {
        // the diamond lets the compiler infer the factory's client type so that isOpen() resolves
        super(honoConnection, factory, new CachingClientFactory<>(honoConnection.getVertx(), client -> client.isOpen()), (Cache) null);
        honoConnection.getVertx().eventBus().consumer("tenant.timeout", this::handleTenantTimeout);
    }

    /**
     * Sets the clock used for measuring how long the processing of the work queue takes.
     * NOTE(review): package-private, presumably meant to be used by tests only.
     *
     * @param clock The clock to use.
     * @throws NullPointerException if clock is {@code null}.
     */
    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    /**
     * Marks this client as stopped, cancels a pending work queue timer and then delegates to the
     * super class. Once stopped, no new processing of the work queue gets scheduled.
     *
     * @return The outcome of the super class' stop method.
     */
    @Override
    public Future<Void> stop() {
        this.stopped = true;
        if (this.lastKnownGatewaysUpdateTimerId != null) {
            this.connection.getVertx().cancelTimer(this.lastKnownGatewaysUpdateTimerId);
        }
        return super.stop();
    }

    /**
     * Gets the key under which the client for the given tenant is managed by the client factory.
     *
     * @param str The tenant identifier.
     * @return The key, i.e. the given value prefixed with {@code cmd_router-}.
     */
    protected String getKey(String str) {
        return String.format("%s-%s", "cmd_router", str);
    }

    /**
     * Gets the request/response client for a tenant, creating it if necessary, after having
     * checked that the connection to the service is established.
     *
     * @param str The tenant identifier.
     * @return A future containing the client.
     */
    private Future<RequestResponseClient<RequestResponseResult<JsonObject>>> getOrCreateClient(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(result -> this.clientFactory.getOrCreateClient(
                        getKey(str),
                        () -> RequestResponseClient.forEndpoint(
                                this.connection,
                                "cmd_router",
                                str,
                                this.samplerFactory.create("cmd_router"),
                                this::removeClient,
                                this::removeClient),
                        result)));
    }

    /**
     * Creates a result object from the parts of a response message.
     * <p>
     * The given content type and cache directive are ignored: a result with a payload always
     * carries the "no cache" directive, a result without payload carries no directive at all.
     *
     * @param i The status code of the response.
     * @param str The content type of the payload (unused).
     * @param buffer The payload of the response or {@code null} if there is none.
     * @param cacheDirective The cache directive of the response (unused).
     * @param applicationProperties The application properties of the response message.
     * @return The result. If the payload cannot be decoded as a JSON object, a result with
     *         status 500 and no payload is returned.
     */
    protected final RequestResponseResult<JsonObject> getResult(int i, String str, Buffer buffer, CacheDirective cacheDirective, ApplicationProperties applicationProperties) {
        if (buffer == null) {
            return new RequestResponseResult<>(i, (JsonObject) null, (CacheDirective) null, applicationProperties);
        }
        try {
            return new RequestResponseResult<>(i, new JsonObject(buffer), CacheDirective.noCacheDirective(), applicationProperties);
        } catch (DecodeException e) {
            LOG.warn("received malformed payload from Command Router service", e);
            return new RequestResponseResult<>(500, (JsonObject) null, (CacheDirective) null, applicationProperties);
        }
    }

    /**
     * Queues the given gateway as the last known gateway of a device.
     * <p>
     * The entry is only put into the work queue here, replacing an entry already queued for the
     * same device. If no processing of the queue is scheduled yet and this client has not been
     * stopped, a timer for processing the queue is started.
     *
     * @param str The tenant identifier.
     * @param str2 The device identifier.
     * @param str3 The gateway identifier.
     * @param spanContext The tracing context (unused by this implementation).
     * @return A succeeded future. It only indicates that the entry has been queued, not that it
     *         has been sent to or accepted by the service.
     * @throws NullPointerException if any of the identifiers is {@code null}.
     */
    public Future<Void> setLastKnownGatewayForDevice(String str, String str2, String str3, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        this.lastKnownGatewaysWorkQueue.put(Pair.of(str, str2), str3);
        if (this.lastKnownGatewaysUpdateTimerId == null && !this.stopped) {
            this.lastKnownGatewaysUpdateTimerId = this.connection.getVertx().setTimer(
                    SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS,
                    timerId -> processLastKnownGatewaysWorkQueue(null, null, null));
        }
        return Future.succeededFuture();
    }

    /**
     * Performs one round of requests for the entries in the work queue and decides how to go on
     * once all requests of the round have completed.
     * <p>
     * Entries put into a request are removed from the queue, regardless of the request's outcome.
     *
     * @param startOfOverallProcessing The instant at which the current overall run has been started
     *                                 or {@code null} if this invocation starts a new overall run.
     * @param tenantsToProcess The tenants to restrict this round to or {@code null} to consider
     *                         all queued entries.
     * @param span The span of the current overall run or {@code null} to create a new one.
     */
    private void processLastKnownGatewaysWorkQueue(Instant startOfOverallProcessing, Set<String> tenantsToProcess, Span span) {
        this.log.debug("processLastKnownGatewaysWorkQueue; queue size: {}", this.lastKnownGatewaysWorkQueue.size());
        final Instant overallStart = startOfOverallProcessing != null ? startOfOverallProcessing : Instant.now(this.clock);
        final Span currentSpan = span != null ? span : newFollowingSpan(null, "set last known gateways");
        if (tenantsToProcess == null) {
            currentSpan.log(Map.of("no_of_device_entries_to_set", this.lastKnownGatewaysWorkQueue.size()));
        }

        // collect one batch (device id -> gateway id) per tenant, honoring the size/count limits
        final Map<String, Map<String, String>> batchPerTenant = new LinkedHashMap<>();
        final Set<String> tenantsWithMoreEntries = new HashSet<>();
        final Iterator<Map.Entry<Pair<String, String>, String>> iter = this.lastKnownGatewaysWorkQueue.entrySet().iterator();
        while (iter.hasNext()) {
            final Map.Entry<Pair<String, String>, String> entry = iter.next();
            final String tenantId = entry.getKey().one();
            if (tenantsToProcess != null && !tenantsToProcess.contains(tenantId)) {
                continue;
            }
            if (!batchPerTenant.containsKey(tenantId) && batchPerTenant.size() >= SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_PARALLEL_REQ) {
                // limit of parallel requests reached, the tenant has to wait for another round
                tenantsWithMoreEntries.add(tenantId);
                continue;
            }
            final Map<String, String> batch = batchPerTenant.computeIfAbsent(tenantId, key -> new HashMap<>());
            if (batch.size() < SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_ENTRIES) {
                batch.put(entry.getKey().two(), entry.getValue());
                iter.remove();
            } else {
                // batch is full, the entry stays in the queue for another round
                tenantsWithMoreEntries.add(tenantId);
            }
        }

        // CompositeFuture.join expects a list with the raw Future element type
        @SuppressWarnings("rawtypes")
        final List<Future> requests = new ArrayList<>(batchPerTenant.size());
        batchPerTenant.forEach((tenantId, batch) -> requests.add(setLastKnownGateways(tenantId, batch, currentSpan.context())));

        CompositeFuture.join(requests).onComplete(outcome -> {
            if (outcome.failed()) {
                TracingHelper.logError(currentSpan, outcome.cause());
            }
            if (this.stopped) {
                currentSpan.finish();
                return;
            }
            if (this.lastKnownGatewaysWorkQueue.isEmpty()) {
                currentSpan.finish();
                // allows setLastKnownGatewayForDevice to start a new timer
                this.lastKnownGatewaysUpdateTimerId = null;
                return;
            }
            final long millisSinceStart = Duration.between(overallStart, Instant.now(this.clock)).toMillis();
            final long millisUntilNextRun = SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS - millisSinceStart;
            if (millisUntilNextRun < 1) {
                // the update interval has elapsed already: start a new overall run right away
                if (!tenantsWithMoreEntries.isEmpty()) {
                    currentSpan.log(String.format("still remaining entries to be set for %d tenants - will be handled in next overall run", tenantsWithMoreEntries.size()));
                    this.log.info("processLastKnownGatewaysWorkQueue: not all entries could be set during update interval; current queue size: {}", this.lastKnownGatewaysWorkQueue.size());
                }
                currentSpan.finish();
                processLastKnownGatewaysWorkQueue(null, null, null);
                return;
            }
            if (!tenantsWithMoreEntries.isEmpty()) {
                // still within the interval: another round for the tenants that hit a limit
                currentSpan.log(String.format("starting another round of requests for %d tenants (request size/count limit was reached)", tenantsWithMoreEntries.size()));
                processLastKnownGatewaysWorkQueue(overallStart, tenantsWithMoreEntries, currentSpan);
            } else {
                this.log.debug("schedule next processLastKnownGatewaysWorkQueue invocation in {}ms", millisUntilNextRun);
                currentSpan.finish();
                this.lastKnownGatewaysUpdateTimerId = this.connection.getVertx().setTimer(
                        millisUntilNextRun,
                        timerId -> processLastKnownGatewaysWorkQueue(null, null, null));
            }
        });
    }

    /**
     * Sends a request for setting the last known gateways of devices of a tenant.
     * <p>
     * A map with exactly one entry is sent as a request carrying the device and gateway id in its
     * properties and no payload. Any other map is sent as a JSON object payload containing the
     * device ids as keys and the gateway ids as values.
     *
     * @param str The tenant identifier.
     * @param map The gateway ids to set, keyed by device id.
     * @param spanContext The context to create the request's span in.
     * @return A future that is succeeded if the service has responded with status 204 and is
     *         failed otherwise.
     */
    protected Future<Void> setLastKnownGateways(String str, Map<String, String> map, SpanContext spanContext) {
        final Span currentSpan;
        final Future<RequestResponseResult<JsonObject>> resultTracker;
        if (map.size() == 1) {
            final Map.Entry<String, String> entry = map.entrySet().iterator().next();
            final String deviceId = entry.getKey();
            final String gatewayId = entry.getValue();
            final Map<String, Object> properties = createDeviceIdProperties(deviceId);
            properties.put("gateway_id", gatewayId);
            currentSpan = newChildSpan(spanContext, "set last known gateway for device");
            TracingHelper.setDeviceTags(currentSpan, str, deviceId);
            currentSpan.setTag("gateway_id", gatewayId);
            resultTracker = getOrCreateClient(str).compose(client -> client.createAndSendRequest(
                    CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(),
                    properties,
                    (Buffer) null,
                    (String) null,
                    this::getRequestResponseResult,
                    currentSpan));
        } else {
            currentSpan = newChildSpan(spanContext, "set last known gateways for tenant devices");
            TracingHelper.setDeviceTags(currentSpan, str, (String) null);
            currentSpan.log(Map.of("no_of_entries", map.size()));
            final JsonObject payload = new JsonObject();
            map.forEach(payload::put);
            resultTracker = getOrCreateClient(str).compose(client -> client.createAndSendRequest(
                    CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(),
                    (Map<String, Object>) null,
                    payload.toBuffer(),
                    "application/json",
                    this::getRequestResponseResult,
                    currentSpan));
        }
        return mapResultAndFinishSpan(resultTracker, result -> {
            if (result.getStatus() == 204) {
                return null;
            }
            throw StatusCodeMapper.from(result);
        }, currentSpan)
                .onFailure(t -> this.log.debug("failed to set last known gateway(s) for tenant [{}]", str, t))
                .mapEmpty();
    }

    /**
     * Registers a command consumer for a device.
     *
     * @param str The tenant identifier.
     * @param str2 The device identifier.
     * @param str3 The identifier of the adapter instance.
     * @param duration The lifespan of the consumer. The number of whole seconds is sent to the
     *                 service; {@code -1} is sent if the duration is {@code null} or if its number
     *                 of seconds is outside of the {@code int} value range.
     * @param spanContext The context to create the request's span in.
     * @return A future that is succeeded if the service has responded with status 204 and is
     *         failed otherwise.
     * @throws NullPointerException if any of the identifiers is {@code null}.
     */
    public Future<Void> registerCommandConsumer(String str, String str2, String str3, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        final int lifespanSeconds = toLifespanSeconds(duration);
        final Map<String, Object> properties = createDeviceIdProperties(str2);
        properties.put("adapter_instance_id", str3);
        properties.put("lifespan", lifespanSeconds);
        final Span currentSpan = newChildSpan(spanContext, "register command consumer");
        TracingHelper.setDeviceTags(currentSpan, str, str2);
        currentSpan.setTag("adapter_instance_id", str3);
        currentSpan.setTag("lifespan", lifespanSeconds);
        final Future<RequestResponseResult<JsonObject>> resultTracker = getOrCreateClient(str)
                .compose(client -> client.createAndSendRequest(
                        CommandRouterConstants.CommandRouterAction.REGISTER_COMMAND_CONSUMER.getSubject(),
                        properties,
                        (Buffer) null,
                        (String) null,
                        this::getRequestResponseResult,
                        currentSpan));
        return mapResultAndFinishSpan(resultTracker, result -> {
            if (result.getStatus() == 204) {
                return null;
            }
            throw StatusCodeMapper.from(result);
        }, currentSpan).mapEmpty();
    }

    /**
     * Converts a lifespan to the number of seconds to send to the service.
     * <p>
     * Both bounds of the {@code int} range are checked because a plain narrowing cast would
     * silently wrap around, e.g. turning a hugely negative duration into a small positive one.
     * NOTE(review): assumes that the service treats every negative lifespan like {@code -1} - confirm.
     *
     * @param lifespan The lifespan or {@code null}.
     * @return The lifespan's seconds, or {@code -1} if the lifespan is {@code null} or its number
     *         of seconds does not fit into an {@code int}.
     */
    private static int toLifespanSeconds(Duration lifespan) {
        if (lifespan == null) {
            return -1;
        }
        final long seconds = lifespan.getSeconds();
        if (seconds > Integer.MAX_VALUE || seconds < Integer.MIN_VALUE) {
            return -1;
        }
        return (int) seconds;
    }

    /**
     * Unregisters a command consumer for a device.
     * <p>
     * The status of the outcome is set as HTTP status tag on the request's span. A response with
     * status 412 fails the returned future but is not marked as an error on the span.
     *
     * @param str The tenant identifier.
     * @param str2 The device identifier.
     * @param str3 The identifier of the adapter instance.
     * @param spanContext The context to create the request's span in.
     * @return A future that is succeeded if the service has responded with status 204 and is
     *         failed otherwise.
     * @throws NullPointerException if any of the identifiers is {@code null}.
     */
    public Future<Void> unregisterCommandConsumer(String str, String str2, String str3, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        final Map<String, Object> properties = createDeviceIdProperties(str2);
        properties.put("adapter_instance_id", str3);
        final Span currentSpan = newChildSpan(spanContext, "unregister command consumer");
        TracingHelper.setDeviceTags(currentSpan, str, str2);
        currentSpan.setTag("adapter_instance_id", str3);
        final Future<RequestResponseResult<JsonObject>> resultTracker = getOrCreateClient(str)
                .compose(client -> client.createAndSendRequest(
                        CommandRouterConstants.CommandRouterAction.UNREGISTER_COMMAND_CONSUMER.getSubject(),
                        properties,
                        (Buffer) null,
                        (String) null,
                        this::getRequestResponseResult,
                        currentSpan));
        return resultTracker
                .recover(t -> {
                    // the request itself has failed: record the failure and pass it on
                    Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(t));
                    TracingHelper.logError(currentSpan, t);
                    return Future.failedFuture(t);
                })
                .map(result -> {
                    Tags.HTTP_STATUS.set(currentSpan, result.getStatus());
                    if (result.isError() && result.getStatus() != 412) {
                        Tags.ERROR.set(currentSpan, Boolean.TRUE);
                    }
                    if (result.getStatus() == 204) {
                        return null;
                    }
                    throw StatusCodeMapper.from(result);
                })
                .onComplete(outcome -> currentSpan.finish())
                .mapEmpty();
    }

    /**
     * Enables the routing of commands for a set of tenants.
     * <p>
     * The tenant ids are sent as a JSON array in a single request, using the client for the first
     * tenant in the list.
     *
     * @param list The identifiers of the tenants.
     * @param spanContext The context to create the request's span in.
     * @return A future that is succeeded if the list is empty or if the service has responded
     *         with status 204, and is failed otherwise.
     * @throws NullPointerException if the list is {@code null}.
     */
    public Future<Void> enableCommandRouting(List<String> list, SpanContext spanContext) {
        Objects.requireNonNull(list);
        if (list.isEmpty()) {
            return Future.succeededFuture();
        }
        final Span currentSpan = newChildSpan(spanContext, "enable command routing");
        currentSpan.log(Map.of("no_of_tenants", list.size()));
        final JsonArray payload = new JsonArray(list);
        final Future<RequestResponseResult<JsonObject>> resultTracker = getOrCreateClient(list.get(0))
                .compose(client -> client.createAndSendRequest(
                        CommandRouterConstants.CommandRouterAction.ENABLE_COMMAND_ROUTING.getSubject(),
                        (Map<String, Object>) null,
                        payload.toBuffer(),
                        "application/json",
                        this::getRequestResponseResult,
                        currentSpan));
        return mapResultAndFinishSpan(resultTracker, result -> {
            if (result.getStatus() == 204) {
                this.log.info("successfully enabled routing of commands for {} tenants in Command Router", list.size());
                return null;
            }
            final ServiceInvocationException error = StatusCodeMapper.from(result);
            this.log.info("failed to enable routing of commands in Command Router", error);
            throw error;
        }, currentSpan).mapEmpty();
    }
}
