package org.eclipse.hono.adapter.mqtt;

import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TriTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/CommandHandler.class */
/**
 * Tracks command subscriptions (keyed by topic) and commands that have been
 * published to a device and are still waiting for an acknowledgement.
 * <p>
 * For every command added via
 * {@link #addToWaitingForAcknowledgement(Integer, TenantObject, CommandSubscription, CommandContext)}
 * a Vert.x timer is started using the configured command acknowledgement
 * timeout. The timer is cancelled when {@link #handlePubAck(Integer, Function)}
 * is invoked for the same message id; if it fires first, the pending request
 * is dropped and {@code release(1)} is called on its command context.
 *
 * @param <T> The type of configuration properties that supply the
 *            acknowledgement timeout.
 */
public final class CommandHandler<T extends MqttProtocolAdapterProperties> {
    private static final Logger LOG = LoggerFactory.getLogger(CommandHandler.class);

    /** Active subscriptions, keyed by the subscription's topic. */
    private final Map<String, TriTuple<CommandSubscription, MessageConsumer, Object>> subscriptions = new ConcurrentHashMap<>();
    /** Commands sent to a device but not yet acknowledged, keyed by message id. */
    private final Map<Integer, PendingCommandRequest> waitingForAcknowledgement = new ConcurrentHashMap<>();
    private final Vertx vertx;
    private final T config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/CommandHandler$PendingCommandRequest.class */
    /**
     * Immutable holder for everything needed to finish processing a command
     * once its acknowledgement arrives or its timer expires.
     */
    public static class PendingCommandRequest {
        private final Long timerId;
        private final TenantObject tenantObject;
        private final CommandSubscription subscription;
        private final CommandContext commandContext;

        private PendingCommandRequest(Long l, TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext) {
            this.timerId = Objects.requireNonNull(l);
            this.tenantObject = Objects.requireNonNull(tenantObject);
            this.subscription = Objects.requireNonNull(commandSubscription);
            this.commandContext = Objects.requireNonNull(commandContext);
        }

        /**
         * Creates a new pending request.
         *
         * @throws NullPointerException if any of the parameters is {@code null}
         *         (checked by the constructor, in parameter order).
         */
        private static PendingCommandRequest from(Long l, TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext) {
            return new PendingCommandRequest(l, tenantObject, commandSubscription, commandContext);
        }
    }

    /**
     * Creates a new handler.
     *
     * @param vertx The Vert.x instance used to start and cancel timers.
     * @param t The configuration properties providing the command
     *          acknowledgement timeout.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public CommandHandler(Vertx vertx, T t) {
        this.vertx = Objects.requireNonNull(vertx);
        this.config = Objects.requireNonNull(t);
    }

    /**
     * Processes the acknowledgement of a previously published command.
     * <p>
     * If a request is pending for the given message id, it is removed, its
     * timer is cancelled, the optional callback is invoked and the event is
     * logged to the command context's current span. Unknown message ids are
     * ignored.
     *
     * @param num The message id that has been acknowledged.
     * @param function Optional callback factory (may be {@code null}); it is
     *                 applied to the request's tenant object and the resulting
     *                 consumer is invoked with the subscription and the
     *                 command context.
     * @throws NullPointerException if the message id is {@code null}.
     */
    public void handlePubAck(Integer num, Function<TenantObject, BiConsumer<CommandSubscription, CommandContext>> function) {
        Objects.requireNonNull(num);
        LOG.trace("Acknowledgement received for command [Msg-id: {}] that has been sent to device.", num);
        final PendingCommandRequest pending = removeFromWaitingForAcknowledgement(num);
        if (pending == null) {
            // nothing is waiting for this message id (never registered, already acknowledged or timed out)
            return;
        }
        cancelTimer(pending.timerId);
        final CommandSubscription subscription = pending.subscription;
        if (function != null) {
            function.apply(pending.tenantObject).accept(subscription, pending.commandContext);
        }
        LOG.debug("Acknowledged [Msg-id: {}] command to device [tenant-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}]",
                num, subscription.getTenant(), subscription.getDeviceId(), subscription.getClientId(), subscription.getQos());
        logEventToSpan(pending, "Published command has been acknowledged");
    }

    /**
     * Registers a published command as waiting for acknowledgement and starts
     * the timeout timer for it.
     *
     * @param num The message id of the published command.
     * @param tenantObject The tenant object associated with the command.
     * @param commandSubscription The subscription the command was published for.
     * @param commandContext The context of the command.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public void addToWaitingForAcknowledgement(Integer num, TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext) {
        Objects.requireNonNull(num);
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(commandSubscription);
        Objects.requireNonNull(commandContext);
        final long timerId = startTimer(num);
        this.waitingForAcknowledgement.put(num, PendingCommandRequest.from(timerId, tenantObject, commandSubscription, commandContext));
    }

    /**
     * Removes and returns the pending request for the given message id.
     *
     * @return The removed request or {@code null} if none was registered.
     */
    private PendingCommandRequest removeFromWaitingForAcknowledgement(Integer num) {
        return this.waitingForAcknowledgement.remove(num);
    }

