package org.eclipse.hono.commandrouter.impl.kafka;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/KafkaCommandProcessingQueue.class */
/**
 * Keeps track of the commands currently being processed, grouped by the Kafka topic partition
 * of the record each command was read from.
 * <p>
 * For every partition the command contexts are kept in the order in which they were
 * {@linkplain #add(KafkaBasedCommandContext) added}. A send action registered via
 * {@link #applySendCommandAction(KafkaBasedCommandContext, Supplier)} is only invoked once the
 * corresponding command has reached the head of its partition queue; otherwise it is stored
 * on the command context and invoked later, when the commands in front of it have been sent
 * or removed.
 * <p>
 * The internal collections are not synchronized.
 * NOTE(review): presumably all methods are meant to be invoked on the Vert.x context given
 * to the constructor - confirm against the callers.
 */
public class KafkaCommandProcessingQueue {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommandProcessingQueue.class);

    /** Status code of the {@link ServerErrorException} used to fail commands that cannot be sent. */
    private static final int HTTP_UNAVAILABLE = 503;

    private final Map<TopicPartition, TopicPartitionCommandQueue> commandQueues = new HashMap<>();
    private final Context vertxContext;

    /**
     * The queue of command contexts belonging to a single topic partition.
     */
    public class TopicPartitionCommandQueue {
        /** Key under which a deferred send action and its result promise are stored in a command context. */
        private static final String KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE = "commandSendActionSupplierAndResultPromise";

        private final Deque<KafkaBasedCommandContext> queue = new LinkedList<>();

        TopicPartitionCommandQueue() {
        }

        /**
         * Appends the given command context to the end of this queue.
         *
         * @param kafkaBasedCommandContext The command context to add.
         * @throws NullPointerException if the command context is {@code null}.
         */
        public void add(KafkaBasedCommandContext kafkaBasedCommandContext) {
            Objects.requireNonNull(kafkaBasedCommandContext);
            this.queue.add(kafkaBasedCommandContext);
        }

        /**
         * Removes the given command context from this queue.
         * <p>
         * If the context was contained in the queue, the command now at the head of the queue
         * is sent, provided a send action has already been registered for it.
         *
         * @param kafkaBasedCommandContext The command context to remove.
         * @throws NullPointerException if the command context is {@code null}.
         */
        public void remove(KafkaBasedCommandContext kafkaBasedCommandContext) {
            Objects.requireNonNull(kafkaBasedCommandContext);
            if (this.queue.remove(kafkaBasedCommandContext)) {
                sendNextCommandInQueueIfPossible();
            }
        }

        /**
         * Checks whether this queue contains no command contexts.
         *
         * @return {@code true} if the queue is empty.
         */
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        /**
         * Empties this queue and fails all commands that were waiting to be sent.
         * <p>
         * Each contained command context for which a send action has been registered is
         * released and its result promise is failed with a {@link ServerErrorException}
         * (status 503). Contexts without a registered send action are simply dropped.
         */
        public void markAsUnusedAndClear() {
            // Work on a snapshot and empty the live queue first: failing a promise may invoke
            // completion handlers that call back into this queue (e.g. remove()), which must
            // neither modify the collection being iterated nor trigger sending of further commands.
            final Deque<KafkaBasedCommandContext> abandoned = new LinkedList<>(this.queue);
            this.queue.clear();
            for (final KafkaBasedCommandContext commandContext : abandoned) {
                final Pair<Supplier<Future<Void>>, Promise<Void>> pending = getSendActionSupplierAndResultPromise(commandContext);
                if (pending != null) {
                    LOG.info("command won't be sent - its partition isn't being handled anymore [{}]", commandContext.getCommand());
                    TracingHelper.logError(commandContext.getTracingSpan(), "command won't be sent - its partition isn't being handled anymore");
                    commandContext.release();
                    pending.two().fail(new ServerErrorException(HTTP_UNAVAILABLE));
                }
            }
        }

        /**
         * Invokes the given send action for the given command, either right away or after the
         * commands in front of it in this queue have been dealt with.
         * <ul>
         * <li>If the command is at the head of the queue, it is taken off the queue and the
         * action is invoked immediately.</li>
         * <li>If the command is further back in the queue, the action is stored on the command
         * context and invoked once the command has reached the head of the queue.</li>
         * <li>If the command is not in the queue, the command context is released and the
         * returned future is failed with a {@link ServerErrorException} (status 503).</li>
         * </ul>
         *
         * @param kafkaBasedCommandContext The context of the command to send.
         * @param supplier Supplies the future of the send action; only invoked when the command is sent.
         * @return A future completed with the outcome of the send action, or failed if the command
         *         will not be sent.
         * @throws NullPointerException if any of the parameters is {@code null}.
         */
        public Future<Void> applySendCommandAction(KafkaBasedCommandContext kafkaBasedCommandContext, Supplier<Future<Void>> supplier) {
            Objects.requireNonNull(kafkaBasedCommandContext);
            Objects.requireNonNull(supplier);
            final Promise<Void> resultPromise = Promise.promise();
            if (kafkaBasedCommandContext.equals(this.queue.peek())) {
                sendGivenCommandAndNextInQueueIfPossible(this.queue.remove(), supplier, resultPromise, true);
            } else if (this.queue.contains(kafkaBasedCommandContext)) {
                final long headOffset = getRecordOffset(this.queue.peek());
                LOG.debug("sending of command with offset {} gets delayed; waiting for processing of offset {} [delayed {}]",
                        getRecordOffset(kafkaBasedCommandContext), headOffset, kafkaBasedCommandContext.getCommand());
                kafkaBasedCommandContext.getTracingSpan()
                        .log(String.format("waiting for an earlier command with offset %d to be processed first", headOffset));
                // park the action on the context; it gets picked up by sendNextCommandInQueueIfPossible()
                kafkaBasedCommandContext.put(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE, Pair.of(supplier, resultPromise));
            } else {
                LOG.info("command won't be sent - not in queue [{}]", kafkaBasedCommandContext.getCommand());
                TracingHelper.logError(kafkaBasedCommandContext.getTracingSpan(), "command won't be sent - not in queue");
                kafkaBasedCommandContext.release();
                resultPromise.fail(new ServerErrorException(HTTP_UNAVAILABLE));
            }
            return resultPromise.future();
        }

        /**
         * Invokes the send action for a command already taken off the queue and then tries to
         * send the command that is now at the head of the queue.
         *
         * @param commandContext The context of the command being sent (used for logging only).
         * @param sendActionSupplier Supplies the future of the send action.
         * @param sendActionResult The promise to complete with the outcome of the send action.
         * @param invokedFromApplyAction {@code true} if called directly from
         *        {@link #applySendCommandAction(KafkaBasedCommandContext, Supplier)}.
         */
        private void sendGivenCommandAndNextInQueueIfPossible(
                final KafkaBasedCommandContext commandContext,
                final Supplier<Future<Void>> sendActionSupplier,
                final Promise<Void> sendActionResult,
                final boolean invokedFromApplyAction) {

            LOG.trace("sending [{}]", commandContext.getCommand());
            final Future<Void> sendActionFuture = sendActionSupplier.get();
            sendActionFuture.onComplete(sendActionResult);
            if (this.queue.isEmpty()) {
                return;
            }
            if (sendActionFuture.isComplete() && invokedFromApplyAction) {
                // NOTE(review): the next command is deferred to a later event loop turn here,
                // presumably so that handlers the caller attaches to the future returned by
                // applySendCommandAction() run before the next command is sent - confirm.
                KafkaCommandProcessingQueue.this.vertxContext.runOnContext(v -> sendNextCommandInQueueIfPossible());
            } else {
                sendNextCommandInQueueIfPossible();
            }
        }

        /**
         * Sends the command at the head of the queue if a send action has been registered for it.
         */
        private void sendNextCommandInQueueIfPossible() {
            final KafkaBasedCommandContext head = this.queue.peek();
            if (head == null) {
                return;
            }
            final Pair<Supplier<Future<Void>>, Promise<Void>> pending = getSendActionSupplierAndResultPromise(head);
            if (pending != null) {
                sendGivenCommandAndNextInQueueIfPossible(this.queue.remove(), pending.one(), pending.two(), false);
            }
        }

        /**
         * Gets the deferred send action and result promise stored on the given command context.
         *
         * @param kafkaBasedCommandContext The command context to inspect.
         * @return The stored pair or {@code null} if no send action has been registered yet.
         */
        private Pair<Supplier<Future<Void>>, Promise<Void>> getSendActionSupplierAndResultPromise(KafkaBasedCommandContext kafkaBasedCommandContext) {
            // safe: the only value ever stored under this key is the pair created in applySendCommandAction()
            @SuppressWarnings("unchecked")
            final Pair<Supplier<Future<Void>>, Promise<Void>> pending =
                    (Pair<Supplier<Future<Void>>, Promise<Void>>) kafkaBasedCommandContext.get(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE);
            return pending;
        }

        /**
         * Gets the offset of the Kafka record of the given command.
         *
         * @param kafkaBasedCommandContext The command context, may be {@code null}.
         * @return The record offset or {@code -1} if the context is {@code null}.
         */
        private long getRecordOffset(KafkaBasedCommandContext kafkaBasedCommandContext) {
            if (kafkaBasedCommandContext == null) {
                return -1L;
            }
            return kafkaBasedCommandContext.getCommand().getRecord().offset();
        }
    }

    /**
     * Creates a new processing queue.
     *
     * @param context The Vert.x context used to schedule deferred sending of queued commands.
     * @throws NullPointerException if the context is {@code null}.
     */
    public KafkaCommandProcessingQueue(Context context) {
        this.vertxContext = Objects.requireNonNull(context);
    }

    /**
     * Adds the given command context to the queue of the topic partition its record was read
     * from, creating that queue if it does not exist yet.
     *
     * @param kafkaBasedCommandContext The command context to add.
     * @throws NullPointerException if the command context is {@code null}.
     */
    public void add(KafkaBasedCommandContext kafkaBasedCommandContext) {
        Objects.requireNonNull(kafkaBasedCommandContext);
        this.commandQueues
                .computeIfAbsent(topicPartitionOf(kafkaBasedCommandContext), partition -> new TopicPartitionCommandQueue())
                .add(kafkaBasedCommandContext);
    }

    /**
     * Removes the given command context from the queue of its topic partition, if such a
     * queue exists.
     *
     * @param kafkaBasedCommandContext The command context to remove.
     * @throws NullPointerException if the command context is {@code null}.
     */
    public void remove(KafkaBasedCommandContext kafkaBasedCommandContext) {
        Objects.requireNonNull(kafkaBasedCommandContext);
        Optional.ofNullable(this.commandQueues.get(topicPartitionOf(kafkaBasedCommandContext)))
                .ifPresent(partitionQueue -> partitionQueue.remove(kafkaBasedCommandContext));
    }

    /**
     * Applies the given send action to the given command, honoring the order of the commands
     * in the queue of the command's topic partition.
     * <p>
     * If there is no queue for the partition, the command context is released and a failed
     * future is returned. Otherwise the call is delegated to
     * {@link TopicPartitionCommandQueue#applySendCommandAction(KafkaBasedCommandContext, Supplier)}.
     *
     * @param kafkaBasedCommandContext The context of the command to send.
     * @param supplier Supplies the future of the send action.
     * @return A future completed with the outcome of the send action, or failed with a
     *         {@link ServerErrorException} (status 503) if the command will not be sent.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public Future<Void> applySendCommandAction(KafkaBasedCommandContext kafkaBasedCommandContext, Supplier<Future<Void>> supplier) {
        Objects.requireNonNull(kafkaBasedCommandContext);
        Objects.requireNonNull(supplier);
        final TopicPartition topicPartition = topicPartitionOf(kafkaBasedCommandContext);
        final TopicPartitionCommandQueue partitionQueue = this.commandQueues.get(topicPartition);
        if (partitionQueue != null) {
            return partitionQueue.applySendCommandAction(kafkaBasedCommandContext, supplier);
        }
        LOG.info("command won't be sent - commands received from partition [{}] aren't handled by this consumer anymore [{}]", topicPartition, kafkaBasedCommandContext.getCommand());
        TracingHelper.logError(kafkaBasedCommandContext.getTracingSpan(), String.format("command won't be sent - commands received from partition [%s] aren't handled by this consumer anymore", topicPartition));
        kafkaBasedCommandContext.release();
        return Future.failedFuture(new ServerErrorException(HTTP_UNAVAILABLE));
    }

    /**
     * Informs this queue about the partitions currently being handled.
     * <p>
     * The queues of all partitions not contained in the given collection are discarded;
     * commands still waiting in such a queue are failed, see
     * {@link TopicPartitionCommandQueue#markAsUnusedAndClear()}.
     *
     * @param collection The partitions that are currently being handled.
     * @throws NullPointerException if the collection is {@code null}.
     */
    public void setCurrentlyHandledPartitions(Collection<TopicPartition> collection) {
        Objects.requireNonNull(collection);
        final Iterator<Map.Entry<TopicPartition, TopicPartitionCommandQueue>> entries = this.commandQueues.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<TopicPartition, TopicPartitionCommandQueue> entry = entries.next();
            if (collection.contains(entry.getKey())) {
                continue;
            }
            if (entry.getValue().isEmpty()) {
                LOG.debug("partition {} not being handled anymore; command queue is empty", entry.getKey());
            } else {
                LOG.info("partition {} not being handled anymore but its command queue isn't empty!", entry.getKey());
                entry.getValue().markAsUnusedAndClear();
            }
            entries.remove();
        }
    }

    /**
     * Determines the topic partition of the Kafka record that the given command was read from.
     */
    private static TopicPartition topicPartitionOf(final KafkaBasedCommandContext commandContext) {
        final KafkaConsumerRecord<?, ?> consumerRecord = commandContext.getCommand().getRecord();
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }
}
