package uk.ac.warwick.util.queue.activemq;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.util.StringUtils;
import uk.ac.warwick.util.queue.Queue;
import uk.ac.warwick.util.queue.QueueListener;
import uk.ac.warwick.util.queue.QueueProvider;

/* loaded from: input_file:uk/ac/warwick/util/queue/activemq/ActiveMQQueueProvider.class */
public class ActiveMQQueueProvider implements DisposableBean, QueueProvider, MassListenerController {
    private ActiveMQConnectionFactory connectionFactory;
    private CachingConnectionFactory cachingConnectionFactory;
    private Map<String, NativeQueue> queues;

    /* loaded from: input_file:uk/ac/warwick/util/queue/activemq/ActiveMQQueueProvider$NativeQueue.class */
    class NativeQueue implements Queue, DisposableBean, MassListenerController {
        private JmsTemplate jms;
        private String name;
        private Map<QueueListener, DefaultMessageListenerContainer> listeners = Maps.newHashMap();

        /* loaded from: input_file:uk/ac/warwick/util/queue/activemq/ActiveMQQueueProvider$NativeQueue$ListenerContext.class */
        class ListenerContext {
            public DefaultMessageListenerContainer container;
            public QueueListener listener;

            private ListenerContext(DefaultMessageListenerContainer defaultMessageListenerContainer, QueueListener queueListener) {
                this.container = defaultMessageListenerContainer;
                this.listener = queueListener;
            }
        }

        public NativeQueue(String str) {
            this.name = str;
            this.jms = new JmsTemplate(ActiveMQQueueProvider.this.cachingConnectionFactory);
        }

        public void stopListener(QueueListener queueListener) {
            this.listeners.get(queueListener).stop();
        }

        @Override // uk.ac.warwick.util.queue.activemq.MassListenerController
        public void stopAllListeners() {
            Iterator<DefaultMessageListenerContainer> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
        }

        @Override // uk.ac.warwick.util.queue.activemq.MassListenerController
        public void startAllListeners() {
            Iterator<DefaultMessageListenerContainer> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                it.next().start();
            }
        }

        public void startListener(QueueListener queueListener) {
            this.listeners.get(queueListener).start();
        }

        private void setPubSubValues(DefaultMessageListenerContainer defaultMessageListenerContainer) {
            defaultMessageListenerContainer.setPubSubDomain(this.jms.isPubSubDomain());
            defaultMessageListenerContainer.setPubSubNoLocal(this.jms.isPubSubNoLocal());
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void addListener(String str, final QueueListener queueListener) {
            if (queueListener.isListeningToQueue()) {
                DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
                defaultMessageListenerContainer.setSessionTransacted(true);
                defaultMessageListenerContainer.setConnectionFactory(ActiveMQQueueProvider.this.cachingConnectionFactory);
                defaultMessageListenerContainer.setDestinationName(this.name);
                setPubSubValues(defaultMessageListenerContainer);
                if (str != null) {
                    defaultMessageListenerContainer.setMessageSelector("itemType = '" + str.replace("'", "''") + "'");
                }
                defaultMessageListenerContainer.setMessageListener(new MessageListener() { // from class: uk.ac.warwick.util.queue.activemq.ActiveMQQueueProvider.NativeQueue.1
                    public void onMessage(Message message) {
                        try {
                            queueListener.onReceive(NativeQueue.this.jms.getMessageConverter().fromMessage(message));
                        } catch (JMSException e) {
                            throw new IllegalStateException((Throwable) e);
                        } catch (MessageConversionException e2) {
                            throw new IllegalStateException((Throwable) e2);
                        }
                    }
                });
                defaultMessageListenerContainer.afterPropertiesSet();
                this.listeners.put(queueListener, defaultMessageListenerContainer);
                defaultMessageListenerContainer.start();
            }
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void send(Object obj) {
            this.jms.convertAndSend(this.name, obj);
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void setPersistent(boolean z) {
            this.jms.setDeliveryPersistent(z);
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void setPubSub(boolean z) {
            this.jms.setPubSubDomain(z);
            Iterator<DefaultMessageListenerContainer> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                it.next().setPubSubDomain(z);
            }
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void setPubSubNoLocal(boolean z) {
            this.jms.setPubSubNoLocal(z);
            Iterator<DefaultMessageListenerContainer> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                it.next().setPubSubNoLocal(z);
            }
        }

        public void destroy() {
            Iterator<DefaultMessageListenerContainer> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                it.next().destroy();
            }
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void setMessageConverter(MessageConverter messageConverter) {
            this.jms.setMessageConverter(messageConverter);
        }

        @Override // uk.ac.warwick.util.queue.Queue
        public void setSingleListener(QueueListener queueListener) {
            if (!this.listeners.isEmpty()) {
                throw new IllegalStateException("Can only set a single listener if no other listeners have been set on this queue");
            }
            addListener(null, queueListener);
        }
    }

    public ActiveMQQueueProvider(String str) {
        this.queues = new HashMap();
        this.connectionFactory = new ActiveMQConnectionFactory(str);
        this.cachingConnectionFactory = new CachingConnectionFactory();
        this.cachingConnectionFactory.setTargetConnectionFactory(this.connectionFactory);
        this.cachingConnectionFactory.setSessionCacheSize(10);
    }

    public ActiveMQQueueProvider(String str, String str2, String str3) {
        this(str);
        if (StringUtils.hasText(str2) && StringUtils.hasText(str3)) {
            this.connectionFactory.setUserName(str2);
            this.connectionFactory.setPassword(str3);
        }
    }

    public static ActiveMQQueueProvider createEmbeddedBroker() {
        return new ActiveMQQueueProvider("vm://embedded?broker.persistent=false&broker.useJmx=false");
    }

    @Override // uk.ac.warwick.util.queue.QueueProvider
    public void destroy() {
        Iterator<NativeQueue> it = this.queues.values().iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        this.cachingConnectionFactory.destroy();
    }

    @Override // uk.ac.warwick.util.queue.QueueProvider
    public Queue getQueue(String str) {
        NativeQueue nativeQueue = new NativeQueue(str);
        this.queues.put(str, nativeQueue);
        return nativeQueue;
    }

    @Override // uk.ac.warwick.util.queue.activemq.MassListenerController
    public void startAllListeners() {
        Iterator<NativeQueue> it = this.queues.values().iterator();
        while (it.hasNext()) {
            it.next().startAllListeners();
        }
    }

    @Override // uk.ac.warwick.util.queue.activemq.MassListenerController
    public void stopAllListeners() {
        Iterator<NativeQueue> it = this.queues.values().iterator();
        while (it.hasNext()) {
            it.next().stopAllListeners();
        }
    }
}
