001 package com.mockrunner.mock.jms;
002
003 import java.io.Serializable;
004 import java.util.ArrayList;
005 import java.util.Collection;
006 import java.util.Collections;
007 import java.util.HashSet;
008 import java.util.Iterator;
009 import java.util.List;
010 import java.util.Set;
011
012 import javax.jms.BytesMessage;
013 import javax.jms.Destination;
014 import javax.jms.InvalidDestinationException;
015 import javax.jms.JMSException;
016 import javax.jms.MapMessage;
017 import javax.jms.Message;
018 import javax.jms.MessageConsumer;
019 import javax.jms.MessageListener;
020 import javax.jms.MessageProducer;
021 import javax.jms.ObjectMessage;
022 import javax.jms.Queue;
023 import javax.jms.QueueBrowser;
024 import javax.jms.Session;
025 import javax.jms.StreamMessage;
026 import javax.jms.TemporaryQueue;
027 import javax.jms.TemporaryTopic;
028 import javax.jms.TextMessage;
029 import javax.jms.Topic;
030 import javax.jms.TopicSubscriber;
031
032 import com.mockrunner.jms.GenericTransmissionManager;
033 import com.mockrunner.jms.MessageManager;
034 import com.mockrunner.jms.QueueTransmissionManager;
035 import com.mockrunner.jms.TopicTransmissionManager;
036 import com.mockrunner.jms.TransmissionManagerWrapper;
037
038 /**
039 * Mock implementation of JMS <code>Session</code>.
040 *
041 * Please note that this implementation does not
042 * implement transaction isolation at the moment.
043 * Messages are immediately sent. If acknowledge
044 * mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE,
045 * the message will be automatically acknowledged,
046 * otherwise, it will not be acknowledged. According
047 * to JMS specification, the acknowledged mode must
048 * be ignored for transacted sessions. This is currently
049 * not implemented, i.e. transacted sessions behave like
050 * sessions with acknowledge mode AUTO_ACKNOWLEDGE.
051 * Messages are acknowledged even if the transaction is
052 * rolled back. However, the framework keeps track if a
053 * transaction is committed or rolled back, so you can test
054 * this and rely on the container for the rest.
055 * You can set a <code>MessageListener</code> directly to
056 * the session. This is an application server internal feature
057 * and not meant for application use in JMS.
058 * This mock session dispatches any message of any
059 * known <code>Queue</code> and <code>Topic</code> to the
060 * distinguished <code>MessageListener</code>, if such
061 * a <code>MessageListener</code> is registered.
062 */
063 public class MockSession implements Session, Serializable
064 {
065 private MockConnection connection;
066 private QueueTransmissionManager queueTransManager;
067 private TopicTransmissionManager topicTransManager;
068 private GenericTransmissionManager genericTransManager;
069 private TransmissionManagerWrapper transManager;
070 private MessageManager messageManager;
071 private MessageListener messageListener;
072 private List tempQueues;
073 private List tempTopics;
074 private Set queues;
075 private Set topics;
076 private boolean transacted;
077 private int acknowledgeMode;
078 private int numberCommits;
079 private int numberRollbacks;
080 private boolean recovered;
081 private boolean closed;
082
083 public MockSession(MockConnection connection, boolean transacted, int acknowledgeMode)
084 {
085 this.connection = connection;
086 this.transacted = transacted;
087 this.acknowledgeMode = acknowledgeMode;
088 queueTransManager = new QueueTransmissionManager(connection, this);
089 topicTransManager = new TopicTransmissionManager(connection, this);
090 genericTransManager = new GenericTransmissionManager(connection, this);
091 transManager = new TransmissionManagerWrapper(queueTransManager, topicTransManager, genericTransManager);
092 messageManager = new MessageManager();
093 tempQueues = new ArrayList();
094 tempTopics = new ArrayList();
095 queues = new HashSet();
096 topics = new HashSet();
097 messageListener = null;
098 numberCommits = 0;
099 numberRollbacks = 0;
100 recovered = false;
101 closed = false;
102 }
103
104 /**
105 * Returns the {@link com.mockrunner.jms.QueueTransmissionManager}.
106 * @return the {@link com.mockrunner.jms.QueueTransmissionManager}
107 */
108 public QueueTransmissionManager getQueueTransmissionManager()
109 {
110 return queueTransManager;
111 }
112
113 /**
114 * Returns the {@link com.mockrunner.jms.TopicTransmissionManager}.
115 * @return the {@link com.mockrunner.jms.TopicTransmissionManager}
116 */
117 public TopicTransmissionManager getTopicTransmissionManager()
118 {
119 return topicTransManager;
120 }
121
122 /**
123 * Returns the {@link com.mockrunner.jms.GenericTransmissionManager}.
124 * @return the {@link com.mockrunner.jms.GenericTransmissionManager}
125 */
126 public GenericTransmissionManager getGenericTransmissionManager()
127 {
128 return genericTransManager;
129 }
130
131 /**
132 * @deprecated use {@link #getTransmissionManagerWrapper}
133 */
134 public TransmissionManagerWrapper getTransmissionManager()
135 {
136 return getTransmissionManagerWrapper();
137 }
138
139 /**
140 * Returns the {@link com.mockrunner.jms.TransmissionManagerWrapper}.
141 * @return the {@link com.mockrunner.jms.TransmissionManagerWrapper}
142 */
143 public TransmissionManagerWrapper getTransmissionManagerWrapper()
144 {
145 return transManager;
146 }
147
148 /**
149 * Returns the {@link MessageManager} for this session.
150 * @return the {@link MessageManager}
151 */
152 public MessageManager getMessageManager()
153 {
154 return messageManager;
155 }
156
157 /**
158 * Returns the list of temporary queues.
159 * @return the <code>TemporaryQueue</code> list
160 */
161 public List getTemporaryQueueList()
162 {
163 return Collections.unmodifiableList(tempQueues);
164 }
165
166 /**
167 * Returns a <code>TemporaryQueue</code> by its index. The
168 * index represent the number of the queue. Returns <code>null</code>
169 * if no such <code>TemporaryQueue</code> is present.
170 * @param index the index
171 * @return the <code>TemporaryQueue</code>
172 */
173 public MockTemporaryQueue getTemporaryQueue(int index)
174 {
175 if(tempQueues.size() <= index || index < 0) return null;
176 return (MockTemporaryQueue)tempQueues.get(index);
177 }
178
179 /**
180 * Returns the list of temporary topics.
181 * @return the <code>TemporaryTopic</code> list
182 */
183 public List getTemporaryTopicList()
184 {
185 return Collections.unmodifiableList(tempTopics);
186 }
187
188 /**
189 * Returns a <code>TemporaryTopic</code> by its index. The
190 * index represent the number of the topic. Returns <code>null</code>
191 * if no such <code>TemporaryTopic</code> is present.
192 * @param index the index
193 * @return the <code>TemporaryTopic</code>
194 */
195 public MockTemporaryTopic getTemporaryTopic(int index)
196 {
197 if(tempTopics.size() <= index || index < 0) return null;
198 return (MockTemporaryTopic)tempTopics.get(index);
199 }
200
201 /**
202 * Returns if this session was closed.
203 * @return <code>true</code> if this session is closed
204 */
205 public boolean isClosed()
206 {
207 return closed;
208 }
209
210 /**
211 * Returns if this session was recovered.
212 * @return <code>true</code> if this session was recovered
213 */
214 public boolean isRecovered()
215 {
216 return recovered;
217 }
218
219 /**
220 * Returns if the current transaction was committed.
221 * @return <code>true</code> if the transaction was committed
222 */
223 public boolean isCommitted()
224 {
225 return (numberCommits > 0);
226 }
227
228 /**
229 * Returns the number of commits.
230 * @return the number of commits
231 */
232 public int getNumberCommits()
233 {
234 return numberCommits;
235 }
236
237 /**
238 * Returns if the current transaction was rolled back.
239 * @return <code>true</code> if the transaction was rolled back
240 */
241 public boolean isRolledBack()
242 {
243 return (numberRollbacks > 0);
244 }
245
246 /**
247 * Returns the number of rollbacks.
248 * @return the number of rollbacks
249 */
250 public int getNumberRollbacks()
251 {
252 return numberRollbacks;
253 }
254
255 /**
256 * Returns if messages should be automatically acknowledged,
257 * i.e. if the acknowledge mode is not <code>CLIENT_ACKNOWLEDGE</code>.
258 * @return <code>true</code> if messages are automatically acknowledged
259 */
260 public boolean isAutoAcknowledge()
261 {
262 return acknowledgeMode != CLIENT_ACKNOWLEDGE;
263 }
264
265 /**
266 * Note: Returns <code>0</code> if the session is transacted.
267 * This method does not exist in JMS 1.0.2. In JMS 1.1 it
268 * should return <code>Session.SESSION_TRANSACTED</code>
269 * which is specified as <code>0</code>. In order to avoid
270 * different versions for JMS 1.0.2 and 1.1
271 * (<code>Session.SESSION_TRANSACTED</code> does not
272 * exist in 1.0.2) this method returns hardcoded <code>0</code>,
273 * if the session is transacted.
274 * @return the acknowledge mode
275 */
276 public int getAcknowledgeMode() throws JMSException
277 {
278 if(getTransacted()) return 0;
279 return acknowledgeMode;
280 }
281
282 public boolean getTransacted() throws JMSException
283 {
284 connection.throwJMSException();
285 return transacted;
286 }
287
288 public BytesMessage createBytesMessage() throws JMSException
289 {
290 connection.throwJMSException();
291 return getMessageManager().createBytesMessage();
292 }
293
294 public MapMessage createMapMessage() throws JMSException
295 {
296 connection.throwJMSException();
297 return getMessageManager().createMapMessage();
298 }
299
300 public Message createMessage() throws JMSException
301 {
302 connection.throwJMSException();
303 return getMessageManager().createMessage();
304 }
305
306 public ObjectMessage createObjectMessage() throws JMSException
307 {
308 connection.throwJMSException();
309 return createObjectMessage(null);
310 }
311
312 public ObjectMessage createObjectMessage(Serializable object) throws JMSException
313 {
314 connection.throwJMSException();
315 return getMessageManager().createObjectMessage(object);
316 }
317
318 public StreamMessage createStreamMessage() throws JMSException
319 {
320 connection.throwJMSException();
321 return getMessageManager().createStreamMessage();
322 }
323
324 public TextMessage createTextMessage() throws JMSException
325 {
326 connection.throwJMSException();
327 return createTextMessage(null);
328 }
329
330 public TextMessage createTextMessage(String text) throws JMSException
331 {
332 connection.throwJMSException();
333 return getMessageManager().createTextMessage(text);
334 }
335
336 public MessageListener getMessageListener() throws JMSException
337 {
338 connection.throwJMSException();
339 return messageListener;
340 }
341
342 public void setMessageListener(MessageListener messageListener) throws JMSException
343 {
344 connection.throwJMSException();
345 this.messageListener = messageListener;
346 }
347
348 public void run()
349 {
350
351 }
352
353 public void commit() throws JMSException
354 {
355 connection.throwJMSException();
356 numberCommits++;
357 }
358
359 public void rollback() throws JMSException
360 {
361 connection.throwJMSException();
362 recover();
363 numberRollbacks++;
364 }
365
366 public void close() throws JMSException
367 {
368 connection.throwJMSException();
369 if(getTransacted() && !isCommitted())
370 {
371 rollback();
372 }
373 getQueueTransmissionManager().closeAll();
374 getTopicTransmissionManager().closeAll();
375 getGenericTransmissionManager().closeAll();
376 removeSessionFromDestinations(tempQueues);
377 removeSessionFromDestinations(tempTopics);
378 removeSessionFromDestinations(queues);
379 removeSessionFromDestinations(topics);
380 queues.clear();
381 topics.clear();
382 closed = true;
383 }
384
385 private void removeSessionFromDestinations(Collection destinations)
386 {
387 Iterator iterator = destinations.iterator();
388 while(iterator.hasNext())
389 {
390 Object currentDestination = (Object)iterator.next();
391 if(currentDestination instanceof MockDestination)
392 ((MockDestination)currentDestination).removeSession(this);
393 }
394 }
395
396 public void recover() throws JMSException
397 {
398 connection.throwJMSException();
399 recovered = true;
400 }
401
402 public void unsubscribe(String name) throws JMSException
403 {
404 getConnection().throwJMSException();
405 topicTransManager.removeTopicDurableSubscriber(name);
406 }
407
408 public Queue createQueue(String name) throws JMSException
409 {
410 getConnection().throwJMSException();
411 MockQueue queue = ((MockConnection)getConnection()).getDestinationManager().getQueue(name);
412 if(null == queue)
413 {
414 throw new JMSException("Queue with name " + name + " not found");
415 }
416 addSessionToQueue(queue);
417 return queue;
418 }
419
420 public TemporaryQueue createTemporaryQueue() throws JMSException
421 {
422 getConnection().throwJMSException();
423 MockTemporaryQueue queue = new MockTemporaryQueue();
424 tempQueues.add(queue);
425 addSessionToQueue(queue);
426 return queue;
427 }
428
429 public Topic createTopic(String name) throws JMSException
430 {
431 getConnection().throwJMSException();
432 MockTopic topic = ((MockConnection)getConnection()).getDestinationManager().getTopic(name);
433 if(null == topic)
434 {
435 throw new JMSException("Topic with name " + name + " not found");
436 }
437 addSessionToTopic(topic);
438 return topic;
439 }
440
441 public TemporaryTopic createTemporaryTopic() throws JMSException
442 {
443 getConnection().throwJMSException();
444 MockTemporaryTopic topic = new MockTemporaryTopic();
445 tempTopics.add(topic);
446 addSessionToTopic(topic);
447 return topic;
448 }
449
450 public MessageConsumer createConsumer(Destination destination) throws JMSException
451 {
452 getConnection().throwJMSException();
453 return createConsumer(destination, null);
454 }
455
456 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
457 {
458 getConnection().throwJMSException();
459 return createConsumer(destination, messageSelector, false);
460 }
461
462 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException
463 {
464 if(null == destination)
465 {
466 throw new IllegalArgumentException("destination must not be null");
467 }
468 getConnection().throwJMSException();
469 if(destination instanceof MockQueue)
470 {
471 addSessionToQueue((Queue)destination);
472 return getQueueTransmissionManager().createQueueReceiver((MockQueue)destination, messageSelector);
473 }
474 else if(destination instanceof MockTopic)
475 {
476 addSessionToTopic((Topic)destination);
477 return getTopicTransmissionManager().createTopicSubscriber((MockTopic)destination, messageSelector, noLocal);
478 }
479 else
480 {
481 throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic");
482 }
483 }
484
485 public MessageProducer createProducer(Destination destination) throws JMSException
486 {
487 getConnection().throwJMSException();
488 if(null == destination)
489 {
490 return createProducerForNullDestination();
491 }
492 if(destination instanceof MockQueue)
493 {
494 addSessionToQueue((Queue)destination);
495 return getQueueTransmissionManager().createQueueSender((MockQueue)destination);
496 }
497 else if(destination instanceof MockTopic)
498 {
499 addSessionToTopic((Topic)destination);
500 return getTopicTransmissionManager().createTopicPublisher((MockTopic)destination);
501 }
502 else
503 {
504 throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic");
505 }
506 }
507
508 public QueueBrowser createBrowser(Queue queue) throws JMSException
509 {
510 getConnection().throwJMSException();
511 return createBrowser(queue, null);
512 }
513
514 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
515 {
516 getConnection().throwJMSException();
517 if(!(queue instanceof MockQueue))
518 {
519 throw new InvalidDestinationException("queue must be an instance of MockQueue");
520 }
521 addSessionToQueue(queue);
522 return queueTransManager.createQueueBrowser((MockQueue)queue, messageSelector);
523 }
524
525 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
526 {
527 getConnection().throwJMSException();
528 return createDurableSubscriber(topic, name, null, false);
529 }
530
531 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException
532 {
533 getConnection().throwJMSException();
534 if(!(topic instanceof MockTopic))
535 {
536 throw new InvalidDestinationException("topic must be an instance of MockTopic");
537 }
538 addSessionToTopic(topic);
539 return topicTransManager.createDurableTopicSubscriber((MockTopic)topic, name, messageSelector, noLocal);
540 }
541
542 protected MockConnection getConnection()
543 {
544 return connection;
545 }
546
547 public void addSessionToQueue(Queue queue)
548 {
549 if(queue instanceof MockQueue)
550 {
551 ((MockQueue)queue).addSession(this);
552 queues.add(queue);
553 }
554 }
555
556 public void addSessionToTopic(Topic topic)
557 {
558 if(topic instanceof MockTopic)
559 {
560 ((MockTopic)topic).addSession(this);
561 topics.add(topic);
562 }
563 }
564
565 protected MessageProducer createProducerForNullDestination()
566 {
567 return getGenericTransmissionManager().createMessageProducer();
568 }
569 }