package org.eclipse.hono.adapter.mqtt;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.util.TriTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/CommandSubscriptionsManager.class */
/**
 * Keeps track of the command subscriptions of an MQTT connection and of the
 * commands that have been sent to the device but have not been acknowledged yet.
 *
 * @param <T> The type of configuration properties used by the protocol adapter.
 */
public final class CommandSubscriptionsManager<T extends MqttProtocolAdapterProperties> {

    private static final Logger LOG = LoggerFactory.getLogger(CommandSubscriptionsManager.class);

    /** Registered command subscriptions (and their command consumers), keyed by subscription topic. */
    private final Map<String, TriTuple<CommandSubscription, ProtocolAdapterCommandConsumer, Object>> subscriptions = new ConcurrentHashMap<>();
    /** Commands sent to the device that still await a PUBACK, keyed by message id. */
    private final Map<Integer, PendingCommandRequest> waitingForAcknowledgement = new ConcurrentHashMap<>();
    private final Vertx vertx;
    private final T config;

    /**
     * Holds the handlers to invoke once a sent command is acknowledged or the
     * acknowledgement times out, together with the id of the timeout timer.
     */
    public static class PendingCommandRequest {

        /** Id of the timer guarding the acknowledgement; {@code null} if no timer has been started. */
        private final Long timerId;
        private final Handler<Integer> onAckHandler;
        private final Handler<Void> onAckTimeoutHandler;

        private PendingCommandRequest(final Long timerId, final Handler<Integer> onAckHandler,
                final Handler<Void> onAckTimeoutHandler) {
            this.timerId = timerId;
            this.onAckHandler = Objects.requireNonNull(onAckHandler);
            this.onAckTimeoutHandler = Objects.requireNonNull(onAckTimeoutHandler);
        }

        /**
         * Creates a new pending request.
         *
         * @param timerId The id of the timeout timer or {@code null} if there is none.
         * @param onAckHandler The handler to invoke when the acknowledgement arrives.
         * @param onAckTimeoutHandler The handler to invoke when the timer expires first.
         * @return The pending request.
         * @throws NullPointerException if any of the handlers is {@code null}.
         */
        private static PendingCommandRequest from(final Long timerId, final Handler<Integer> onAckHandler,
                final Handler<Void> onAckTimeoutHandler) {
            return new PendingCommandRequest(timerId, onAckHandler, onAckTimeoutHandler);
        }
    }

    /**
     * Creates a new manager.
     *
     * @param vertx The Vert.x instance used for starting and cancelling timers.
     * @param config The adapter configuration providing the acknowledgement timeout.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public CommandSubscriptionsManager(final Vertx vertx, final T config) {
        this.vertx = Objects.requireNonNull(vertx);
        this.config = Objects.requireNonNull(config);
    }

    /**
     * Handles an acknowledgement for a command that has been sent to the device.
     * <p>
     * If a request is pending for the given message id, it is removed, its timer
     * (if any) is cancelled and its acknowledgement handler is invoked with the id.
     * Otherwise nothing happens.
     *
     * @param msgId The message id of the acknowledged command.
     * @throws NullPointerException if msgId is {@code null}.
     */
    public void handlePubAck(final Integer msgId) {
        Objects.requireNonNull(msgId);
        LOG.trace("Acknowledgement received for command [Msg-id: {}] that has been sent to device", msgId);
        final PendingCommandRequest pending = removeFromWaitingForAcknowledgement(msgId);
        if (pending == null) {
            return;
        }
        if (pending.timerId != null) {
            cancelTimer(pending.timerId);
        }
        pending.onAckHandler.handle(msgId);
    }

    /**
     * Registers a sent command as waiting for its acknowledgement.
     * <p>
     * A timer is started if the configured timeout is at least 1. Should a request
     * already be registered for the same message id, it is replaced and its timer
     * is cancelled.
     *
     * @param msgId The message id of the command sent to the device.
     * @param onAckHandler The handler to invoke when the acknowledgement arrives.
     * @param onAckTimeoutHandler The handler to invoke when the timer expires first.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public void addToWaitingForAcknowledgement(final Integer msgId, final Handler<Integer> onAckHandler,
            final Handler<Void> onAckTimeoutHandler) {
        Objects.requireNonNull(msgId);
        Objects.requireNonNull(onAckHandler);
        Objects.requireNonNull(onAckTimeoutHandler);
        final PendingCommandRequest replaced = this.waitingForAcknowledgement.put(
                msgId,
                PendingCommandRequest.from(startTimer(msgId), onAckHandler, onAckTimeoutHandler));
        // A timer left over from a replaced request would otherwise fire later and
        // time out the request that has just been registered under the same id.
        if (replaced != null && replaced.timerId != null) {
            cancelTimer(replaced.timerId);
        }
    }

    /**
     * Removes the pending request registered for a message id.
     *
     * @param msgId The message id.
     * @return The removed request or {@code null} if none was registered.
     */
    private PendingCommandRequest removeFromWaitingForAcknowledgement(final Integer msgId) {
        return this.waitingForAcknowledgement.remove(msgId);
    }

