001    package com.mockrunner.mock.jms;
002    
003    import java.io.Serializable;
004    import java.util.ArrayList;
005    import java.util.Collection;
006    import java.util.Collections;
007    import java.util.HashSet;
008    import java.util.Iterator;
009    import java.util.List;
010    import java.util.Set;
011    
012    import javax.jms.BytesMessage;
013    import javax.jms.Destination;
014    import javax.jms.InvalidDestinationException;
015    import javax.jms.JMSException;
016    import javax.jms.MapMessage;
017    import javax.jms.Message;
018    import javax.jms.MessageConsumer;
019    import javax.jms.MessageListener;
020    import javax.jms.MessageProducer;
021    import javax.jms.ObjectMessage;
022    import javax.jms.Queue;
023    import javax.jms.QueueBrowser;
024    import javax.jms.Session;
025    import javax.jms.StreamMessage;
026    import javax.jms.TemporaryQueue;
027    import javax.jms.TemporaryTopic;
028    import javax.jms.TextMessage;
029    import javax.jms.Topic;
030    import javax.jms.TopicSubscriber;
031    
032    import com.mockrunner.jms.GenericTransmissionManager;
033    import com.mockrunner.jms.MessageManager;
034    import com.mockrunner.jms.QueueTransmissionManager;
035    import com.mockrunner.jms.TopicTransmissionManager;
036    import com.mockrunner.jms.TransmissionManagerWrapper;
037    
038    /**
039     * Mock implementation of JMS <code>Session</code>.
040     * 
041     * Please note that this implementation does not
042     * implement transaction isolation at the moment.
043     * Messages are immediately sent. If acknowledge
044     * mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE,
045     * the message will be automatically acknowledged,
046     * otherwise, it will not be acknowledged. According
047     * to JMS specification, the acknowledged mode must
048     * be ignored for transacted sessions. This is currently
049     * not implemented, i.e. transacted sessions behave like
050     * sessions with acknowledge mode AUTO_ACKNOWLEDGE. 
051     * Messages are acknowledged even if the transaction is 
052     * rolled back. However, the framework keeps track if a
053     * transaction is committed or rolled back, so you can test 
054     * this and rely on the container for the rest.
055     * You can set a <code>MessageListener</code> directly to
056     * the session. This is an application server internal feature 
057     * and not meant for application use in JMS. 
058     * This mock session dispatches any message of any 
059     * known <code>Queue</code> and <code>Topic</code> to the 
060     * distinguished <code>MessageListener</code>, if such 
061     * a <code>MessageListener</code> is registered.
062     */
063    public class MockSession implements Session, Serializable
064    {
065        private MockConnection connection;
066        private QueueTransmissionManager queueTransManager;
067        private TopicTransmissionManager topicTransManager;
068        private GenericTransmissionManager genericTransManager;
069        private TransmissionManagerWrapper transManager;
070        private MessageManager messageManager;
071        private MessageListener messageListener;
072        private List tempQueues;
073        private List tempTopics;
074        private Set queues;
075        private Set topics;
076        private boolean transacted;
077        private int acknowledgeMode;
078        private int numberCommits;
079        private int numberRollbacks;
080        private boolean recovered;
081        private boolean closed;
082        
083        public MockSession(MockConnection connection, boolean transacted, int acknowledgeMode)
084        {
085            this.connection = connection;
086            this.transacted = transacted;
087            this.acknowledgeMode = acknowledgeMode;
088            queueTransManager = new QueueTransmissionManager(connection, this);
089            topicTransManager = new TopicTransmissionManager(connection, this);
090            genericTransManager = new GenericTransmissionManager(connection, this);
091            transManager = new TransmissionManagerWrapper(queueTransManager, topicTransManager, genericTransManager);
092            messageManager = new MessageManager();
093            tempQueues = new ArrayList();
094            tempTopics = new ArrayList();
095            queues = new HashSet();
096            topics = new HashSet();
097            messageListener = null;
098            numberCommits = 0;
099            numberRollbacks = 0;
100            recovered = false;
101            closed = false;
102        }
103        
104        /**
105         * Returns the {@link com.mockrunner.jms.QueueTransmissionManager}.
106         * @return the {@link com.mockrunner.jms.QueueTransmissionManager}
107         */
108        public QueueTransmissionManager getQueueTransmissionManager()
109        {
110            return queueTransManager;
111        }
112        
113        /**
114         * Returns the {@link com.mockrunner.jms.TopicTransmissionManager}.
115         * @return the {@link com.mockrunner.jms.TopicTransmissionManager}
116         */
117        public TopicTransmissionManager getTopicTransmissionManager()
118        {
119            return topicTransManager;
120        }
121        
122        /**
123         * Returns the {@link com.mockrunner.jms.GenericTransmissionManager}.
124         * @return the {@link com.mockrunner.jms.GenericTransmissionManager}
125         */
126        public GenericTransmissionManager getGenericTransmissionManager()
127        {
128            return genericTransManager;
129        }
130        
131        /**
132         * @deprecated use {@link #getTransmissionManagerWrapper}
133         */
134        public TransmissionManagerWrapper getTransmissionManager()
135        {
136            return getTransmissionManagerWrapper();
137        }
138        
139        /**
140         * Returns the {@link com.mockrunner.jms.TransmissionManagerWrapper}.
141         * @return the {@link com.mockrunner.jms.TransmissionManagerWrapper}
142         */
143        public TransmissionManagerWrapper getTransmissionManagerWrapper()
144        {
145            return transManager;
146        }
147        
148        /**
149         * Returns the {@link MessageManager} for this session.
150         * @return the {@link MessageManager}
151         */
152        public MessageManager getMessageManager()
153        {
154            return messageManager;
155        }
156        
157        /**
158         * Returns the list of temporary queues.
159         * @return the <code>TemporaryQueue</code> list
160         */
161        public List getTemporaryQueueList()
162        {
163            return Collections.unmodifiableList(tempQueues);
164        }
165    
166        /**
167         * Returns a <code>TemporaryQueue</code> by its index. The
168         * index represent the number of the queue. Returns <code>null</code>
169         * if no such <code>TemporaryQueue</code> is present.
170         * @param index the index
171         * @return the <code>TemporaryQueue</code>
172         */
173        public MockTemporaryQueue getTemporaryQueue(int index)
174        {
175            if(tempQueues.size() <= index || index < 0) return null;
176            return (MockTemporaryQueue)tempQueues.get(index);
177        }
178        
179        /**
180         * Returns the list of temporary topics.
181         * @return the <code>TemporaryTopic</code> list
182         */
183        public List getTemporaryTopicList()
184        {
185            return Collections.unmodifiableList(tempTopics);
186        }
187    
188        /**
189         * Returns a <code>TemporaryTopic</code> by its index. The
190         * index represent the number of the topic. Returns <code>null</code>
191         * if no such <code>TemporaryTopic</code> is present.
192         * @param index the index
193         * @return the <code>TemporaryTopic</code>
194         */
195        public MockTemporaryTopic getTemporaryTopic(int index)
196        {
197            if(tempTopics.size() <= index || index < 0) return null;
198            return (MockTemporaryTopic)tempTopics.get(index);
199        }
200        
201        /**
202         * Returns if this session was closed.
203         * @return <code>true</code> if this session is closed
204         */
205        public boolean isClosed()
206        {
207            return closed;
208        }
209    
210        /**
211         * Returns if this session was recovered.
212         * @return <code>true</code> if this session was recovered
213         */
214        public boolean isRecovered()
215        {
216            return recovered;
217        }
218    
219        /**
220         * Returns if the current transaction was committed.
221         * @return <code>true</code> if the transaction was committed
222         */
223        public boolean isCommitted()
224        {
225            return (numberCommits > 0);
226        }
227        
228        /**
229         * Returns the number of commits.
230         * @return the number of commits
231         */
232        public int getNumberCommits()
233        {
234            return numberCommits;
235        }
236    
237        /**
238         * Returns if the current transaction was rolled back.
239         * @return <code>true</code> if the transaction was rolled back
240         */
241        public boolean isRolledBack()
242        {
243            return (numberRollbacks > 0);
244        }
245        
246        /**
247         * Returns the number of rollbacks.
248         * @return the number of rollbacks
249         */
250        public int getNumberRollbacks()
251        {
252            return numberRollbacks;
253        }
254        
255        /**
256         * Returns if messages should be automatically acknowledged,
257         * i.e. if the acknowledge mode is not <code>CLIENT_ACKNOWLEDGE</code>.
258         * @return <code>true</code> if messages are automatically acknowledged
259         */
260        public boolean isAutoAcknowledge()
261        {
262            return acknowledgeMode != CLIENT_ACKNOWLEDGE;
263        }
264        
265        /**
266         * Note: Returns <code>0</code> if the session is transacted.
267         * This method does not exist in JMS 1.0.2. In JMS 1.1 it
268         * should return <code>Session.SESSION_TRANSACTED</code>
269         * which is specified as <code>0</code>. In order to avoid
270         * different versions for JMS 1.0.2 and 1.1 
271         * (<code>Session.SESSION_TRANSACTED</code> does not
272         * exist in 1.0.2) this method returns hardcoded <code>0</code>,
273         * if the session is transacted.
274         * @return the acknowledge mode
275         */
276        public int getAcknowledgeMode() throws JMSException
277        {
278            if(getTransacted()) return 0;
279            return acknowledgeMode;
280        }
281        
282        public boolean getTransacted() throws JMSException
283        {
284            connection.throwJMSException();
285            return transacted;
286        }
287        
288        public BytesMessage createBytesMessage() throws JMSException
289        {
290            connection.throwJMSException();
291            return getMessageManager().createBytesMessage();
292        }
293    
294        public MapMessage createMapMessage() throws JMSException
295        {
296            connection.throwJMSException();
297            return getMessageManager().createMapMessage();
298        }
299    
300        public Message createMessage() throws JMSException
301        {
302            connection.throwJMSException();
303            return getMessageManager().createMessage();
304        }
305    
306        public ObjectMessage createObjectMessage() throws JMSException
307        {
308            connection.throwJMSException();
309            return createObjectMessage(null);
310        }
311    
312        public ObjectMessage createObjectMessage(Serializable object) throws JMSException
313        {
314            connection.throwJMSException();
315            return getMessageManager().createObjectMessage(object);
316        }
317    
318        public StreamMessage createStreamMessage() throws JMSException
319        {
320            connection.throwJMSException();
321            return getMessageManager().createStreamMessage();
322        }
323    
324        public TextMessage createTextMessage() throws JMSException
325        {
326            connection.throwJMSException();
327            return createTextMessage(null);
328        }
329    
330        public TextMessage createTextMessage(String text) throws JMSException
331        {
332            connection.throwJMSException();
333            return getMessageManager().createTextMessage(text);
334        }
335        
336        public MessageListener getMessageListener() throws JMSException
337        {
338            connection.throwJMSException();
339            return messageListener;
340        }
341    
342        public void setMessageListener(MessageListener messageListener) throws JMSException
343        {
344            connection.throwJMSException();
345            this.messageListener = messageListener;
346        }
347        
348        public void run()
349        {
350        
351        }
352            
353        public void commit() throws JMSException
354        {
355            connection.throwJMSException();
356            numberCommits++;
357        }
358    
359        public void rollback() throws JMSException
360        {
361            connection.throwJMSException();
362            recover();
363            numberRollbacks++;
364        }
365    
366        public void close() throws JMSException
367        {
368            connection.throwJMSException();
369            if(getTransacted() && !isCommitted())
370            {
371                rollback();
372            }
373            getQueueTransmissionManager().closeAll();
374            getTopicTransmissionManager().closeAll();
375            getGenericTransmissionManager().closeAll();
376            removeSessionFromDestinations(tempQueues);
377            removeSessionFromDestinations(tempTopics);
378            removeSessionFromDestinations(queues);
379            removeSessionFromDestinations(topics);
380            queues.clear();
381            topics.clear();
382            closed = true;
383        }
384        
385        private void removeSessionFromDestinations(Collection destinations)
386        {
387            Iterator iterator = destinations.iterator();
388            while(iterator.hasNext())
389            {
390                Object currentDestination = (Object)iterator.next();
391                if(currentDestination instanceof MockDestination)
392                ((MockDestination)currentDestination).removeSession(this);
393            }
394        }
395    
396        public void recover() throws JMSException
397        {
398            connection.throwJMSException();
399            recovered = true;
400        }
401        
402        public void unsubscribe(String name) throws JMSException
403        {
404            getConnection().throwJMSException();
405            topicTransManager.removeTopicDurableSubscriber(name);
406        }
407        
408        public Queue createQueue(String name) throws JMSException
409        {
410            getConnection().throwJMSException();
411            MockQueue queue = ((MockConnection)getConnection()).getDestinationManager().getQueue(name);
412            if(null == queue)
413            {
414                throw new JMSException("Queue with name " + name + " not found");
415            }
416            addSessionToQueue(queue);
417            return queue;
418        }
419    
420        public TemporaryQueue createTemporaryQueue() throws JMSException
421        {
422            getConnection().throwJMSException();
423            MockTemporaryQueue queue = new MockTemporaryQueue();
424            tempQueues.add(queue);
425            addSessionToQueue(queue);
426            return queue;
427        }
428        
429        public Topic createTopic(String name) throws JMSException
430        {
431            getConnection().throwJMSException();
432            MockTopic topic = ((MockConnection)getConnection()).getDestinationManager().getTopic(name);
433            if(null == topic)
434            {
435                throw new JMSException("Topic with name " + name + " not found");
436            }
437            addSessionToTopic(topic);
438            return topic;
439        }
440    
441        public TemporaryTopic createTemporaryTopic() throws JMSException
442        {
443            getConnection().throwJMSException();
444            MockTemporaryTopic topic = new MockTemporaryTopic();
445            tempTopics.add(topic);
446            addSessionToTopic(topic);
447            return topic;
448        }
449        
450        public MessageConsumer createConsumer(Destination destination) throws JMSException
451        {
452            getConnection().throwJMSException();
453            return createConsumer(destination, null);
454        }
455        
456        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
457        {
458            getConnection().throwJMSException();
459            return createConsumer(destination, messageSelector, false);
460        }
461        
462        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException
463        {
464            if(null == destination)
465            {
466                throw new IllegalArgumentException("destination must not be null");
467            }
468            getConnection().throwJMSException();      
469            if(destination instanceof MockQueue)
470            {
471                addSessionToQueue((Queue)destination);
472                return getQueueTransmissionManager().createQueueReceiver((MockQueue)destination, messageSelector);
473            }
474            else if(destination instanceof MockTopic)
475            {
476                addSessionToTopic((Topic)destination);
477                return getTopicTransmissionManager().createTopicSubscriber((MockTopic)destination, messageSelector, noLocal);
478            }
479            else
480            {
481                throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic");
482            }
483        }
484        
485        public MessageProducer createProducer(Destination destination) throws JMSException
486        {
487            getConnection().throwJMSException();
488            if(null == destination)
489            {
490                return createProducerForNullDestination();
491            }
492            if(destination instanceof MockQueue)
493            {
494                addSessionToQueue((Queue)destination);
495                return getQueueTransmissionManager().createQueueSender((MockQueue)destination);
496            }
497            else if(destination instanceof MockTopic)
498            {
499                addSessionToTopic((Topic)destination);
500                return getTopicTransmissionManager().createTopicPublisher((MockTopic)destination);
501            }
502            else
503            {
504                throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic");
505            }
506        }
507        
508        public QueueBrowser createBrowser(Queue queue) throws JMSException
509        {
510            getConnection().throwJMSException();
511            return createBrowser(queue, null);
512        }
513    
514        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
515        {
516            getConnection().throwJMSException();
517            if(!(queue instanceof MockQueue))
518            {
519                throw new InvalidDestinationException("queue must be an instance of MockQueue");
520            }
521            addSessionToQueue(queue);
522            return queueTransManager.createQueueBrowser((MockQueue)queue, messageSelector);
523        }
524        
525        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
526        {
527            getConnection().throwJMSException();
528            return createDurableSubscriber(topic, name, null, false);
529        }
530    
531        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException
532        {
533            getConnection().throwJMSException();
534            if(!(topic instanceof MockTopic))
535            {
536                throw new InvalidDestinationException("topic must be an instance of MockTopic");
537            }
538            addSessionToTopic(topic);
539            return topicTransManager.createDurableTopicSubscriber((MockTopic)topic, name, messageSelector, noLocal);
540        }
541        
542        protected MockConnection getConnection()
543        {
544            return connection;
545        }
546        
547        public void addSessionToQueue(Queue queue)
548        {
549            if(queue instanceof MockQueue)
550            {
551                ((MockQueue)queue).addSession(this);
552                queues.add(queue);
553            }
554        }
555        
556        public void addSessionToTopic(Topic topic)
557        {
558            if(topic instanceof MockTopic)
559            {
560                ((MockTopic)topic).addSession(this);
561                topics.add(topic);
562            }
563        }
564        
565        protected MessageProducer createProducerForNullDestination()
566        {
567            return getGenericTransmissionManager().createMessageProducer();
568        }
569    }