001    package com.mockrunner.mock.jms;
002    
003    import java.util.Iterator;
004    import java.util.List;
005    import java.util.Map;
006    
007    import javax.jms.JMSException;
008    import javax.jms.Message;
009    import javax.jms.MessageListener;
010    import javax.jms.Topic;
011    
012    /**
013     * Mock implementation of JMS <code>Topic</code>.
014     */
015    public class MockTopic extends MockDestination implements Topic
016    {
017        private String name;
018        
019        public MockTopic(String name)
020        {
021            this.name = name;
022        }
023            
024        public String getTopicName() throws JMSException
025        {
026            return name;
027        }
028        
029        /**
030         * Adds a message to this <code>Topic</code> that will
031         * be propagated to the corresponding receivers.
032         * @param message the message
033         */
034        public void addMessage(Message message) throws JMSException
035        {
036            addReceivedMessage(message);    
037            boolean isConsumed = false;
038            Iterator sessionsIterator = sessionSet().iterator();
039            while(sessionsIterator.hasNext())
040            {
041                MockSession session = (MockSession)sessionsIterator.next();
042                MessageListener globalListener = session.getMessageListener();
043                if(null != globalListener)
044                {
045                    globalListener.onMessage(message);
046                    isConsumed = true;
047                    acknowledgeMessage(message, session);
048                }
049                else
050                {
051                    List subscribers = session.getTopicTransmissionManager().getTopicSubscriberList(name);
052                    for(int ii = 0; ii < subscribers.size(); ii++)
053                    {
054                        MockTopicSubscriber subscriber = (MockTopicSubscriber)subscribers.get(ii);
055                        if(subscriber.canConsume(message))
056                        {
057                            subscriber.receiveMessage(message);
058                            isConsumed = true;
059                            acknowledgeMessage(message, session);
060                        }
061                    }
062                    Map durableSubscribers = session.getTopicTransmissionManager().getDurableTopicSubscriberMap(name);
063                    Iterator keys = durableSubscribers.keySet().iterator();
064                    while(keys.hasNext())
065                    {
066                        MockTopicSubscriber subscriber = (MockTopicSubscriber)durableSubscribers.get(keys.next());
067                        if(subscriber.canConsume(message))
068                        {
069                            subscriber.receiveMessage(message);
070                            isConsumed = true;
071                            acknowledgeMessage(message, session);
072                        }
073                    }
074                }
075            }
076            if(!isConsumed)
077            {
078                addCurrentMessage(message);
079            }
080        }
081    }