    /**
     * Registers a command subscription together with its command consumer under
     * the subscription's topic, replacing any entry already stored for that topic.
     *
     * @param subscription The command subscription.
     * @param commandConsumer The command consumer belonging to the subscription.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public void addSubscription(final CommandSubscription subscription,
            final ProtocolAdapterCommandConsumer commandConsumer) {
        Objects.requireNonNull(subscription);
        Objects.requireNonNull(commandConsumer);
        this.subscriptions.put(subscription.getTopic(), TriTuple.of(subscription, commandConsumer, (Object) null));
    }

    /**
     * Removes the subscription registered for a topic and closes its command consumer.
     *
     * @param topic The topic of the subscription to remove.
     * @param onConsumerRemovedFunction An optional function invoked with the tenant and
     *            device id of the removed subscription; may be {@code null}.
     * @param spanContext The span context passed on when closing the command consumer.
     * @return A future joining the outcome of the given function and of closing the
     *         consumer, or a failed future if no subscription is registered for the topic.
     * @throws NullPointerException if topic is {@code null}.
     */
    public Future<Void> removeSubscription(final String topic,
            final BiFunction<String, String, Future<Void>> onConsumerRemovedFunction,
            final SpanContext spanContext) {
        Objects.requireNonNull(topic);
        final TriTuple<CommandSubscription, ProtocolAdapterCommandConsumer, Object> removed = this.subscriptions.remove(topic);
        if (removed == null) {
            LOG.debug("Cannot remove subscription; none registered for topic [{}]", topic);
            return Future.failedFuture(String.format("Cannot remove subscription; none registered for topic [%s]", topic));
        }
        final CommandSubscription subscription = removed.one();
        final Future<Void> functionFuture = onConsumerRemovedFunction != null
                ? onConsumerRemovedFunction.apply(subscription.getTenant(), subscription.getDeviceId())
                : Future.succeededFuture();
        final Future<Void> closeFuture = closeCommandConsumer(subscription, removed.two(), spanContext);
        return CompositeFuture.join(functionFuture, closeFuture).mapEmpty();
    }

    /**
     * Removes all registered subscriptions and closes their command consumers.
     *
     * @param onConsumerRemovedFunction An optional function invoked with the tenant and
     *            device id of each removed subscription; may be {@code null}.
     * @param spanContext The span context passed on when closing the command consumers.
     * @return A composite future joining the outcomes of the individual removals.
     */
    public CompositeFuture removeAllSubscriptions(
            final BiFunction<String, String, Future<Void>> onConsumerRemovedFunction,
            final SpanContext spanContext) {
        // CompositeFuture.join accepts a list of raw futures, hence the raw element type.
        @SuppressWarnings("rawtypes")
        final List<Future> removalFutures = this.subscriptions.keySet().stream()
                .map(topic -> removeSubscription(topic, onConsumerRemovedFunction, spanContext))
                .collect(Collectors.toList());
        return CompositeFuture.join(removalFutures);
    }

    /**
     * Closes a command consumer and logs the outcome.
     *
     * @param subscription The subscription the consumer belongs to (used for logging only).
     * @param commandConsumer The consumer to close.
     * @param spanContext The span context passed on to the consumer's close operation.
     * @return The outcome of closing the consumer; a failure is logged and propagated.
     */
    private Future<Void> closeCommandConsumer(final CommandSubscription subscription,
            final ProtocolAdapterCommandConsumer commandConsumer, final SpanContext spanContext) {
        return commandConsumer.close(spanContext).map(ok -> {
            LOG.trace("Command consumer closed [tenant-id: {}, device-id :{}]",
                    subscription.getTenant(), subscription.getDeviceId());
            return ok;
        }).recover(error -> {
            // the trailing throwable is logged as the exception of the log statement
            LOG.debug("Error closing command consumer [tenant-id: {}, device-id :{}]",
                    subscription.getTenant(), subscription.getDeviceId(), error);
            return Future.failedFuture(error);
        });
    }

    /**
     * Starts the timer that invokes the timeout handler of a pending request if
     * the request is still registered when the timer fires.
     *
     * @param msgId The message id of the pending request.
     * @return The timer id or {@code null} if the configured timeout is less than 1.
     */
    private Long startTimer(final Integer msgId) {
        final long timeout = this.config.getEffectiveSendMessageToDeviceTimeout();
        if (timeout < 1) {
            return null;
        }
        return this.vertx.setTimer(timeout, timerId -> {
            final PendingCommandRequest pending = removeFromWaitingForAcknowledgement(msgId);
            if (pending != null) {
                pending.onAckTimeoutHandler.handle(null);
            }
        });
    }

    /**
     * Cancels a timer.
     *
     * @param timerId The id of the timer to cancel.
     */
    private void cancelTimer(final Long timerId) {
        this.vertx.cancelTimer(timerId.longValue());
        LOG.trace("Canceled Timer [timer-id: {}]", timerId);
    }
}
