package org.zeromq.jms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import org.zeromq.ZMQ;
import org.zeromq.jms.jmx.ZmqMBeanUtils;
import org.zeromq.jms.protocol.ZmqGateway;
import org.zeromq.jms.protocol.ZmqGatewayFactory;
import org.zeromq.jms.protocol.ZmqSocketType;

/* loaded from: input_file:WEB-INF/lib/jeromq-jms-1.0-RELEASE.jar:org/zeromq/jms/ZmqSession.class */
/**
 * JMS session implemented on top of ZeroMQ gateways.
 *
 * <p>The session acts as both a {@link QueueSession} and a {@link TopicSession}. Every producer or
 * consumer it creates is backed by a {@link ZmqGateway}; the session tracks those gateways (and the
 * ZeroMQ contexts and JMX MBeans that go with them) so that {@link #close()}, {@link #commit()} and
 * {@link #rollback()} can act on all of them.
 *
 * <p>Only text messages can be created through this session; the other message factories, durable
 * subscriptions, browsers, temporary destinations and session-level listeners throw
 * {@link UnsupportedOperationException}.
 *
 * <p>Locking: {@code gateways} and {@code contexts} are guarded by the {@code gateways} monitor;
 * {@code mbeanNames} is guarded by its own monitor.
 */
public class ZmqSession implements QueueSession, TopicSession {
    private static final Logger LOGGER = Logger.getLogger(ZmqSession.class.getCanonicalName());

    /** Source of the numeric part of gateway names; shared by every session in the JVM. */
    private static final AtomicInteger GATEWAY_COUNT = new AtomicInteger();

    private final Map<String, ZmqURI> destinationSchema;
    private final boolean transacted;
    private final int acknowledgeMode;
    private final ExceptionListener exceptionHandler;
    private final ZmqGatewayFactory gatewayFactory;

    /** Gateways opened by this session; also the monitor guarding {@link #contexts}. */
    private final List<ZmqGateway> gateways = new ArrayList<ZmqGateway>();

    /** Contexts that had at least one gateway opened on them; closed by {@link #close()}. */
    private final Set<ZMQ.Context> contexts = new HashSet<ZMQ.Context>();

    /** Context handed out by {@link #getContext(AbstractZmqDestination)}. */
    private final ZMQ.Context defaultContext = ZMQ.context(1);

    /** JMX names registered for the opened gateways; guarded by its own monitor. */
    private final List<ObjectName> mbeanNames = new ArrayList<ObjectName>();

    /**
     * Creates a session.
     *
     * @param zmqGatewayFactory factory used to build producer and consumer gateways
     * @param map               destination schema, keyed by destination name; the session adds
     *                          entries to it when a destination is created from a URI
     * @param z                 whether the session is transacted
     * @param i                 acknowledge mode reported by {@link #getAcknowledgeMode()}
     * @param exceptionListener listener handed to the receivers and subscribers created here
     */
    public ZmqSession(ZmqGatewayFactory zmqGatewayFactory, Map<String, ZmqURI> map, boolean z, int i, ExceptionListener exceptionListener) {
        this.destinationSchema = map;
        this.transacted = z;
        this.acknowledgeMode = i;
        this.exceptionHandler = exceptionListener;
        this.gatewayFactory = zmqGatewayFactory;
    }

    /**
     * Returns the ZeroMQ context to use for a destination. This implementation ignores the
     * destination and always returns the session's default context.
     *
     * @param abstractZmqDestination destination the context is wanted for (unused here)
     * @return the context to create the gateway on
     */
    protected ZMQ.Context getContext(AbstractZmqDestination abstractZmqDestination) {
        return this.defaultContext;
    }

    /**
     * Opens a gateway, registers its MBeans and starts tracking it (together with its context) so
     * that it is released by {@link #close()}.
     *
     * @param context    context the gateway was created on
     * @param zmqGateway gateway to open
     * @throws JMSException if opening or registering the gateway fails with a runtime exception
     */
    protected void open(ZMQ.Context context, ZmqGateway zmqGateway) throws JMSException {
        try {
            zmqGateway.open();
            List<ObjectName> registered = ZmqMBeanUtils.register(zmqGateway);
            synchronized (this.mbeanNames) {
                this.mbeanNames.addAll(registered);
            }
            synchronized (this.gateways) {
                this.gateways.add(zmqGateway);
                this.contexts.add(context);
            }
        } catch (RuntimeException e) {
            LOGGER.log(Level.SEVERE, "Unable to open to ZMQ gateway: " + zmqGateway, e);
            // The gateway is not tracked at this point, so close() would never release it.
            closeQuietly(zmqGateway);
            throw new ZmqException("Unable to open to ZMQ gateway: " + zmqGateway, e);
        }
    }

