/*
 * Decompiled with CFR 0.152.
 */
package uk.ac.warwick.util.queue.activemq;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.util.StringUtils;
import uk.ac.warwick.util.queue.Queue;
import uk.ac.warwick.util.queue.QueueListener;
import uk.ac.warwick.util.queue.QueueProvider;
import uk.ac.warwick.util.queue.activemq.MassListenerController;

public class ActiveMQQueueProvider
implements DisposableBean,
QueueProvider,
MassListenerController {
    private ActiveMQConnectionFactory connectionFactory;
    private CachingConnectionFactory cachingConnectionFactory;
    private Map<String, NativeQueue> queues = new HashMap<String, NativeQueue>();

    public ActiveMQQueueProvider(String brokerURL) {
        this.connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        this.cachingConnectionFactory = new CachingConnectionFactory();
        this.cachingConnectionFactory.setTargetConnectionFactory((ConnectionFactory)this.connectionFactory);
        this.cachingConnectionFactory.setSessionCacheSize(10);
    }

    public ActiveMQQueueProvider(String brokerURL, String username, String password) {
        this(brokerURL);
        if (StringUtils.hasText((String)username) && StringUtils.hasText((String)password)) {
            this.connectionFactory.setUserName(username);
            this.connectionFactory.setPassword(password);
        }
    }

    public static ActiveMQQueueProvider createEmbeddedBroker() {
        return new ActiveMQQueueProvider("vm://embedded?broker.persistent=false&broker.useJmx=false");
    }

    @Override
    public void destroy() {
        for (NativeQueue queue : this.queues.values()) {
            queue.destroy();
        }
        this.cachingConnectionFactory.destroy();
    }

    @Override
    public Queue getQueue(String queueName) {
        NativeQueue queue = new NativeQueue(queueName);
        this.queues.put(queueName, queue);
        return queue;
    }

    @Override
    public void startAllListeners() {
        for (NativeQueue queue : this.queues.values()) {
            queue.startAllListeners();
        }
    }

    @Override
    public void stopAllListeners() {
        for (NativeQueue queue : this.queues.values()) {
            queue.stopAllListeners();
        }
    }

    class NativeQueue
    implements Queue,
    DisposableBean,
    MassListenerController {
        private JmsTemplate jms;
        private String name;
        private Map<QueueListener, DefaultMessageListenerContainer> listeners = Maps.newHashMap();

        public NativeQueue(String name) {
            this.name = name;
            this.jms = new JmsTemplate((ConnectionFactory)ActiveMQQueueProvider.this.cachingConnectionFactory);
        }

        public void stopListener(QueueListener listener) {
            this.listeners.get(listener).stop();
        }

        @Override
        public void stopAllListeners() {
            for (DefaultMessageListenerContainer container : this.listeners.values()) {
                container.stop();
            }
        }

        @Override
        public void startAllListeners() {
            for (DefaultMessageListenerContainer container : this.listeners.values()) {
                container.start();
            }
        }

        public void startListener(QueueListener listener) {
            this.listeners.get(listener).start();
        }

        private void setPubSubValues(DefaultMessageListenerContainer container) {
            container.setPubSubDomain(this.jms.isPubSubDomain());
            container.setPubSubNoLocal(this.jms.isPubSubNoLocal());
        }

        @Override
        public void addListener(String itemType, final QueueListener listener) {
            if (!listener.isListeningToQueue()) {
                return;
            }
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setSessionTransacted(true);
            container.setConnectionFactory((ConnectionFactory)ActiveMQQueueProvider.this.cachingConnectionFactory);
            container.setDestinationName(this.name);
            this.setPubSubValues(container);
            if (itemType != null) {
                container.setMessageSelector("itemType = '" + itemType.replace("'", "''") + "'");
            }
            container.setMessageListener((Object)new MessageListener(){

                public void onMessage(Message message) {
                    try {
                        listener.onReceive(NativeQueue.this.jms.getMessageConverter().fromMessage(message));
                    }
                    catch (MessageConversionException e) {
                        throw new IllegalStateException(e);
                    }
                    catch (JMSException e) {
                        throw new IllegalStateException(e);
                    }
                }
            });
            container.afterPropertiesSet();
            this.listeners.put(listener, container);
            container.start();
        }

        @Override
        public void send(Object message) {
            this.jms.convertAndSend(this.name, message);
        }

        @Override
        public void setPersistent(boolean persistent) {
            this.jms.setDeliveryPersistent(persistent);
        }

        @Override
        public void setPubSub(boolean pubSub) {
            this.jms.setPubSubDomain(pubSub);
            for (DefaultMessageListenerContainer container : this.listeners.values()) {
                container.setPubSubDomain(pubSub);
            }
        }

        @Override
        public void setPubSubNoLocal(boolean pubSubNoLocal) {
            this.jms.setPubSubNoLocal(pubSubNoLocal);
            for (DefaultMessageListenerContainer container : this.listeners.values()) {
                container.setPubSubNoLocal(pubSubNoLocal);
            }
        }

        public void destroy() {
            for (DefaultMessageListenerContainer container : this.listeners.values()) {
                container.destroy();
            }
        }

        @Override
        public void setMessageConverter(MessageConverter converter) {
            this.jms.setMessageConverter(converter);
        }

        @Override
        public void setSingleListener(QueueListener listener) {
            if (!this.listeners.isEmpty()) {
                throw new IllegalStateException("Can only set a single listener if no other listeners have been set on this queue");
            }
            this.addListener(null, listener);
        }

        class ListenerContext {
            public DefaultMessageListenerContainer container;
            public QueueListener listener;

            private ListenerContext(DefaultMessageListenerContainer container, QueueListener listener) {
                this.container = container;
                this.listener = listener;
            }
        }
    }
}