    /**
     * Stores a subscription together with its message consumer under the
     * subscription's topic, replacing any entry already stored for that topic.
     *
     * @param commandSubscription The subscription to add.
     * @param messageConsumer The consumer belonging to the subscription.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public void addSubscription(CommandSubscription commandSubscription, MessageConsumer messageConsumer) {
        Objects.requireNonNull(commandSubscription);
        Objects.requireNonNull(messageConsumer);
        this.subscriptions.put(commandSubscription.getTopic(), TriTuple.of(commandSubscription, messageConsumer, (Object) null));
    }

    /**
     * Removes the subscription stored for the given topic, if any, and closes
     * its message consumer.
     *
     * @param str The topic of the subscription to remove.
     * @param biConsumer Optional callback (may be {@code null}) invoked with
     *                   the tenant and device id of the removed subscription
     *                   before the consumer is closed.
     * @throws NullPointerException if the topic is {@code null}.
     */
    public void removeSubscription(String str, BiConsumer<String, String> biConsumer) {
        Objects.requireNonNull(str);
        final TriTuple<CommandSubscription, MessageConsumer, Object> removed = this.subscriptions.remove(str);
        if (removed == null) {
            return;
        }
        final CommandSubscription subscription = removed.one();
        if (biConsumer != null) {
            biConsumer.accept(subscription.getTenant(), subscription.getDeviceId());
        }
        closeCommandConsumer(subscription, removed.two());
    }

    /**
     * Removes every stored subscription and closes the corresponding
     * message consumers.
     *
     * @param biConsumer Optional callback (may be {@code null}) invoked with
     *                   the tenant and device id of each removed subscription.
     */
    public void removeAllSubscriptions(BiConsumer<String, String> biConsumer) {
        // The backing map is a ConcurrentHashMap, whose key set view tolerates
        // removal of entries while it is being traversed.
        this.subscriptions.keySet().forEach(topic -> removeSubscription(topic, biConsumer));
    }

    /**
     * Closes the given consumer and logs the outcome of the close operation.
     */
    private void closeCommandConsumer(CommandSubscription commandSubscription, MessageConsumer messageConsumer) {
        messageConsumer.close(closeAttempt -> {
            if (closeAttempt.succeeded()) {
                LOG.trace("Command consumer closed [tenant-id: {}, device-id: {}]",
                        commandSubscription.getTenant(), commandSubscription.getDeviceId());
            } else {
                // the trailing Throwable has no placeholder, so SLF4J logs it as the exception
                LOG.debug("Error closing command consumer [tenant-id: {}, device-id: {}]",
                        commandSubscription.getTenant(), commandSubscription.getDeviceId(), closeAttempt.cause());
            }
        });
    }

    /**
     * Starts the acknowledgement timer for the given message id.
     * <p>
     * When the timer fires and a request is still pending for the id, the
     * request is removed, the timeout is logged to the command context's
     * current span and {@code release(1)} is called on the command context.
     *
     * @return The id of the started timer.
     */
    private long startTimer(Integer num) {
        return this.vertx.setTimer(this.config.getCommandAckTimeout(), firedTimerId -> {
            final PendingCommandRequest pending = removeFromWaitingForAcknowledgement(num);
            if (pending == null) {
                return;
            }
            final CommandSubscription subscription = pending.subscription;
            LOG.debug("Timed out waiting for acknowledgment for command sent to device [tenant-id: {}, device-id: {}, MQTT client-id: {}, QoS: {}]",
                    subscription.getTenant(), subscription.getDeviceId(), subscription.getClientId(), subscription.getQos());
            logEventToSpan(pending, "Timed out waiting for acknowledgment for command sent to device");
            pending.commandContext.release(1);
        });
    }

    /**
     * Cancels the timer with the given id.
     */
    private void cancelTimer(Long l) {
        this.vertx.cancelTimer(l);
        LOG.trace("Canceled Timer [timer-id: {}]", l);
    }

    /**
     * Logs the given event, together with the subscription's client id and
     * QoS, to the current span of the request's command context.
     */
    private static void logEventToSpan(PendingCommandRequest request, String event) {
        final CommandSubscription subscription = request.subscription;
        final Map<String, Object> items = new HashMap<>(3);
        items.put("event", event);
        items.put(TracingHelper.TAG_CLIENT_ID.getKey(), subscription.getClientId());
        items.put(TracingHelper.TAG_QOS.getKey(), subscription.getQos().toString());
        request.commandContext.getCurrentSpan().log(items);
    }
}
