package com.atlassian.psmq.internal.io;

import com.atlassian.psmq.api.QException;
import com.atlassian.psmq.api.internal.Validations;
import com.atlassian.psmq.api.message.QMessage;
import com.atlassian.psmq.api.message.QMessageQuery;
import com.atlassian.psmq.internal.io.db.MessageDbTO;
import com.atlassian.psmq.internal.io.db.QueueDao;
import com.atlassian.psmq.internal.queue.QueueImpl;
import io.atlassian.fugue.Option;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/psmq/internal/io/MessageReader.class */
public class MessageReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageReader.class);
    public static final long FAIL_SAFE_MAX_TIME_MS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS);
    private final QueueDao queueDao;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/atlassian/psmq/internal/io/MessageReader$ResolveAction.class */
    public enum ResolveAction {
        RESOLVE,
        UNRESOLVE
    }

    public MessageReader(QueueDao queueDao) {
        this.queueDao = queueDao;
    }

    public Option<QMessage> claimAndResolve(SessionInstructions sessionInstructions, QMessageQuery qMessageQuery) throws QException {
        return claimImpl(sessionInstructions, qMessageQuery, true);
    }

    public Option<QMessage> claim(SessionInstructions sessionInstructions, QMessageQuery qMessageQuery) {
        return claimImpl(sessionInstructions, qMessageQuery, false);
    }

    private Option<QMessage> claimImpl(SessionInstructions sessionInstructions, QMessageQuery qMessageQuery, boolean z) {
        Validations.checkNotNull(sessionInstructions);
        Validations.checkNotNull(qMessageQuery);
        QueueImpl queueImpl = (QueueImpl) sessionInstructions.queue().get();
        long value = queueImpl.id().value();
        return (Option) sessionInstructions.txnBoundary().run(String.format("Unable to read from queue '%s'", queueImpl.name()), txnContext -> {
            return claimFromQueue(txnContext, qMessageQuery, z, value);
        });
    }

    private Option<QMessage> claimFromQueue(TxnContext txnContext, QMessageQuery qMessageQuery, boolean z, long j) {
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            LOGGER.debug("Trying to claim message from {}", Long.valueOf(j));
            this.queueDao.heartBeatQueue(txnContext, j);
            if (System.currentTimeMillis() - currentTimeMillis > FAIL_SAFE_MAX_TIME_MS) {
                LOGGER.debug("Returning to avoid loop as all messages are claimed from queue {}", Long.valueOf(j));
                return Option.none();
            }
            Option<MessageDbTO> nextMsg = this.queueDao.nextMsg(txnContext, j, qMessageQuery);
            if (nextMsg.isEmpty()) {
                LOGGER.debug("Message was empty");
                return Option.none();
            }
            MessageDbTO messageDbTO = (MessageDbTO) nextMsg.get();
            LOGGER.debug("Claiming message {}", Long.valueOf(messageDbTO.getId()));
            if (this.queueDao.claimMsg(txnContext, j, messageDbTO.getId())) {
                LOGGER.debug("Claimed message");
                if (!isExpired(messageDbTO)) {
                    if (z) {
                        LOGGER.debug("Resolving message immediately");
                        this.queueDao.deleteMsg(txnContext, j, messageDbTO.getId());
                    }
                    LOGGER.debug("Returning message");
                    return nextMsg.map(messageDbTO2 -> {
                        return messageDbTO2.toMessage(txnContext.claimant());
                    });
                }
                LOGGER.debug("Deleting message as it was expired");
                this.queueDao.deleteMsg(txnContext, j, messageDbTO.getId());
            } else {
                LOGGER.debug("Failed to claim message");
            }
        }
    }

    public void resolve(SessionInstructions sessionInstructions, QMessage qMessage) {
        resolveOrUnresolveImpl(sessionInstructions, qMessage, ResolveAction.RESOLVE);
    }

    public void unresolve(SessionInstructions sessionInstructions, QMessage qMessage) {
        resolveOrUnresolveImpl(sessionInstructions, qMessage, ResolveAction.UNRESOLVE);
    }

    private void resolveOrUnresolveImpl(SessionInstructions sessionInstructions, QMessage qMessage, ResolveAction resolveAction) {
        Validations.checkNotNull(sessionInstructions);
        Validations.checkNotNull(qMessage);
        QueueImpl queueImpl = (QueueImpl) sessionInstructions.queue().get();
        sessionInstructions.txnBoundary().run(String.format("Unable to resolve in queue '%s'", queueImpl.name()), txnContext -> {
            Validations.checkState(qMessage.systemMetaData().claimant().equals(txnContext.claimant()), "You can only resolve messages that you claimed with this session");
            long value = queueImpl.id().value();
            this.queueDao.heartBeatQueue(txnContext, value);
            long value2 = qMessage.systemMetaData().systemId().value();
            if (resolveAction == ResolveAction.RESOLVE) {
                this.queueDao.deleteMsg(txnContext, value, value2);
            } else {
                this.queueDao.unresolveMsg(txnContext, value, value2);
            }
            return true;
        });
    }

    public boolean heartBeatMessage(SessionInstructions sessionInstructions, QMessage qMessage) {
        return ((Boolean) sessionInstructions.txnBoundary().run(String.format("Unable to heartbeat message '%s'", qMessage.messageId().value()), txnContext -> {
            if (!this.queueDao.heartBeatMsg(txnContext, qMessage.systemMetaData().systemId().value())) {
                return false;
            }
            this.queueDao.heartBeatQueue(txnContext, qMessage.systemMetaData().queueSystemId().value());
            return true;
        })).booleanValue();
    }

    private boolean isExpired(MessageDbTO messageDbTO) {
        Option<Long> expiryTime = messageDbTO.getExpiryTime();
        long currentTimeMillis = System.currentTimeMillis();
        return expiryTime.exists(l -> {
            return currentTimeMillis > l.longValue();
        });
    }
}
