001 package com.mockrunner.mock.jms;
002
003 import java.util.Iterator;
004 import java.util.List;
005 import java.util.Map;
006
007 import javax.jms.JMSException;
008 import javax.jms.Message;
009 import javax.jms.MessageListener;
010 import javax.jms.Topic;
011
012 /**
013 * Mock implementation of JMS <code>Topic</code>.
014 */
015 public class MockTopic extends MockDestination implements Topic
016 {
017 private String name;
018
019 public MockTopic(String name)
020 {
021 this.name = name;
022 }
023
024 public String getTopicName() throws JMSException
025 {
026 return name;
027 }
028
029 /**
030 * Adds a message to this <code>Topic</code> that will
031 * be propagated to the corresponding receivers.
032 * @param message the message
033 */
034 public void addMessage(Message message) throws JMSException
035 {
036 addReceivedMessage(message);
037 boolean isConsumed = false;
038 Iterator sessionsIterator = sessionSet().iterator();
039 while(sessionsIterator.hasNext())
040 {
041 MockSession session = (MockSession)sessionsIterator.next();
042 MessageListener globalListener = session.getMessageListener();
043 if(null != globalListener)
044 {
045 globalListener.onMessage(message);
046 isConsumed = true;
047 acknowledgeMessage(message, session);
048 }
049 else
050 {
051 List subscribers = session.getTopicTransmissionManager().getTopicSubscriberList(name);
052 for(int ii = 0; ii < subscribers.size(); ii++)
053 {
054 MockTopicSubscriber subscriber = (MockTopicSubscriber)subscribers.get(ii);
055 if(subscriber.canConsume(message))
056 {
057 subscriber.receiveMessage(message);
058 isConsumed = true;
059 acknowledgeMessage(message, session);
060 }
061 }
062 Map durableSubscribers = session.getTopicTransmissionManager().getDurableTopicSubscriberMap(name);
063 Iterator keys = durableSubscribers.keySet().iterator();
064 while(keys.hasNext())
065 {
066 MockTopicSubscriber subscriber = (MockTopicSubscriber)durableSubscribers.get(keys.next());
067 if(subscriber.canConsume(message))
068 {
069 subscriber.receiveMessage(message);
070 isConsumed = true;
071 acknowledgeMessage(message, session);
072 }
073 }
074 }
075 }
076 if(!isConsumed)
077 {
078 addCurrentMessage(message);
079 }
080 }
081 }