package org.citrusframework.jms.endpoint;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.citrusframework.context.TestContext;
import org.citrusframework.context.TestContextFactory;
import org.citrusframework.endpoint.direct.DirectEndpoint;
import org.citrusframework.endpoint.direct.DirectEndpointConfiguration;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.message.DefaultMessageQueue;
import org.citrusframework.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/citrusframework/jms/endpoint/JmsTopicSubscriber.class */
public class JmsTopicSubscriber extends JmsConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(JmsConsumer.class);
    private boolean running;
    private final TestContextFactory testContextFactory;
    private DirectEndpoint messageQueue;
    private Executor subscription;
    private CompletableFuture<Boolean> stopped;
    private CompletableFuture<Boolean> started;

    public JmsTopicSubscriber(String str, JmsEndpointConfiguration jmsEndpointConfiguration, TestContextFactory testContextFactory) {
        super(str, jmsEndpointConfiguration);
        this.running = true;
        this.subscription = Executors.newSingleThreadExecutor();
        this.stopped = new CompletableFuture<>();
        this.started = new CompletableFuture<>();
        this.testContextFactory = testContextFactory;
        DirectEndpointConfiguration directEndpointConfiguration = new DirectEndpointConfiguration();
        directEndpointConfiguration.setQueue(new DefaultMessageQueue(str + ".inbound"));
        this.messageQueue = new DirectEndpoint(directEndpointConfiguration);
    }

    @Override // java.lang.Runnable
    public void run() {
        Topic createTopic;
        TopicSubscriber createSubscriber;
        TopicConnectionFactory topicConnectionFactory = (ConnectionFactory) Optional.ofNullable(this.endpointConfiguration.getConnectionFactory()).orElseGet(() -> {
            return this.endpointConfiguration.getJmsTemplate().getConnectionFactory();
        });
        TopicConnection topicConnection = null;
        try {
            try {
                if (!(topicConnectionFactory instanceof TopicConnectionFactory)) {
                    throw new CitrusRuntimeException("Failed to create JMS topic subscriber for unsupported connection factory type: " + ((String) Optional.ofNullable(topicConnectionFactory).map((v0) -> {
                        return v0.getClass();
                    }).map((v0) -> {
                        return v0.getName();
                    }).orElse("connection factory not set")));
                }
                TopicConnection createTopicConnection = topicConnectionFactory.createTopicConnection();
                TopicSession createTopicSession = createTopicConnection.createTopicSession(false, 1);
                if (this.endpointConfiguration.getDestination() != null && (this.endpointConfiguration.getDestination() instanceof Topic)) {
                    createTopic = (Topic) this.endpointConfiguration.getDestination();
                } else if (StringUtils.hasText(this.endpointConfiguration.getDestinationName())) {
                    createTopic = createTopicSession.createTopic(this.endpointConfiguration.getDestinationName());
                } else if (this.endpointConfiguration.getJmsTemplate().getDefaultDestination() != null && (this.endpointConfiguration.getJmsTemplate().getDefaultDestination() instanceof Topic)) {
                    createTopic = (Topic) this.endpointConfiguration.getJmsTemplate().getDefaultDestination();
                } else {
                    if (!StringUtils.hasText(this.endpointConfiguration.getJmsTemplate().getDefaultDestinationName())) {
                        throw new CitrusRuntimeException("Unable to receive message - JMS destination not set");
                    }
                    createTopic = createTopicSession.createTopic(this.endpointConfiguration.getJmsTemplate().getDefaultDestinationName());
                }
                if (this.endpointConfiguration.isDurableSubscription()) {
                    logger.debug(String.format("Create JMS topic durable subscription '%s'", Optional.ofNullable(this.endpointConfiguration.getDurableSubscriberName()).orElseGet(this::getName)));
                    createSubscriber = createTopicSession.createDurableSubscriber(createTopic, (String) Optional.ofNullable(this.endpointConfiguration.getDurableSubscriberName()).orElseGet(this::getName));
                } else {
                    logger.debug("Create JMS topic subscription");
                    createSubscriber = createTopicSession.createSubscriber(createTopic);
                }
                createTopicConnection.start();
                this.started.complete(true);
                while (this.running) {
                    Message receive = createSubscriber.receive();
                    if (receive != null) {
                        TestContext object = this.testContextFactory.getObject();
                        org.citrusframework.message.Message convertInbound = this.endpointConfiguration.getMessageConverter().convertInbound(receive, this.endpointConfiguration, object);
                        if (logger.isDebugEnabled()) {
                            logger.debug(String.format("Received topic event '%s'", convertInbound.getId()));
                        }
                        this.messageQueue.createProducer().send(convertInbound, object);
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Topic subscriber received null message - continue after " + this.endpointConfiguration.getPollingInterval() + " milliseconds");
                        }
                        try {
                            Thread.sleep(this.endpointConfiguration.getPollingInterval());
                        } catch (InterruptedException e) {
                            logger.warn("Interrupted while waiting after null message", e);
                        }
                    }
                }
                if (createTopicConnection != null) {
                    try {
                        createTopicConnection.close();
                    } catch (JMSException e2) {
                        logger.warn("Failed to close JMS topic connection", e2);
                    }
                }
                this.stopped.complete(true);
            } catch (JMSException e3) {
                this.started.completeExceptionally(e3);
                if (0 != 0) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e4) {
                        logger.warn("Failed to close JMS topic connection", e4);
                    }
                }
                this.stopped.complete(true);
            } catch (RuntimeException e5) {
                this.started.completeExceptionally(e5);
                throw e5;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    topicConnection.close();
                } catch (JMSException e6) {
                    logger.warn("Failed to close JMS topic connection", e6);
                }
            }
            this.stopped.complete(true);
            throw th;
        }
    }

    public void start() {
        this.subscription.execute(this);
        try {
            if (this.started.get().booleanValue()) {
                logger.info("Started JMS topic subscription");
            }
        } catch (InterruptedException | ExecutionException e) {
            logger.warn("Failed to wait for topic subscriber to start subscription", e);
        }
    }

    public void stop() {
        this.running = false;
        try {
            this.stopped.get(this.endpointConfiguration.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            logger.warn("Failed to wait for topic subscriber to stop gracefully", e);
        } catch (TimeoutException e2) {
            logger.warn("Timeout while waiting for topic subscriber to stop gracefully", e2);
        }
    }

    public org.citrusframework.message.Message receive(TestContext testContext, long j) {
        return this.messageQueue.createConsumer().receive(testContext, j);
    }

    @Override // org.citrusframework.jms.endpoint.JmsConsumer
    public org.citrusframework.message.Message receive(String str, TestContext testContext, long j) {
        return this.messageQueue.createConsumer().receive(str, testContext, j);
    }

    public boolean isRunning() {
        return this.running;
    }
}
