001    package com.mockrunner.mock.jms;
002    
003    import java.io.Serializable;
004    
005    import javax.jms.InvalidSelectorException;
006    import javax.jms.JMSException;
007    import javax.jms.Message;
008    import javax.jms.MessageConsumer;
009    import javax.jms.MessageListener;
010    
011    import org.activemq.filter.mockrunner.Filter;
012    import org.activemq.selector.mockrunner.SelectorParser;
013    
014    /**
015     * Mock implementation of JMS <code>MessageConsumer</code>.
016     */
017    public abstract class MockMessageConsumer implements MessageConsumer, Serializable
018    {
019        private MockConnection connection;
020        private String messageSelector;
021        private Filter messageSelectorFilter;
022        private boolean closed;
023        private MessageListener messageListener;
024            
025        public MockMessageConsumer(MockConnection connection, String messageSelector)
026        {
027            this.connection = connection;
028            this.messageSelector = messageSelector;
029            parseMessageSelector();
030            closed = false;
031            messageListener = null;
032        }
033    
034        private void parseMessageSelector()
035        {
036            if(null == messageSelector || messageSelector.length() == 0)
037            {
038                this.messageSelectorFilter = null;
039            }
040            else
041            {
042                try
043                {
044                    this.messageSelectorFilter = new SelectorParser().parse(messageSelector);
045                }
046                catch(InvalidSelectorException exc)
047                {
048                    throw new RuntimeException("Error parsing message selector: " + exc.getMessage());
049                }
050            }
051        }
052    
053        /**
054         * Returns if this consumer was closed.
055         * @return <code>true</code> if this consumer is closed
056         */
057        public boolean isClosed()
058        {
059            return closed;
060        }
061        
062        /**
063         * Returns if this consumer can consume an incoming message,
064         * i.e. if a <code>MessageListener</code> is registered,
065         * the receiver isn't closed and has an approriate selector.
066         * @return <code>true</code> if this receiver can consume the message
067         */
068        public boolean canConsume(Message message)
069        {
070            if(messageListener == null) return false;
071            if(isClosed()) return false;
072            return matchesMessageSelector(message);
073        }
074        
075        /**
076         * Adds a message that is immediately propagated to the
077         * message listener. If there's no message listener,
078         * nothing happens.
079         * @param message the message
080         */
081        public void receiveMessage(Message message)
082        {
083            if(null == messageListener) return;
084            messageListener.onMessage(message);
085        }
086    
087        public String getMessageSelector() throws JMSException
088        {
089            connection.throwJMSException();
090            return messageSelector;
091        }
092    
093        public MessageListener getMessageListener() throws JMSException
094        {
095            connection.throwJMSException();
096            return messageListener;
097        }
098    
099        public void setMessageListener(MessageListener messageListener) throws JMSException
100        {
101            connection.throwJMSException();
102            this.messageListener = messageListener;
103        }
104    
105        public Message receive(long timeout) throws JMSException
106        {
107            connection.throwJMSException();
108            return receive();
109        }
110    
111        public Message receiveNoWait() throws JMSException
112        {
113            connection.throwJMSException();
114            return receive();
115        }
116    
117        public void close() throws JMSException
118        {
119            connection.throwJMSException();
120            closed = true;
121        }
122        
123        private boolean matchesMessageSelector(Message message)
124        {
125            if(!connection.getConfigurationManager().getUseMessageSelectors()) return true;
126            if(messageSelectorFilter == null) return true;
127            try
128            {
129                return messageSelectorFilter.matches(message);
130            }
131            catch(JMSException exc)
132            {
133                throw new RuntimeException(exc.getMessage());
134            }
135        }
136        
137        protected Filter getMessageFilter()
138        {
139            return messageSelectorFilter;
140        }
141        
142        protected MockConnection getConnection()
143        {
144            return connection;
145        }
146    }