package com.github.fridujo.rabbitmq.mock;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.tool.Exceptions;
import com.github.fridujo.rabbitmq.mock.tool.NamedThreadFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/fridujo/rabbitmq/mock/MockQueue.class */
public class MockQueue implements Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
    private static final long SLEEPING_TIME_BETWEEN_SUBMISSIONS_TO_CONSUMERS = 30;
    private final String name;
    private final ReceiverPointer pointer;
    private final AmqArguments arguments;
    private final ReceiverRegistry receiverRegistry;
    private final MockChannel mockChannel;
    private final Queue<Message> messages;
    private final ExecutorService executorService;
    private final Map<String, ConsumerAndTag> consumersByTag = new LinkedHashMap();
    private final AtomicInteger consumerRollingSequence = new AtomicInteger();
    private final AtomicInteger messageSequence = new AtomicInteger();
    private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/fridujo/rabbitmq/mock/MockQueue$ConsumerAndTag.class */
    public static class ConsumerAndTag {
        private final String tag;
        private final Consumer consumer;
        private final boolean autoAck;
        private final Supplier<Long> deliveryTagSupplier;

        ConsumerAndTag(String str, Consumer consumer, boolean z, Supplier<Long> supplier) {
            this.tag = str;
            this.consumer = consumer;
            this.autoAck = z;
            this.deliveryTagSupplier = supplier;
        }
    }

    public MockQueue(String str, AmqArguments amqArguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
        this.name = str;
        this.pointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, str);
        this.arguments = amqArguments;
        this.receiverRegistry = receiverRegistry;
        this.mockChannel = mockChannel;
        this.messages = new PriorityBlockingQueue(11, new MessageComparator(amqArguments));
        this.executorService = Executors.newFixedThreadPool(1, new NamedThreadFactory(num -> {
            return str + "_queue_consuming";
        }));
        start();
    }

    private void start() {
        this.executorService.submit(() -> {
            while (this.running.get()) {
                do {
                } while (deliverToConsumerIfPossible());
                Exceptions.runAndTransformExceptions(() -> {
                    TimeUnit.MILLISECONDS.sleep(SLEEPING_TIME_BETWEEN_SUBMISSIONS_TO_CONSUMERS);
                }, exc -> {
                    return new RuntimeException("Queue " + this.name + " consumer Thread have been interrupted", exc);
                });
            }
        });
    }

    private boolean deliverToConsumerIfPossible() {
        if (!this.running.get()) {
            LOGGER.debug(localized("shutting down"));
            return false;
        }
        boolean deadLetterFirstMessageIfExpired = deadLetterFirstMessageIfExpired();
        if (this.consumersByTag.size() > 0) {
            Message poll = this.messages.poll();
            if (poll == null) {
                LOGGER.trace(localized("no consumer to deliver message to"));
            } else if (poll.isExpired()) {
                deadLetter(poll);
            } else {
                ConsumerAndTag consumerAndTag = (ConsumerAndTag) new ArrayList(this.consumersByTag.values()).get(this.consumerRollingSequence.incrementAndGet() % this.consumersByTag.size());
                long longValue = ((Long) consumerAndTag.deliveryTagSupplier.get()).longValue();
                this.unackedMessagesByDeliveryTag.put(Long.valueOf(longValue), poll);
                Envelope envelope = new Envelope(longValue, false, poll.exchangeName, poll.routingKey);
                try {
                    LOGGER.debug(localized("delivering message to consumer"));
                    consumerAndTag.consumer.handleDelivery(consumerAndTag.tag, envelope, poll.props, poll.body);
                    this.mockChannel.getMetricsCollector().consumedMessage(this.mockChannel, longValue, consumerAndTag.tag);
                    if (consumerAndTag.autoAck) {
                        this.unackedMessagesByDeliveryTag.remove(Long.valueOf(longValue));
                    }
                    deadLetterFirstMessageIfExpired = true;
                } catch (IOException e) {
                    LOGGER.warn(localized("Unable to deliver message to consumer [" + consumerAndTag.tag + "]"));
                    basicReject(longValue, true);
                }
            }
        }
        return deadLetterFirstMessageIfExpired;
    }

    private boolean deadLetterFirstMessageIfExpired() {
        Message poll;
        Message peek = this.messages.peek();
        if (peek == null || !peek.isExpired() || peek != (poll = this.messages.poll())) {
            return false;
        }
        deadLetter(poll);
        return true;
    }

    @Override // com.github.fridujo.rabbitmq.mock.Receiver
    public void publish(String str, String str2, AMQP.BasicProperties basicProperties, byte[] bArr) {
        boolean z = queueLengthLimitReached() || queueLengthBytesLimitReached();
        if (z && this.arguments.overflow() == AmqArguments.Overflow.REJECT_PUBLISH) {
            return;
        }
        Message message = new Message(this.messageSequence.incrementAndGet(), str, str2, basicProperties, bArr, computeExpiryTime(basicProperties));
        if (message.expiryTime != -1) {
            LOGGER.debug(localized("Message published expiring at " + Instant.ofEpochMilli(message.expiryTime)) + ": " + message);
        } else {
            LOGGER.debug(localized("Message published: " + message));
        }
        this.messages.offer(message);
        if (z) {
            deadLetter(this.messages.poll());
        }
    }

    @Override // com.github.fridujo.rabbitmq.mock.Receiver
    public ReceiverPointer pointer() {
        return this.pointer;
    }

    public void basicConsume(String str, Consumer consumer, boolean z, Supplier<Long> supplier) {
        LOGGER.debug(localized("registering consumer"));
        this.consumersByTag.put(str, new ConsumerAndTag(str, consumer, z, supplier));
        consumer.handleConsumeOk(str);
    }

    public GetResponse basicGet(boolean z, Supplier<Long> supplier) {
        long longValue = supplier.get().longValue();
        Message poll = this.messages.poll();
        if (poll == null) {
            LOGGER.debug(localized("basic_get no message available"));
            return null;
        }
        if (poll.isExpired()) {
            deadLetter(poll);
            return null;
        }
        if (!z) {
            this.unackedMessagesByDeliveryTag.put(Long.valueOf(longValue), poll);
        }
        Envelope envelope = new Envelope(longValue, false, poll.exchangeName, poll.routingKey);
        LOGGER.debug(localized("basic_get a message"));
        return new GetResponse(envelope, poll.props, poll.body, this.messages.size());
    }

    public void basicAck(long j, boolean z) {
        if (!z) {
            this.unackedMessagesByDeliveryTag.remove(Long.valueOf(j));
            return;
        }
        Map<Long, Message> map = this.unackedMessagesByDeliveryTag;
        map.getClass();
        doWithUnackedUntil(j, (v1) -> {
            r2.remove(v1);
        });
    }

    public void basicNack(long j, boolean z, boolean z2) {
        if (z) {
            doWithUnackedUntil(j, l -> {
                basicReject(l.longValue(), z2);
            });
        } else {
            basicReject(j, z2);
        }
    }

    public void basicReject(long j, boolean z) {
        Message remove = this.unackedMessagesByDeliveryTag.remove(Long.valueOf(j));
        if (remove != null) {
            if (z) {
                this.messages.offer(remove);
            } else {
                deadLetter(remove);
            }
        }
    }

    private void deadLetter(Message message) {
        Optional<ReceiverPointer> deadLetterExchange = this.arguments.getDeadLetterExchange();
        ReceiverRegistry receiverRegistry = this.receiverRegistry;
        receiverRegistry.getClass();
        deadLetterExchange.flatMap(receiverRegistry::getReceiver).ifPresent(receiver -> {
            LOGGER.debug(localized("dead-lettered to " + receiver + ": " + message));
            receiver.publish(message.exchangeName, this.arguments.getDeadLetterRoutingKey().orElse(message.routingKey), message.props, message.body);
        });
    }

    private String localized(String str) {
        return "[Q " + this.name + "] " + str;
    }

    public void basicCancel(String str) {
        if (this.consumersByTag.containsKey(str)) {
            this.consumersByTag.remove(str).consumer.handleCancelOk(str);
        }
    }

    public void notifyDeleted() {
        this.running.set(false);
        for (ConsumerAndTag consumerAndTag : this.consumersByTag.values()) {
            try {
                consumerAndTag.consumer.handleCancel(consumerAndTag.tag);
            } catch (IOException e) {
                LOGGER.warn("Consumer threw an exception when notified about cancellation", e);
            }
        }
    }

    public void close() {
        this.running.set(false);
        this.executorService.shutdown();
        Exceptions.runAndEatExceptions(() -> {
            this.executorService.awaitTermination(90L, TimeUnit.MILLISECONDS);
        });
    }

    public void basicRecover(boolean z) {
        new LinkedHashSet(this.unackedMessagesByDeliveryTag.keySet()).forEach(l -> {
            this.messages.offer(this.unackedMessagesByDeliveryTag.remove(l));
        });
        this.consumersByTag.values().forEach(consumerAndTag -> {
            consumerAndTag.consumer.handleRecoverOk(consumerAndTag.tag);
        });
    }

    public int messageCount() {
        return this.messages.size();
    }

    public int consumerCount() {
        return this.consumersByTag.size();
    }

    public int purge() {
        int messageCount = messageCount();
        this.messages.clear();
        return messageCount;
    }

    private void doWithUnackedUntil(long j, java.util.function.Consumer<Long> consumer) {
        if (this.unackedMessagesByDeliveryTag.containsKey(Long.valueOf(j))) {
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            for (Long l : this.unackedMessagesByDeliveryTag.keySet()) {
                linkedHashSet.add(l);
                if (Long.valueOf(j).equals(l)) {
                    break;
                }
            }
            linkedHashSet.forEach(consumer);
        }
    }

    private boolean queueLengthLimitReached() {
        return ((Boolean) this.arguments.queueLengthLimit().map(num -> {
            return Boolean.valueOf(num.intValue() <= this.messages.size());
        }).orElse(false)).booleanValue();
    }

    private boolean queueLengthBytesLimitReached() {
        int sum = this.messages.stream().mapToInt(message -> {
            return message.body.length;
        }).sum();
        return ((Boolean) this.arguments.queueLengthBytesLimit().map(num -> {
            return Boolean.valueOf(num.intValue() <= sum);
        }).orElse(false)).booleanValue();
    }

    private long computeExpiryTime(AMQP.BasicProperties basicProperties) {
        return getMessageExpiryTime(basicProperties).orElse(Long.valueOf(((Long) this.arguments.getMessageTtlOfQueue().map((v1) -> {
            return computeExpiry(v1);
        }).orElse(-1L)).longValue())).longValue();
    }

    private Optional<Long> getMessageExpiryTime(AMQP.BasicProperties basicProperties) {
        return Optional.ofNullable(basicProperties.getExpiration()).flatMap(this::toLong).map((v1) -> {
            return computeExpiry(v1);
        });
    }

    private Long computeExpiry(long j) {
        return Long.valueOf(System.currentTimeMillis() + j);
    }

    private Optional<Long> toLong(String str) {
        try {
            return Optional.of(Long.valueOf(Long.parseLong(str)));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public String toString() {
        return "Queue " + this.name;
    }
}
