package com.atlassian.psmq.internal.io;

import com.atlassian.psmq.api.QException;
import com.atlassian.psmq.api.internal.Validations;
import com.atlassian.psmq.api.message.QMessage;
import com.atlassian.psmq.api.message.QMessageUpdate;
import com.atlassian.psmq.api.queue.QTopic;
import com.atlassian.psmq.api.queue.Queue;
import com.atlassian.psmq.api.queue.QueueQueryBuilder;
import com.atlassian.psmq.internal.io.db.MessageDbTO;
import com.atlassian.psmq.internal.io.db.QueueDao;
import io.atlassian.fugue.Option;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/* loaded from: input_file:com/atlassian/psmq/internal/io/MessageWriter.class */
/**
 * Writes and updates queue messages through a {@link QueueDao}, running each
 * operation inside the transaction boundary supplied by the caller's
 * {@link SessionInstructions}.
 */
public class MessageWriter {
    private final QueueDao queueDao;

    /**
     * @param queueDao the DAO used for all queue and message persistence calls
     */
    public MessageWriter(QueueDao queueDao) {
        this.queueDao = queueDao;
    }

    /**
     * Writes a message either to the session's queue (when one is defined) or
     * to every queue returned by a topic query for the session's topic.
     *
     * @param sessionInstructions the session describing the target queue/topic and transaction boundary
     * @param qMessage the message to write
     * @return the number of queues the message was written to
     * @throws QException if the target queue cannot be found
     */
    public long writeMessage(SessionInstructions sessionInstructions, QMessage qMessage) throws QException {
        Validations.checkNotNull(sessionInstructions);
        Validations.checkNotNull(qMessage);
        if (sessionInstructions.queue().isDefined()) {
            return writeToQueue(sessionInstructions, qMessage);
        }
        return writeToQueuesAssociatedWithTopic(sessionInstructions, qMessage);
    }

    /**
     * Applies an update to an existing message inside the session's transaction boundary.
     *
     * @param sessionInstructions the session supplying the transaction boundary
     * @param qMessage the message to update
     * @param qMessageUpdate the update to apply; passed to the DAO as-is
     */
    public void updateMessage(SessionInstructions sessionInstructions, QMessage qMessage, QMessageUpdate qMessageUpdate) {
        Validations.checkNotNull(sessionInstructions);
        Validations.checkNotNull(qMessage);
        String failureMessage = String.format("Unable to update message '%s'", qMessage.systemMetaData().systemId());
        sessionInstructions.txnBoundary().run(failureMessage, txnContext -> {
            this.queueDao.updateMsg(txnContext, qMessage, qMessageUpdate);
            // The transactional callback must yield a value; the result is not used.
            return true;
        });
    }

    /**
     * Writes the message to the single queue named by the session.
     *
     * @return always 1 on success
     */
    private long writeToQueue(SessionInstructions sessionInstructions, QMessage qMessage) {
        Queue queue = (Queue) sessionInstructions.queue().get();
        String failureMessage = String.format("Unable to write to queue '%s'", queue.name());
        return ((Long) sessionInstructions.txnBoundary().run(failureMessage, txnContext -> {
            Option<Long> existingQueueId = this.queueDao.checkQueueExists(txnContext, queue.id().value());
            if (existingQueueId.isEmpty()) {
                throw new QException(String.format("Unable to find queue named '%s'", queue.name()));
            }
            writeMsgToQueue(qMessage, txnContext, existingQueueId.get());
            return Long.valueOf(1L);
        })).longValue();
    }

    /**
     * Writes the message to every queue id returned by a query on the session's topic.
     *
     * @return the number of queues written to (0 when the query yields no queue ids)
     */
    private long writeToQueuesAssociatedWithTopic(SessionInstructions sessionInstructions, QMessage qMessage) {
        QTopic qTopic = (QTopic) sessionInstructions.topic().get();
        String failureMessage = String.format("Unable to write to topic '%s'", qTopic);
        return ((Long) sessionInstructions.txnBoundary().run(failureMessage, txnContext -> {
            // try-with-resources guarantees the id stream is closed on both the
            // success path and when a write throws part-way through.
            try (Stream<Long> queueIds = this.queueDao.queryQueueIds(
                    txnContext, QueueQueryBuilder.newQuery().withTopic(qTopic).build())) {
                AtomicLong written = new AtomicLong();
                queueIds.forEach(queueId -> {
                    writeMsgToQueue(qMessage, txnContext, queueId.longValue());
                    written.incrementAndGet();
                });
                return Long.valueOf(written.get());
            }
        })).longValue();
    }

    /**
     * Heartbeats the queue and then stores the message on it.
     *
     * @return whatever {@link QueueDao#putMsg} returns for the stored message
     */
    private Long writeMsgToQueue(QMessage qMessage, TxnContext txnContext, long queueId) {
        this.queueDao.heartBeatQueue(txnContext, queueId);
        return this.queueDao.putMsg(txnContext, Long.valueOf(queueId), new MessageDbTO(qMessage), serialiseData(qMessage));
    }

    /** Wraps the message buffer's string form in an {@link Option} for the DAO. */
    private Option<String> serialiseData(QMessage qMessage) {
        return Option.some(qMessage.buffer().asString());
    }
}