    /** Best-effort close of a gateway whose set-up failed; failures are logged, never thrown. */
    private static void closeQuietly(ZmqGateway gateway) {
        try {
            if (gateway.isActive()) {
                gateway.close();
            }
        } catch (Exception ignored) {
            // The original failure is the one reported to the caller; this one is only logged.
            LOGGER.log(Level.WARNING, "Unable to close ZMQ gateway after failed open: " + gateway, ignored);
        }
    }

    /** Returns the next gateway name for the given role, in the form {@code <prefix>-<n>@}. */
    private static String nextGatewayName(String prefix) {
        // incrementAndGet is atomic, so concurrent sessions never receive the same number.
        return prefix + "-" + GATEWAY_COUNT.incrementAndGet() + "@";
    }

    /**
     * Closes every active gateway, then every tracked context, then unregisters the MBeans.
     *
     * <p>If a gateway fails to close, the exception propagates immediately and the remaining
     * gateways, contexts and MBeans are left untouched.
     *
     * @throws JMSException if closing a gateway fails
     */
    @Override
    public void close() throws JMSException {
        synchronized (this.gateways) {
            for (ZmqGateway gateway : this.gateways) {
                if (gateway.isActive()) {
                    gateway.close();
                }
            }
            this.gateways.clear();
            for (ZMQ.Context context : this.contexts) {
                context.close();
            }
            this.contexts.clear();
        }
        // Same monitor as open() uses when it appends to the list.
        synchronized (this.mbeanNames) {
            for (ObjectName name : this.mbeanNames) {
                ZmqMBeanUtils.unregister(name);
            }
            this.mbeanNames.clear();
        }
    }

    /**
     * Commits every gateway tracked by this session.
     *
     * @throws JMSException if the session is not transacted or a gateway fails to commit
     */
    @Override
    public void commit() throws JMSException {
        if (!this.transacted) {
            throw new ZmqException("Session was not enabled for transactions.");
        }
        try {
            synchronized (this.gateways) {
                for (ZmqGateway gateway : this.gateways) {
                    gateway.commit();
                }
            }
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.finest("Commited messages");
            }
        } catch (ZmqException e) {
            throw new ZmqException("Unable to commit messages.", e);
        }
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public BytesMessage createBytesMessage() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates a publisher when the destination is a {@link Topic}, otherwise a queue sender.
     *
     * @param destination target destination; anything that is not a topic is cast to {@link Queue}
     * @return the producer for the destination
     * @throws JMSException if the backing gateway cannot be opened
     */
    @Override
    public MessageProducer createProducer(Destination destination) throws JMSException {
        if (destination instanceof Topic) {
            return createPublisher((Topic) destination);
        }
        return createSender((Queue) destination);
    }

