package com.mockrunner.mock.jms;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;

/**
 * Mock implementation of the JMS <code>Topic</code> interface.
 */
public class MockTopic extends MockDestination implements Topic
{
    private String name;

    /**
     * Creates a topic with the specified name.
     * @param name the name of the topic
     */
    public MockTopic(String name)
    {
        this.name = name;
    }

    /**
     * Returns the name that was passed to the constructor.
     * @return the topic name
     */
    public String getTopicName() throws JMSException
    {
        return name;
    }

    /**
     * Adds a message to this <code>Topic</code> and hands it to the
     * receivers of every session returned by <code>sessionSet()</code>.
     * The message is first recorded with <code>addReceivedMessage</code>.
     * A session that has a message listener gets the message through that
     * listener only; otherwise the message is offered to the session's
     * topic subscribers and then to its durable topic subscribers that are
     * registered under this topic's name. Each successful hand-over is
     * followed by <code>acknowledgeMessage</code> for that session. If no
     * receiver took the message, it is passed to
     * <code>addCurrentMessage</code>.
     * @param message the message
     * @throws JMSException if one of the called operations raises it
     */
    public void addMessage(Message message) throws JMSException
    {
        addReceivedMessage(message);
        boolean delivered = false;
        for(Iterator sessions = sessionSet().iterator(); sessions.hasNext();)
        {
            MockSession currentSession = (MockSession)sessions.next();
            MessageListener sessionListener = currentSession.getMessageListener();
            if(sessionListener == null)
            {
                // No session-wide listener: offer the message to the plain subscribers first.
                List plainSubscribers = currentSession.getTopicTransmissionManager().getTopicSubscriberList(name);
                // The size is re-read on every pass, exactly like the index loop it replaces.
                for(int index = 0; index < plainSubscribers.size(); index++)
                {
                    if(handOver((MockTopicSubscriber)plainSubscribers.get(index), message, currentSession))
                    {
                        delivered = true;
                    }
                }
                // Then offer it to the durable subscribers of the same topic name.
                Map durableByName = currentSession.getTopicTransmissionManager().getDurableTopicSubscriberMap(name);
                for(Iterator durables = durableByName.values().iterator(); durables.hasNext();)
                {
                    if(handOver((MockTopicSubscriber)durables.next(), message, currentSession))
                    {
                        delivered = true;
                    }
                }
            }
            else
            {
                // A listener set on the session takes the message instead of the subscribers.
                sessionListener.onMessage(message);
                delivered = true;
                acknowledgeMessage(message, currentSession);
            }
        }
        if(delivered)
        {
            return;
        }
        addCurrentMessage(message);
    }

    /**
     * Gives the message to one subscriber if the subscriber reports that it
     * can consume it, and acknowledges the message for the session afterwards.
     * @param subscriber the candidate subscriber
     * @param message the message to hand over
     * @param session the session the subscriber was obtained from
     * @return <code>true</code> if the subscriber received the message
     */
    private boolean handOver(MockTopicSubscriber subscriber, Message message, MockSession session) throws JMSException
    {
        if(!subscriber.canConsume(message))
        {
            return false;
        }
        subscriber.receiveMessage(message);
        acknowledgeMessage(message, session);
        return true;
    }
}