001 package com.mockrunner.mock.jms; 002 003 import java.io.Serializable; 004 005 import javax.jms.InvalidSelectorException; 006 import javax.jms.JMSException; 007 import javax.jms.Message; 008 import javax.jms.MessageConsumer; 009 import javax.jms.MessageListener; 010 011 import org.activemq.filter.mockrunner.Filter; 012 import org.activemq.selector.mockrunner.SelectorParser; 013 014 /** 015 * Mock implementation of JMS <code>MessageConsumer</code>. 016 */ 017 public abstract class MockMessageConsumer implements MessageConsumer, Serializable 018 { 019 private MockConnection connection; 020 private String messageSelector; 021 private Filter messageSelectorFilter; 022 private boolean closed; 023 private MessageListener messageListener; 024 025 public MockMessageConsumer(MockConnection connection, String messageSelector) 026 { 027 this.connection = connection; 028 this.messageSelector = messageSelector; 029 parseMessageSelector(); 030 closed = false; 031 messageListener = null; 032 } 033 034 private void parseMessageSelector() 035 { 036 if(null == messageSelector || messageSelector.length() == 0) 037 { 038 this.messageSelectorFilter = null; 039 } 040 else 041 { 042 try 043 { 044 this.messageSelectorFilter = new SelectorParser().parse(messageSelector); 045 } 046 catch(InvalidSelectorException exc) 047 { 048 throw new RuntimeException("Error parsing message selector: " + exc.getMessage()); 049 } 050 } 051 } 052 053 /** 054 * Returns if this consumer was closed. 055 * @return <code>true</code> if this consumer is closed 056 */ 057 public boolean isClosed() 058 { 059 return closed; 060 } 061 062 /** 063 * Returns if this consumer can consume an incoming message, 064 * i.e. if a <code>MessageListener</code> is registered, 065 * the receiver isn't closed and has an approriate selector. 066 * @return <code>true</code> if this receiver can consume the message 067 */ 068 public boolean canConsume(Message message) 069 { 070 if(messageListener == null) return false; 071 if(isClosed()) return false; 072 return matchesMessageSelector(message); 073 } 074 075 /** 076 * Adds a message that is immediately propagated to the 077 * message listener. If there's no message listener, 078 * nothing happens. 079 * @param message the message 080 */ 081 public void receiveMessage(Message message) 082 { 083 if(null == messageListener) return; 084 messageListener.onMessage(message); 085 } 086 087 public String getMessageSelector() throws JMSException 088 { 089 connection.throwJMSException(); 090 return messageSelector; 091 } 092 093 public MessageListener getMessageListener() throws JMSException 094 { 095 connection.throwJMSException(); 096 return messageListener; 097 } 098 099 public void setMessageListener(MessageListener messageListener) throws JMSException 100 { 101 connection.throwJMSException(); 102 this.messageListener = messageListener; 103 } 104 105 public Message receive(long timeout) throws JMSException 106 { 107 connection.throwJMSException(); 108 return receive(); 109 } 110 111 public Message receiveNoWait() throws JMSException 112 { 113 connection.throwJMSException(); 114 return receive(); 115 } 116 117 public void close() throws JMSException 118 { 119 connection.throwJMSException(); 120 closed = true; 121 } 122 123 private boolean matchesMessageSelector(Message message) 124 { 125 if(!connection.getConfigurationManager().getUseMessageSelectors()) return true; 126 if(messageSelectorFilter == null) return true; 127 try 128 { 129 return messageSelectorFilter.matches(message); 130 } 131 catch(JMSException exc) 132 { 133 throw new RuntimeException(exc.getMessage()); 134 } 135 } 136 137 protected Filter getMessageFilter() 138 { 139 return messageSelectorFilter; 140 } 141 142 protected MockConnection getConnection() 143 { 144 return connection; 145 } 146 }