    /** Equivalent to {@code createConsumer(destination, null, false)}. */
    @Override
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        return createConsumer(destination, null, false);
    }

    /** Equivalent to {@code createConsumer(destination, str, false)}. */
    @Override
    public MessageConsumer createConsumer(Destination destination, String str) throws JMSException {
        return createConsumer(destination, str, false);
    }

    /**
     * Creates a subscriber when the destination is a {@link Topic}, otherwise a queue receiver.
     *
     * @param destination source destination; anything that is not a topic is cast to {@link Queue}
     * @param str         message selector, forwarded unchanged
     * @param z           no-local flag; forwarded to the subscriber, not used for queues
     * @return the consumer for the destination
     * @throws JMSException if the backing gateway cannot be opened
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (destination instanceof Topic) {
            return createSubscriber((Topic) destination, str, z);
        }
        return createReceiver((Queue) destination, str);
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String str) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String str, String str2, boolean z) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public MapMessage createMapMessage() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public Message createMessage() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ObjectMessage createObjectMessage() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public StreamMessage createStreamMessage() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Creates an empty {@link ZmqTextMessage}. */
    @Override
    public TextMessage createTextMessage() throws JMSException {
        return new ZmqTextMessage();
    }

    /**
     * Creates a text message carrying the given text.
     *
     * @param str text to set on the new message
     * @return the populated message
     * @throws JMSException if the text cannot be set
     */
    @Override
    public TextMessage createTextMessage(String str) throws JMSException {
        TextMessage message = createTextMessage();
        message.setText(str);
        return message;
    }

    /**
     * Resolves a topic by schema name or by a {@code jms:topic} URI.
     *
     * @param str topic name known to the schema, or a URI starting with {@code jms:topic}
     * @return a topic built from the resolved URI
     * @throws JMSException if the name is not in the schema or the URI has no {@code gateway.addr}
     */
    @Override
    public Topic createTopic(String str) throws JMSException {
        return new ZmqTopic(resolveDestinationUri(str, "jms:topic", "topic"));
    }

    /**
     * Shared lookup behind {@link #createTopic(String)} and {@link #createQueue(String)}.
     *
     * <p>A name starting with {@code uriPrefix} is parsed as a URI and stored in the schema under
     * its destination name unless an entry already exists (the existing entry wins and a warning
     * is logged). The schema entry is then required to carry a {@code gateway.addr} option.
     *
     * @param name      schema name or destination URI
     * @param uriPrefix prefix that marks {@code name} as a URI
     * @param kind      {@code "topic"} or {@code "queue"}; only used in log and error messages
     * @return the URI stored in the schema for the destination
     * @throws JMSException if the destination cannot be resolved
     */
    private ZmqURI resolveDestinationUri(String name, String uriPrefix, String kind) throws JMSException {
        String destinationName = name;
        if (name.startsWith(uriPrefix)) {
            ZmqURI parsed = ZmqURI.create(name);
            destinationName = parsed.getDestinationName();
            if (this.destinationSchema.containsKey(destinationName)) {
                LOGGER.warning("Creating " + kind + " with URI already exists in scheam: " + parsed);
            } else {
                this.destinationSchema.put(destinationName, parsed);
            }
        }
        if (!this.destinationSchema.containsKey(destinationName)) {
            throw new ZmqException("Unable to resolve " + kind + " within schema store for name: " + destinationName);
        }
        ZmqURI uri = this.destinationSchema.get(destinationName);
        if (uri.getOptionValue("gateway.addr", null) == null) {
            throw new ZmqException("Unable to resolve 'gateway.addr' for " + kind + " URI: " + uri);
        }
        return uri;
    }

    /** Returns the acknowledge mode given at construction. */
    @Override
    public int getAcknowledgeMode() throws JMSException {
        return this.acknowledgeMode;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public MessageListener getMessageListener() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Returns whether the session was created as transacted. */
    @Override
    public boolean getTransacted() throws JMSException {
        return this.transacted;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void recover() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /**
     * Rolls back every gateway tracked by this session.
     *
     * @throws JMSException if the session is not transacted or a gateway fails to roll back
     */
    @Override
    public void rollback() throws JMSException {
        if (!this.transacted) {
            throw new ZmqException("Session was not enabled for transactions.");
        }
        try {
            synchronized (this.gateways) {
                for (ZmqGateway gateway : this.gateways) {
                    gateway.rollback();
                }
            }
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.finest("rollback messages");
            }
        } catch (ZmqException e) {
            throw new ZmqException("Unable to rollback messages", e);
        }
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void run() {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void unsubscribe(String str) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public QueueBrowser createBrowser(Queue queue, String str) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /**
     * Resolves a queue by schema name or by a {@code jms:queue} URI.
     *
     * @param str queue name known to the schema, or a URI starting with {@code jms:queue}
     * @return a queue for the requested name
     * @throws JMSException if the name is not in the schema or the URI has no {@code gateway.addr}
     */
    @Override
    public Queue createQueue(String str) throws JMSException {
        // The lookup validates the destination; its result is intentionally not used below.
        resolveDestinationUri(str, "jms:queue", "queue");
        // NOTE(review): createTopic builds its destination from the resolved URI, while this
        // builds the queue from the raw argument. Confirm against ZmqQueue's constructors whether
        // that asymmetry is intended before changing it.
        return new ZmqQueue(str);
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        throw new UnsupportedOperationException();
    }

    /** Equivalent to {@code createReceiver(queue, null)}. */
    @Override
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        return createReceiver(queue, null);
    }

    /**
     * Creates a receiver backed by a newly opened {@code PULL} consumer gateway.
     *
     * @param queue queue to receive from; must be a {@link ZmqQueue}
     * @param str   message selector, forwarded to the gateway and the receiver
     * @return the receiver
     * @throws JMSException if the gateway cannot be opened
     */
    @Override
    public QueueReceiver createReceiver(Queue queue, String str) throws JMSException {
        ZmqQueue zmqQueue = (ZmqQueue) queue;
        boolean isTransacted = getTransacted();
        ZMQ.Context context = getContext(zmqQueue);
        // NOTE(review): the boolean after the socket type is defined by ZmqGatewayFactory
        // (presumably bind vs. connect) - confirm there.
        ZmqGateway gateway = this.gatewayFactory.newConsumerGateway(
                nextGatewayName("receiver"), zmqQueue, context, ZmqSocketType.PULL, true, str, isTransacted);
        open(context, gateway);
        ZmqQueueReciever receiver = new ZmqQueueReciever(gateway, queue, str, this.exceptionHandler);
        LOGGER.info("Created recevier: " + receiver);
        return receiver;
    }

    /**
     * Creates a sender backed by a newly opened {@code PUSH} producer gateway.
     *
     * @param queue queue to send to; must be a {@link ZmqQueue}
     * @return the sender
     * @throws JMSException if the gateway cannot be opened
     */
    @Override
    public QueueSender createSender(Queue queue) throws JMSException {
        ZmqQueue zmqQueue = (ZmqQueue) queue;
        boolean isTransacted = getTransacted();
        ZMQ.Context context = getContext(zmqQueue);
        ZmqGateway gateway = this.gatewayFactory.newProducerGateway(
                nextGatewayName("sender"), zmqQueue, context, ZmqSocketType.PUSH, false, isTransacted);
        open(context, gateway);
        ZmqQueueSender sender = new ZmqQueueSender(gateway, queue);
        LOGGER.info("Created sender: " + sender);
        return sender;
    }

    /**
     * Creates a publisher backed by a newly opened {@code PUB} producer gateway.
     *
     * @param topic topic to publish to; must be a {@link ZmqTopic}
     * @return the publisher
     * @throws JMSException if the gateway cannot be opened
     */
    @Override
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        ZmqTopic zmqTopic = (ZmqTopic) topic;
        boolean isTransacted = getTransacted();
        ZMQ.Context context = getContext(zmqTopic);
        ZmqGateway gateway = this.gatewayFactory.newProducerGateway(
                nextGatewayName("publisher"), zmqTopic, context, ZmqSocketType.PUB, true, isTransacted);
        open(context, gateway);
        ZmqTopicPublisher publisher = new ZmqTopicPublisher(gateway, topic);
        LOGGER.info("Created publisher: " + publisher);
        return publisher;
    }

    /** Equivalent to {@code createSubscriber(topic, null, false)}. */
    @Override
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        return createSubscriber(topic, null, false);
    }

    /**
     * Creates a subscriber backed by a newly opened {@code SUB} consumer gateway.
     *
     * @param topic topic to subscribe to; must be a {@link ZmqTopic}
     * @param str   message selector, forwarded to the gateway and the subscriber
     * @param z     no-local flag, forwarded to the subscriber only
     * @return the subscriber
     * @throws JMSException if the gateway cannot be opened
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic, String str, boolean z) throws JMSException {
        ZmqTopic zmqTopic = (ZmqTopic) topic;
        boolean isTransacted = getTransacted();
        ZMQ.Context context = getContext(zmqTopic);
        ZmqGateway gateway = this.gatewayFactory.newConsumerGateway(
                nextGatewayName("subscriber"), zmqTopic, context, ZmqSocketType.SUB, false, str, isTransacted);
        open(context, gateway);
        ZmqTopicSubscriber subscriber = new ZmqTopicSubscriber(gateway, topic, str, z, this.exceptionHandler);
        LOGGER.info("Created subscriber: " + subscriber);
        return subscriber;
    }

    @Override
    public String toString() {
        return "ZmqSession [destinationSchema=" + this.destinationSchema + ", transacted=" + this.transacted + ", acknowledgeMode=" + this.acknowledgeMode + ", gateways=" + this.gateways + "]";
    }
}
