001 package com.mockrunner.mock.jms; 002 003 import java.io.Serializable; 004 import java.util.ArrayList; 005 import java.util.Collection; 006 import java.util.Collections; 007 import java.util.HashSet; 008 import java.util.Iterator; 009 import java.util.List; 010 import java.util.Set; 011 012 import javax.jms.BytesMessage; 013 import javax.jms.Destination; 014 import javax.jms.InvalidDestinationException; 015 import javax.jms.JMSException; 016 import javax.jms.MapMessage; 017 import javax.jms.Message; 018 import javax.jms.MessageConsumer; 019 import javax.jms.MessageListener; 020 import javax.jms.MessageProducer; 021 import javax.jms.ObjectMessage; 022 import javax.jms.Queue; 023 import javax.jms.QueueBrowser; 024 import javax.jms.Session; 025 import javax.jms.StreamMessage; 026 import javax.jms.TemporaryQueue; 027 import javax.jms.TemporaryTopic; 028 import javax.jms.TextMessage; 029 import javax.jms.Topic; 030 import javax.jms.TopicSubscriber; 031 032 import com.mockrunner.jms.GenericTransmissionManager; 033 import com.mockrunner.jms.MessageManager; 034 import com.mockrunner.jms.QueueTransmissionManager; 035 import com.mockrunner.jms.TopicTransmissionManager; 036 import com.mockrunner.jms.TransmissionManagerWrapper; 037 038 /** 039 * Mock implementation of JMS <code>Session</code>. 040 * 041 * Please note that this implementation does not 042 * implement transaction isolation at the moment. 043 * Messages are immediately sent. If acknowledge 044 * mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, 045 * the message will be automatically acknowledged, 046 * otherwise, it will not be acknowledged. According 047 * to JMS specification, the acknowledged mode must 048 * be ignored for transacted sessions. This is currently 049 * not implemented, i.e. transacted sessions behave like 050 * sessions with acknowledge mode AUTO_ACKNOWLEDGE. 051 * Messages are acknowledged even if the transaction is 052 * rolled back. However, the framework keeps track if a 053 * transaction is committed or rolled back, so you can test 054 * this and rely on the container for the rest. 055 * You can set a <code>MessageListener</code> directly to 056 * the session. This is an application server internal feature 057 * and not meant for application use in JMS. 058 * This mock session dispatches any message of any 059 * known <code>Queue</code> and <code>Topic</code> to the 060 * distinguished <code>MessageListener</code>, if such 061 * a <code>MessageListener</code> is registered. 062 */ 063 public class MockSession implements Session, Serializable 064 { 065 private MockConnection connection; 066 private QueueTransmissionManager queueTransManager; 067 private TopicTransmissionManager topicTransManager; 068 private GenericTransmissionManager genericTransManager; 069 private TransmissionManagerWrapper transManager; 070 private MessageManager messageManager; 071 private MessageListener messageListener; 072 private List tempQueues; 073 private List tempTopics; 074 private Set queues; 075 private Set topics; 076 private boolean transacted; 077 private int acknowledgeMode; 078 private int numberCommits; 079 private int numberRollbacks; 080 private boolean recovered; 081 private boolean closed; 082 083 public MockSession(MockConnection connection, boolean transacted, int acknowledgeMode) 084 { 085 this.connection = connection; 086 this.transacted = transacted; 087 this.acknowledgeMode = acknowledgeMode; 088 queueTransManager = new QueueTransmissionManager(connection, this); 089 topicTransManager = new TopicTransmissionManager(connection, this); 090 genericTransManager = new GenericTransmissionManager(connection, this); 091 transManager = new TransmissionManagerWrapper(queueTransManager, topicTransManager, genericTransManager); 092 messageManager = new MessageManager(); 093 tempQueues = new ArrayList(); 094 tempTopics = new ArrayList(); 095 queues = new HashSet(); 096 topics = new HashSet(); 097 messageListener = null; 098 numberCommits = 0; 099 numberRollbacks = 0; 100 recovered = false; 101 closed = false; 102 } 103 104 /** 105 * Returns the {@link com.mockrunner.jms.QueueTransmissionManager}. 106 * @return the {@link com.mockrunner.jms.QueueTransmissionManager} 107 */ 108 public QueueTransmissionManager getQueueTransmissionManager() 109 { 110 return queueTransManager; 111 } 112 113 /** 114 * Returns the {@link com.mockrunner.jms.TopicTransmissionManager}. 115 * @return the {@link com.mockrunner.jms.TopicTransmissionManager} 116 */ 117 public TopicTransmissionManager getTopicTransmissionManager() 118 { 119 return topicTransManager; 120 } 121 122 /** 123 * Returns the {@link com.mockrunner.jms.GenericTransmissionManager}. 124 * @return the {@link com.mockrunner.jms.GenericTransmissionManager} 125 */ 126 public GenericTransmissionManager getGenericTransmissionManager() 127 { 128 return genericTransManager; 129 } 130 131 /** 132 * @deprecated use {@link #getTransmissionManagerWrapper} 133 */ 134 public TransmissionManagerWrapper getTransmissionManager() 135 { 136 return getTransmissionManagerWrapper(); 137 } 138 139 /** 140 * Returns the {@link com.mockrunner.jms.TransmissionManagerWrapper}. 141 * @return the {@link com.mockrunner.jms.TransmissionManagerWrapper} 142 */ 143 public TransmissionManagerWrapper getTransmissionManagerWrapper() 144 { 145 return transManager; 146 } 147 148 /** 149 * Returns the {@link MessageManager} for this session. 150 * @return the {@link MessageManager} 151 */ 152 public MessageManager getMessageManager() 153 { 154 return messageManager; 155 } 156 157 /** 158 * Returns the list of temporary queues. 159 * @return the <code>TemporaryQueue</code> list 160 */ 161 public List getTemporaryQueueList() 162 { 163 return Collections.unmodifiableList(tempQueues); 164 } 165 166 /** 167 * Returns a <code>TemporaryQueue</code> by its index. The 168 * index represent the number of the queue. Returns <code>null</code> 169 * if no such <code>TemporaryQueue</code> is present. 170 * @param index the index 171 * @return the <code>TemporaryQueue</code> 172 */ 173 public MockTemporaryQueue getTemporaryQueue(int index) 174 { 175 if(tempQueues.size() <= index || index < 0) return null; 176 return (MockTemporaryQueue)tempQueues.get(index); 177 } 178 179 /** 180 * Returns the list of temporary topics. 181 * @return the <code>TemporaryTopic</code> list 182 */ 183 public List getTemporaryTopicList() 184 { 185 return Collections.unmodifiableList(tempTopics); 186 } 187 188 /** 189 * Returns a <code>TemporaryTopic</code> by its index. The 190 * index represent the number of the topic. Returns <code>null</code> 191 * if no such <code>TemporaryTopic</code> is present. 192 * @param index the index 193 * @return the <code>TemporaryTopic</code> 194 */ 195 public MockTemporaryTopic getTemporaryTopic(int index) 196 { 197 if(tempTopics.size() <= index || index < 0) return null; 198 return (MockTemporaryTopic)tempTopics.get(index); 199 } 200 201 /** 202 * Returns if this session was closed. 203 * @return <code>true</code> if this session is closed 204 */ 205 public boolean isClosed() 206 { 207 return closed; 208 } 209 210 /** 211 * Returns if this session was recovered. 212 * @return <code>true</code> if this session was recovered 213 */ 214 public boolean isRecovered() 215 { 216 return recovered; 217 } 218 219 /** 220 * Returns if the current transaction was committed. 221 * @return <code>true</code> if the transaction was committed 222 */ 223 public boolean isCommitted() 224 { 225 return (numberCommits > 0); 226 } 227 228 /** 229 * Returns the number of commits. 230 * @return the number of commits 231 */ 232 public int getNumberCommits() 233 { 234 return numberCommits; 235 } 236 237 /** 238 * Returns if the current transaction was rolled back. 239 * @return <code>true</code> if the transaction was rolled back 240 */ 241 public boolean isRolledBack() 242 { 243 return (numberRollbacks > 0); 244 } 245 246 /** 247 * Returns the number of rollbacks. 248 * @return the number of rollbacks 249 */ 250 public int getNumberRollbacks() 251 { 252 return numberRollbacks; 253 } 254 255 /** 256 * Returns if messages should be automatically acknowledged, 257 * i.e. if the acknowledge mode is not <code>CLIENT_ACKNOWLEDGE</code>. 258 * @return <code>true</code> if messages are automatically acknowledged 259 */ 260 public boolean isAutoAcknowledge() 261 { 262 return acknowledgeMode != CLIENT_ACKNOWLEDGE; 263 } 264 265 /** 266 * Note: Returns <code>0</code> if the session is transacted. 267 * This method does not exist in JMS 1.0.2. In JMS 1.1 it 268 * should return <code>Session.SESSION_TRANSACTED</code> 269 * which is specified as <code>0</code>. In order to avoid 270 * different versions for JMS 1.0.2 and 1.1 271 * (<code>Session.SESSION_TRANSACTED</code> does not 272 * exist in 1.0.2) this method returns hardcoded <code>0</code>, 273 * if the session is transacted. 274 * @return the acknowledge mode 275 */ 276 public int getAcknowledgeMode() throws JMSException 277 { 278 if(getTransacted()) return 0; 279 return acknowledgeMode; 280 } 281 282 public boolean getTransacted() throws JMSException 283 { 284 connection.throwJMSException(); 285 return transacted; 286 } 287 288 public BytesMessage createBytesMessage() throws JMSException 289 { 290 connection.throwJMSException(); 291 return getMessageManager().createBytesMessage(); 292 } 293 294 public MapMessage createMapMessage() throws JMSException 295 { 296 connection.throwJMSException(); 297 return getMessageManager().createMapMessage(); 298 } 299 300 public Message createMessage() throws JMSException 301 { 302 connection.throwJMSException(); 303 return getMessageManager().createMessage(); 304 } 305 306 public ObjectMessage createObjectMessage() throws JMSException 307 { 308 connection.throwJMSException(); 309 return createObjectMessage(null); 310 } 311 312 public ObjectMessage createObjectMessage(Serializable object) throws JMSException 313 { 314 connection.throwJMSException(); 315 return getMessageManager().createObjectMessage(object); 316 } 317 318 public StreamMessage createStreamMessage() throws JMSException 319 { 320 connection.throwJMSException(); 321 return getMessageManager().createStreamMessage(); 322 } 323 324 public TextMessage createTextMessage() throws JMSException 325 { 326 connection.throwJMSException(); 327 return createTextMessage(null); 328 } 329 330 public TextMessage createTextMessage(String text) throws JMSException 331 { 332 connection.throwJMSException(); 333 return getMessageManager().createTextMessage(text); 334 } 335 336 public MessageListener getMessageListener() throws JMSException 337 { 338 connection.throwJMSException(); 339 return messageListener; 340 } 341 342 public void setMessageListener(MessageListener messageListener) throws JMSException 343 { 344 connection.throwJMSException(); 345 this.messageListener = messageListener; 346 } 347 348 public void run() 349 { 350 351 } 352 353 public void commit() throws JMSException 354 { 355 connection.throwJMSException(); 356 numberCommits++; 357 } 358 359 public void rollback() throws JMSException 360 { 361 connection.throwJMSException(); 362 recover(); 363 numberRollbacks++; 364 } 365 366 public void close() throws JMSException 367 { 368 connection.throwJMSException(); 369 if(getTransacted() && !isCommitted()) 370 { 371 rollback(); 372 } 373 getQueueTransmissionManager().closeAll(); 374 getTopicTransmissionManager().closeAll(); 375 getGenericTransmissionManager().closeAll(); 376 removeSessionFromDestinations(tempQueues); 377 removeSessionFromDestinations(tempTopics); 378 removeSessionFromDestinations(queues); 379 removeSessionFromDestinations(topics); 380 queues.clear(); 381 topics.clear(); 382 closed = true; 383 } 384 385 private void removeSessionFromDestinations(Collection destinations) 386 { 387 Iterator iterator = destinations.iterator(); 388 while(iterator.hasNext()) 389 { 390 Object currentDestination = (Object)iterator.next(); 391 if(currentDestination instanceof MockDestination) 392 ((MockDestination)currentDestination).removeSession(this); 393 } 394 } 395 396 public void recover() throws JMSException 397 { 398 connection.throwJMSException(); 399 recovered = true; 400 } 401 402 public void unsubscribe(String name) throws JMSException 403 { 404 getConnection().throwJMSException(); 405 topicTransManager.removeTopicDurableSubscriber(name); 406 } 407 408 public Queue createQueue(String name) throws JMSException 409 { 410 getConnection().throwJMSException(); 411 MockQueue queue = ((MockConnection)getConnection()).getDestinationManager().getQueue(name); 412 if(null == queue) 413 { 414 throw new JMSException("Queue with name " + name + " not found"); 415 } 416 addSessionToQueue(queue); 417 return queue; 418 } 419 420 public TemporaryQueue createTemporaryQueue() throws JMSException 421 { 422 getConnection().throwJMSException(); 423 MockTemporaryQueue queue = new MockTemporaryQueue(); 424 tempQueues.add(queue); 425 addSessionToQueue(queue); 426 return queue; 427 } 428 429 public Topic createTopic(String name) throws JMSException 430 { 431 getConnection().throwJMSException(); 432 MockTopic topic = ((MockConnection)getConnection()).getDestinationManager().getTopic(name); 433 if(null == topic) 434 { 435 throw new JMSException("Topic with name " + name + " not found"); 436 } 437 addSessionToTopic(topic); 438 return topic; 439 } 440 441 public TemporaryTopic createTemporaryTopic() throws JMSException 442 { 443 getConnection().throwJMSException(); 444 MockTemporaryTopic topic = new MockTemporaryTopic(); 445 tempTopics.add(topic); 446 addSessionToTopic(topic); 447 return topic; 448 } 449 450 public MessageConsumer createConsumer(Destination destination) throws JMSException 451 { 452 getConnection().throwJMSException(); 453 return createConsumer(destination, null); 454 } 455 456 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException 457 { 458 getConnection().throwJMSException(); 459 return createConsumer(destination, messageSelector, false); 460 } 461 462 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException 463 { 464 if(null == destination) 465 { 466 throw new IllegalArgumentException("destination must not be null"); 467 } 468 getConnection().throwJMSException(); 469 if(destination instanceof MockQueue) 470 { 471 addSessionToQueue((Queue)destination); 472 return getQueueTransmissionManager().createQueueReceiver((MockQueue)destination, messageSelector); 473 } 474 else if(destination instanceof MockTopic) 475 { 476 addSessionToTopic((Topic)destination); 477 return getTopicTransmissionManager().createTopicSubscriber((MockTopic)destination, messageSelector, noLocal); 478 } 479 else 480 { 481 throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic"); 482 } 483 } 484 485 public MessageProducer createProducer(Destination destination) throws JMSException 486 { 487 getConnection().throwJMSException(); 488 if(null == destination) 489 { 490 return createProducerForNullDestination(); 491 } 492 if(destination instanceof MockQueue) 493 { 494 addSessionToQueue((Queue)destination); 495 return getQueueTransmissionManager().createQueueSender((MockQueue)destination); 496 } 497 else if(destination instanceof MockTopic) 498 { 499 addSessionToTopic((Topic)destination); 500 return getTopicTransmissionManager().createTopicPublisher((MockTopic)destination); 501 } 502 else 503 { 504 throw new InvalidDestinationException("destination must be an instance of MockQueue or MockTopic"); 505 } 506 } 507 508 public QueueBrowser createBrowser(Queue queue) throws JMSException 509 { 510 getConnection().throwJMSException(); 511 return createBrowser(queue, null); 512 } 513 514 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException 515 { 516 getConnection().throwJMSException(); 517 if(!(queue instanceof MockQueue)) 518 { 519 throw new InvalidDestinationException("queue must be an instance of MockQueue"); 520 } 521 addSessionToQueue(queue); 522 return queueTransManager.createQueueBrowser((MockQueue)queue, messageSelector); 523 } 524 525 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException 526 { 527 getConnection().throwJMSException(); 528 return createDurableSubscriber(topic, name, null, false); 529 } 530 531 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException 532 { 533 getConnection().throwJMSException(); 534 if(!(topic instanceof MockTopic)) 535 { 536 throw new InvalidDestinationException("topic must be an instance of MockTopic"); 537 } 538 addSessionToTopic(topic); 539 return topicTransManager.createDurableTopicSubscriber((MockTopic)topic, name, messageSelector, noLocal); 540 } 541 542 protected MockConnection getConnection() 543 { 544 return connection; 545 } 546 547 public void addSessionToQueue(Queue queue) 548 { 549 if(queue instanceof MockQueue) 550 { 551 ((MockQueue)queue).addSession(this); 552 queues.add(queue); 553 } 554 } 555 556 public void addSessionToTopic(Topic topic) 557 { 558 if(topic instanceof MockTopic) 559 { 560 ((MockTopic)topic).addSession(this); 561 topics.add(topic); 562 } 563 } 564 565 protected MessageProducer createProducerForNullDestination() 566 { 567 return getGenericTransmissionManager().createMessageProducer(); 568 } 569 }