001 package com.mockrunner.mock.jms;
002
003 import java.io.Serializable;
004
005 import javax.jms.InvalidSelectorException;
006 import javax.jms.JMSException;
007 import javax.jms.Message;
008 import javax.jms.MessageConsumer;
009 import javax.jms.MessageListener;
010
011 import org.activemq.filter.mockrunner.Filter;
012 import org.activemq.selector.mockrunner.SelectorParser;
013
014 /**
015 * Mock implementation of JMS <code>MessageConsumer</code>.
016 */
017 public abstract class MockMessageConsumer implements MessageConsumer, Serializable
018 {
019 private MockConnection connection;
020 private String messageSelector;
021 private Filter messageSelectorFilter;
022 private boolean closed;
023 private MessageListener messageListener;
024
025 public MockMessageConsumer(MockConnection connection, String messageSelector)
026 {
027 this.connection = connection;
028 this.messageSelector = messageSelector;
029 parseMessageSelector();
030 closed = false;
031 messageListener = null;
032 }
033
034 private void parseMessageSelector()
035 {
036 if(null == messageSelector || messageSelector.length() == 0)
037 {
038 this.messageSelectorFilter = null;
039 }
040 else
041 {
042 try
043 {
044 this.messageSelectorFilter = new SelectorParser().parse(messageSelector);
045 }
046 catch(InvalidSelectorException exc)
047 {
048 throw new RuntimeException("Error parsing message selector: " + exc.getMessage());
049 }
050 }
051 }
052
053 /**
054 * Returns if this consumer was closed.
055 * @return <code>true</code> if this consumer is closed
056 */
057 public boolean isClosed()
058 {
059 return closed;
060 }
061
062 /**
063 * Returns if this consumer can consume an incoming message,
064 * i.e. if a <code>MessageListener</code> is registered,
065 * the receiver isn't closed and has an approriate selector.
066 * @return <code>true</code> if this receiver can consume the message
067 */
068 public boolean canConsume(Message message)
069 {
070 if(messageListener == null) return false;
071 if(isClosed()) return false;
072 return matchesMessageSelector(message);
073 }
074
075 /**
076 * Adds a message that is immediately propagated to the
077 * message listener. If there's no message listener,
078 * nothing happens.
079 * @param message the message
080 */
081 public void receiveMessage(Message message)
082 {
083 if(null == messageListener) return;
084 messageListener.onMessage(message);
085 }
086
087 public String getMessageSelector() throws JMSException
088 {
089 connection.throwJMSException();
090 return messageSelector;
091 }
092
093 public MessageListener getMessageListener() throws JMSException
094 {
095 connection.throwJMSException();
096 return messageListener;
097 }
098
099 public void setMessageListener(MessageListener messageListener) throws JMSException
100 {
101 connection.throwJMSException();
102 this.messageListener = messageListener;
103 }
104
105 public Message receive(long timeout) throws JMSException
106 {
107 connection.throwJMSException();
108 return receive();
109 }
110
111 public Message receiveNoWait() throws JMSException
112 {
113 connection.throwJMSException();
114 return receive();
115 }
116
117 public void close() throws JMSException
118 {
119 connection.throwJMSException();
120 closed = true;
121 }
122
123 private boolean matchesMessageSelector(Message message)
124 {
125 if(!connection.getConfigurationManager().getUseMessageSelectors()) return true;
126 if(messageSelectorFilter == null) return true;
127 try
128 {
129 return messageSelectorFilter.matches(message);
130 }
131 catch(JMSException exc)
132 {
133 throw new RuntimeException(exc.getMessage());
134 }
135 }
136
137 protected Filter getMessageFilter()
138 {
139 return messageSelectorFilter;
140 }
141
142 protected MockConnection getConnection()
143 {
144 return connection;
145 }
146 }