package com.atlassian.psmq.internal;

import com.atlassian.pocketknife.api.querydsl.DatabaseAccessor;
import com.atlassian.pocketknife.api.querydsl.DatabaseConnectionConverter;
import com.atlassian.psmq.api.QConnectionProvider;
import com.atlassian.psmq.api.QException;
import com.atlassian.psmq.api.QSession;
import com.atlassian.psmq.api.QSessionDefinition;
import com.atlassian.psmq.api.internal.Validations;
import com.atlassian.psmq.api.message.QMessage;
import com.atlassian.psmq.api.message.QMessageConsumer;
import com.atlassian.psmq.api.message.QMessageProducer;
import com.atlassian.psmq.api.paging.PageResponse;
import com.atlassian.psmq.api.queue.GlobalQueueOperations;
import com.atlassian.psmq.api.queue.QBrowseMessageQuery;
import com.atlassian.psmq.api.queue.QBrowseMessageQueryBuilder;
import com.atlassian.psmq.api.queue.QId;
import com.atlassian.psmq.api.queue.QTopic;
import com.atlassian.psmq.api.queue.Queue;
import com.atlassian.psmq.api.queue.QueueBrowser;
import com.atlassian.psmq.api.queue.QueueDefinition;
import com.atlassian.psmq.api.queue.QueueOperations;
import com.atlassian.psmq.api.queue.QueueQuery;
import com.atlassian.psmq.api.queue.QueueUpdate;
import com.atlassian.psmq.internal.io.MessageReader;
import com.atlassian.psmq.internal.io.MessageWriter;
import com.atlassian.psmq.internal.io.SessionInstructions;
import com.atlassian.psmq.internal.io.TxnBoundary;
import com.atlassian.psmq.internal.io.TxnBoundaryAutoCommit;
import com.atlassian.psmq.internal.io.TxnBoundarySession;
import com.atlassian.psmq.internal.io.TxnBoundarySessionExternal;
import com.atlassian.psmq.internal.io.TxnContext;
import com.atlassian.psmq.internal.io.TxnContextSetup;
import com.atlassian.psmq.internal.io.TxnContextSetupImpl;
import com.atlassian.psmq.internal.io.TxnFixture;
import com.atlassian.psmq.internal.io.db.CommitKit;
import com.atlassian.psmq.internal.message.QMessageConsumerImpl;
import com.atlassian.psmq.internal.message.QMessageProducerImpl;
import com.atlassian.psmq.internal.queue.GlobalQueueOperationsImpl;
import com.atlassian.psmq.internal.queue.QueueImpl;
import com.atlassian.psmq.internal.queue.QueueOperationsImpl;
import io.atlassian.fugue.Option;
import java.sql.Connection;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/psmq/internal/QSessionImpl.class */
public class QSessionImpl extends QCloseableImpl implements QSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(QSessionImpl.class);
    private final DatabaseAccessor databaseAccessor;
    private final DatabaseConnectionConverter databaseConnectionConverter;
    private final Option<QConnectionProvider> connectionProvider;
    private final QSessionDefinition sessionDefinition;
    private final QueueOperationsImpl queueOperations;
    private final GlobalQueueOperationsImpl globalQueueOperations;
    private final MessageWriter messageWriter;
    private final MessageReader messageReader;
    private Option<Connection> currentConnection = Option.none();

    public QSessionImpl(DatabaseConnectionConverter databaseConnectionConverter, DatabaseAccessor databaseAccessor, Option<QConnectionProvider> option, QSessionDefinition qSessionDefinition, QueueOperationsImpl queueOperationsImpl, GlobalQueueOperationsImpl globalQueueOperationsImpl, MessageWriter messageWriter, MessageReader messageReader) {
        this.databaseConnectionConverter = databaseConnectionConverter;
        this.databaseAccessor = databaseAccessor;
        this.connectionProvider = option;
        this.queueOperations = queueOperationsImpl;
        this.sessionDefinition = qSessionDefinition;
        this.globalQueueOperations = globalQueueOperationsImpl;
        this.messageWriter = messageWriter;
        this.messageReader = messageReader;
    }

    public QueueOperations queueOperations() {
        return new QueueOperations() { // from class: com.atlassian.psmq.internal.QSessionImpl.1
            public PageResponse<Queue> queryQueues(QueueQuery queueQuery) {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.queryQueues(QSessionImpl.this.instructions(), (QueueQuery) Validations.checkNotNull(queueQuery));
            }

            public Queue getQueue(QId qId) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.getQueue(QSessionImpl.this.instructions(), qId);
            }

            public Queue updateQueue(Queue queue, QueueUpdate queueUpdate) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.updateQueue(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue), queueUpdate);
            }

            public Queue accessQueue(QueueDefinition queueDefinition) {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.accessQueue(QSessionImpl.this.instructions(), (QueueDefinition) Validations.checkNotNull(queueDefinition));
            }

            public Optional<Queue> exclusiveAccessQueue(QueueDefinition queueDefinition) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.exclusiveAccessQueue(QSessionImpl.this.instructions(), (QueueDefinition) Validations.checkNotNull(queueDefinition)).toOptional();
            }

            public void heartBeatQueue(Queue queue) throws QException {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.LOGGER.debug("Sending heartbeat for queue {}", queue.name());
                QSessionImpl.this.queueOperations.heartBeatQueue(QSessionImpl.this.instructions(), (Queue) Validations.checkNotNull(queue));
                QSessionImpl.LOGGER.debug("Heartbeat sent for queue {}", queue.name());
            }

            public void releaseQueue(Queue queue) throws QException {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.releaseQueue(QSessionImpl.this.instructions(), (Queue) Validations.checkNotNull(queue));
            }

            public boolean releaseQueueIfEmpty(Queue queue) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.queueOperations.releaseQueueIfEmpty(QSessionImpl.this.instructions(), (Queue) Validations.checkNotNull(queue));
            }

            public void releaseAllQueues() throws QException {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.releaseAllQueues(QSessionImpl.this.instructions());
            }

            public void unresolveAllClaimedMessages(Queue queue) {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.unresolveAllClaimedMessages(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue));
            }

            public void unresolveAllClaimedMessages() {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.unresolveAllClaimedMessages(QSessionImpl.this.instructions());
            }

            public void purgeQueue(Queue queue) {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.purgeQueue(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue));
            }

            public void deleteQueue(Queue queue) {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.deleteQueue(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue));
            }

            public void syncQueueMessageCount(Queue queue) {
                QSessionImpl.this.checkNotClosed();
                QSessionImpl.this.queueOperations.syncQueueMessageCount(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue));
            }

            public QueueBrowser browser(final Queue queue) {
                QSessionImpl.this.checkNotClosed();
                return new QueueBrowser() { // from class: com.atlassian.psmq.internal.QSessionImpl.1.1
                    public long getMessageCount() {
                        return QSessionImpl.this.queueOperations.getMessageCount(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue), QBrowseMessageQueryBuilder.anyMessage());
                    }

                    public long getMessageCount(QBrowseMessageQuery qBrowseMessageQuery) {
                        return QSessionImpl.this.queueOperations.getMessageCount(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue), qBrowseMessageQuery);
                    }

                    public PageResponse<QMessage> browse(QBrowseMessageQuery qBrowseMessageQuery) {
                        return QSessionImpl.this.queueOperations.browse(QSessionImpl.this.instructions(), QSessionImpl.this.asQueueImpl(queue), qBrowseMessageQuery);
                    }
                };
            }
        };
    }

    public GlobalQueueOperations globalQueueOperations() {
        return new GlobalQueueOperations() { // from class: com.atlassian.psmq.internal.QSessionImpl.2
            public long getTotalMessageCountWithClaimant(boolean z) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.globalQueueOperations.getTotalMessageCountWithClaimant(QSessionImpl.this.instructions(), z);
            }

            public long getTotalMessageCount() throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.globalQueueOperations.getTotalMessageCount(QSessionImpl.this.instructions());
            }

            public long getQueueMessageCountWithPartialName(String str) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.globalQueueOperations.getQueueMessageCountWithPartialName(QSessionImpl.this.instructions(), str);
            }

            public long getQueueMessageCountWithExactName(String str) throws QException {
                QSessionImpl.this.checkNotClosed();
                return QSessionImpl.this.globalQueueOperations.getQueueMessageCountWithExactName(QSessionImpl.this.instructions(), str);
            }
        };
    }

    public QMessageConsumer createConsumer(Queue queue) {
        checkNotClosed();
        return new QMessageConsumerImpl(this.messageReader, instructions((Queue) Validations.checkNotNull(queue)));
    }

    public QMessageProducer createProducer(Queue queue) {
        checkNotClosed();
        return new QMessageProducerImpl(this.messageWriter, instructions((Queue) Validations.checkNotNull(queue)));
    }

    public QMessageProducer createProducer(QTopic qTopic) throws QException {
        checkNotClosed();
        return new QMessageProducerImpl(this.messageWriter, instructions((QTopic) Validations.checkNotNull(qTopic)));
    }

    public void commit() throws QException {
        checkNotClosed();
        Validations.checkState(this.sessionDefinition.commitStrategy() == QSessionDefinition.CommitStrategy.SESSION_COMMIT, "You must have defined this session as using session commit strategy to call this method");
        if (this.currentConnection.isDefined()) {
            synchronized (this) {
                try {
                    CommitKit.commit((Connection) this.currentConnection.get(), "Unable to commit QSession");
                    returnCurrentConnection(this.currentConnection);
                } catch (Throwable th) {
                    returnCurrentConnection(this.currentConnection);
                    throw th;
                }
            }
        }
    }

    public void rollback() throws QException {
        checkNotClosed();
        Validations.checkState(this.sessionDefinition.commitStrategy() == QSessionDefinition.CommitStrategy.SESSION_COMMIT, "You must have defined this session as using session commit strategy to call this method");
        if (this.currentConnection.isDefined()) {
            synchronized (this) {
                try {
                    CommitKit.rollback((Connection) this.currentConnection.get(), "Unable to commit QSession");
                    returnCurrentConnection(this.currentConnection);
                } catch (Throwable th) {
                    returnCurrentConnection(this.currentConnection);
                    throw th;
                }
            }
        }
    }

    @Override // com.atlassian.psmq.internal.QCloseableImpl, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            super.close();
            returnCurrentConnection(this.currentConnection);
        }
    }

    private void returnCurrentConnection(Option<Connection> option) {
        if (this.currentConnection.isDefined()) {
            ((QConnectionProvider) this.connectionProvider.get()).returnConnection((Connection) option.get());
            this.currentConnection = Option.none();
        }
    }

    private SessionInstructions instructions(Queue queue) {
        Validations.checkNotNull(queue);
        return new SessionInstructions(txnBoundary(this.sessionDefinition.commitStrategy()), asQueueImpl(queue));
    }

    private SessionInstructions instructions(QTopic qTopic) {
        Validations.checkNotNull(qTopic);
        return new SessionInstructions(txnBoundary(this.sessionDefinition.commitStrategy()), qTopic);
    }

    private SessionInstructions instructions() {
        return new SessionInstructions(txnBoundary(this.sessionDefinition.commitStrategy()));
    }

    private Supplier<TxnBoundary> txnBoundary(QSessionDefinition.CommitStrategy commitStrategy) {
        return () -> {
            return commitStrategy == QSessionDefinition.CommitStrategy.EXTERNAL_COMMIT ? new TxnBoundarySessionExternal(externalTxnFixture(), this.databaseConnectionConverter) : commitStrategy == QSessionDefinition.CommitStrategy.AUTO_COMMIT ? new TxnBoundaryAutoCommit(autoCommitFixture(), this.databaseAccessor) : new TxnBoundarySession(sessionLinkedTxnFixture(), this.databaseConnectionConverter);
        };
    }

    private TxnFixture autoCommitFixture() {
        return new TxnFixture() { // from class: com.atlassian.psmq.internal.QSessionImpl.3
            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public TxnContextSetup preTxn() {
                return new TxnContextSetupImpl(Option.none(), QSessionImpl.this.sessionDefinition.claimant(), QSessionImpl.this.sessionDefinition.claimantHeartBeatMillis());
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnException(TxnContext txnContext) {
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnSuccess(TxnContext txnContext) {
            }
        };
    }

    private TxnFixture sessionLinkedTxnFixture() {
        return new TxnFixture() { // from class: com.atlassian.psmq.internal.QSessionImpl.4
            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public TxnContextSetup preTxn() {
                TxnContextSetupImpl txnContextSetupImpl;
                synchronized (this) {
                    Validations.checkState(QSessionImpl.this.connectionProvider.isDefined());
                    if (QSessionImpl.this.currentConnection.isEmpty()) {
                        QSessionImpl.this.currentConnection = Option.some(((QConnectionProvider) QSessionImpl.this.connectionProvider.get()).borrowConnection());
                    }
                    txnContextSetupImpl = new TxnContextSetupImpl(QSessionImpl.this.currentConnection, QSessionImpl.this.sessionDefinition.claimant(), QSessionImpl.this.sessionDefinition.claimantHeartBeatMillis());
                }
                return txnContextSetupImpl;
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnException(TxnContext txnContext) {
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnSuccess(TxnContext txnContext) {
            }
        };
    }

    private TxnFixture externalTxnFixture() {
        return new TxnFixture() { // from class: com.atlassian.psmq.internal.QSessionImpl.5
            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public TxnContextSetup preTxn() {
                Validations.checkState(QSessionImpl.this.connectionProvider.isDefined());
                return new TxnContextSetupImpl(Option.option(((QConnectionProvider) QSessionImpl.this.connectionProvider.get()).borrowConnection()), QSessionImpl.this.sessionDefinition.claimant(), QSessionImpl.this.sessionDefinition.claimantHeartBeatMillis());
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnException(TxnContext txnContext) {
            }

            @Override // com.atlassian.psmq.internal.io.TxnFixture
            public void onTxnSuccess(TxnContext txnContext) {
            }
        };
    }

    private QueueImpl asQueueImpl(Queue queue) {
        Validations.checkArgument(queue instanceof QueueImpl, "must be a QueueImpl");
        return (QueueImpl) queue;
    }

    private void checkNotClosed() {
        checkNotClosed("The session has been closed");
    }
}
