org.apache.qpid.test.utils
Class ConversationFactory

java.lang.Object
  extended by org.apache.qpid.test.utils.ConversationFactory

public class ConversationFactory
extends Object

A conversation helper that uses a message correlation id pattern to match up sent and received messages as a conversation over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (whose behaviour depends on the queue implementation). Clients of this de-multiplexer can wait on messages, identified by message correlation ids.
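The de-multiplexing described above can be sketched without any JMS dependency. This is a minimal illustration, not the Qpid implementation: the class `CorrelationDemux`, its methods, and the use of `String` in place of a JMS `Message` are all hypothetical names chosen for the example.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the de-multiplexer; String replaces javax.jms.Message.
class CorrelationDemux {
    // One reply queue per registered correlation id.
    private final Map<Long, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    // Holds messages whose correlation id matches no registered conversation.
    private final BlockingQueue<String> deadLetters = new LinkedBlockingQueue<>();

    // Registers a conversation so that messages carrying its id are queued for it.
    void register(long correlationId) {
        queues.putIfAbsent(correlationId, new LinkedBlockingQueue<>());
    }

    // Called once per incoming message; routes it by correlation id.
    void onMessage(long correlationId, String body) {
        BlockingQueue<String> queue = queues.get(correlationId);
        if (queue != null) {
            queue.add(body);
        } else {
            deadLetters.add(body);
        }
    }

    // Waits up to timeoutMillis for the next message of one conversation.
    // Returns null on timeout or if the waiting thread is interrupted.
    String receive(long correlationId, long timeoutMillis) {
        BlockingQueue<String> queue = queues.get(correlationId);
        if (queue == null) {
            throw new IllegalStateException("No conversation registered for id " + correlationId);
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    int deadLetterCount() {
        return deadLetters.size();
    }
}
```

Each waiting client only ever sees messages for its own id, which is what lets several threads share one incoming message route.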

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a multiplexed messaging route. This can be useful, as JMS sessions are not multi-threaded. Setting up the conversation with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded conversation (the conversation methods can be called many times in parallel):

 class Initiator
 {
     ConversationHelper conversation = new ConversationHelper(connection, null,
                                                              java.util.concurrent.LinkedBlockingQueue.class);

     void initiateConversation()
     {
         try {
             // Exchange greetings.
             conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
             Message greeting = conversation.receive();

             // Exchange goodbyes.
             conversation.send(conversation.getSession().createTextMessage("Goodbye."));
             Message goodbye = conversation.receive();
         } finally {
             conversation.end();
         }
     }
 }

 class Responder
 {
     ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
                                                              java.util.concurrent.LinkedBlockingQueue.class);

     void respondToConversation()
     {
         try {
             // Exchange greetings.
             Message greeting = conversation.receive();
             conversation.send(conversation.getSession().createTextMessage("Hello."));

             // Exchange goodbyes.
             Message goodbye = conversation.receive();
             conversation.send(conversation.getSession().createTextMessage("Goodbye."));
         } finally {
             conversation.end();
         }
     }
 }
 

Conversation correlation ids are generated on a per-thread basis.
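One way to picture per-thread id generation is a shared counter combined with a thread-local slot. The following is a sketch under that assumption; `PerThreadIds` and its methods are hypothetical and are not taken from the Qpid source.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: each thread lazily obtains its own conversation id.
class PerThreadIds {
    // Shared, thread-safe source of fresh ids.
    private final AtomicLong generator = new AtomicLong();
    // Each thread draws one id from the generator the first time it asks.
    private final ThreadLocal<Long> current = ThreadLocal.withInitial(generator::getAndIncrement);

    // Returns the calling thread's conversation id, creating it on first use.
    long currentId() {
        return current.get();
    }

    // Ends the calling thread's conversation; its next call gets a fresh id.
    void end() {
        current.remove();
    }
}
```

With this arrangement a thread can call `currentId()` repeatedly during one conversation and always see the same value, while two threads never share an id.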

The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS sessions are not multi-threaded.
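The need for synchronized sends can be illustrated with a plain Java stand-in for the shared session. This sketch is an assumption-laden illustration only: `SharedSessionSender` is a hypothetical class, and an `ArrayList` plays the role of the session because, like a JMS session, it is not safe for concurrent use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: many conversations funnel sends through one non-thread-safe resource.
class SharedSessionSender {
    // Stand-in for the shared session; ArrayList is not safe for concurrent writes.
    private final List<String> sent = new ArrayList<>();
    // All access to the stand-in session is guarded by this lock.
    private final Object sessionLock = new Object();

    // Sends on behalf of any conversation; only one thread uses the session at a time.
    void send(long conversationId, String body) {
        synchronized (sessionLock) {
            sent.add(conversationId + ":" + body);
        }
    }

    int sentCount() {
        synchronized (sessionLock) {
            return sent.size();
        }
    }
}
```

Serializing sends this way trades some throughput for safety; receives are unaffected because each conversation waits on its own queue.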

CRC Card
Responsibilities Collaborations
Associate messages to an ongoing conversation using correlation ids.
Automatically manage sessions for conversations.
Store messages not in a conversation in a dead letter box.


Nested Class Summary
 class ConversationFactory.Conversation
          Used to hold a conversation context.
protected  class ConversationFactory.Receiver
          Implements the message listener for this conversation handler.
 
Field Summary
(package private)  MessageConsumer consumer
          The message consumer for incoming messages.
(package private)  AtomicLong conversationIdGenerator
          Generates new conversation ids as needed.
(package private)  BlockingQueue<Message> deadLetterBox
          Used to hold any replies that are received outside of the context of a conversation.
(package private)  MessageProducer producer
          The message producer for outgoing messages.
(package private)  Class<? extends BlockingQueue> queueClass
          Holds the queue implementation class for the reply queue.
(package private)  Destination receiveDestination
          The well-known or temporary destination to receive replies on.
 
Constructor Summary
ConversationFactory(Connection connection, Destination receiveDestination, Class<? extends BlockingQueue> queueClass)
          Creates a conversation helper on the specified connection with the default sending destination, and listening to the specified receiving destination.
 
Method Summary
 Collection<Message> emptyDeadLetterBox()
          Clears the dead letter box, returning all messages that were in it.
 Session getSession()
          Gets the controlSession over which the conversation is conducted.
 ConversationFactory.Conversation startConversation()
          Creates a new conversation context.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

consumer

MessageConsumer consumer
The message consumer for incoming messages.


producer

MessageProducer producer
The message producer for outgoing messages.


receiveDestination

Destination receiveDestination
The well-known or temporary destination to receive replies on.


queueClass

Class<? extends BlockingQueue> queueClass
Holds the queue implementation class for the reply queue.
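Holding a queue class rather than a queue instance suggests that a new queue is created reflectively for each conversation. The sketch below shows one way that could work, assuming the class has a public no-argument constructor; `QueueInstantiator` and `newQueue` are hypothetical names, not part of this API.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical helper: builds a fresh queue from a queue implementation class.
class QueueInstantiator {
    // Assumes queueClass exposes a public no-argument constructor,
    // as LinkedBlockingQueue and SynchronousQueue do.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <T> BlockingQueue<T> newQueue(Class<? extends BlockingQueue> queueClass) {
        try {
            return (BlockingQueue<T>) queueClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Queue class needs a public no-argument constructor: " + queueClass, e);
        }
    }
}
```

Under this assumption, a class such as `ArrayBlockingQueue`, which requires a capacity argument, would be rejected.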


deadLetterBox

BlockingQueue<Message> deadLetterBox
Used to hold any replies that are received outside of the context of a conversation.


conversationIdGenerator

AtomicLong conversationIdGenerator
Generates new conversation ids as needed.

Constructor Detail

ConversationFactory

public ConversationFactory(Connection connection,
                           Destination receiveDestination,
                           Class<? extends BlockingQueue> queueClass)
                    throws JMSException
Creates a conversation helper on the specified connection with the default sending destination, and listening to the specified receiving destination.

Parameters:
connection - The connection to build the conversation helper on.
receiveDestination - The destination to listen to for incoming messages. This may be null to use a temporary queue.
queueClass - The queue implementation class.
Throws:
JMSException - All underlying JMSExceptions are allowed to fall through.
Method Detail

startConversation

public ConversationFactory.Conversation startConversation()
Creates a new conversation context.

Returns:
A new conversation context.

emptyDeadLetterBox

public Collection<Message> emptyDeadLetterBox()
Clears the dead letter box, returning all messages that were in it.

Returns:
All messages in the dead letter box.
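The clear-and-return behaviour described above maps naturally onto `BlockingQueue.drainTo`. The following is a sketch of that idea only; `DeadLetterBoxSketch` and `addStray` are hypothetical, and `String` stands in for a JMS `Message`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a dead letter box that can be emptied in one call.
class DeadLetterBoxSketch {
    private final BlockingQueue<String> deadLetterBox = new LinkedBlockingQueue<>();

    // Records a message that arrived outside of any conversation.
    void addStray(String message) {
        deadLetterBox.add(message);
    }

    // Removes everything currently in the box and returns it to the caller.
    Collection<String> emptyDeadLetterBox() {
        Collection<String> drained = new ArrayList<>();
        deadLetterBox.drainTo(drained);
        return drained;
    }
}
```

A test could call this after a run and check that the returned collection is empty, which would confirm that every reply was matched to a conversation.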

getSession

public Session getSession()
Gets the controlSession over which the conversation is conducted.

Returns:
The controlSession over which the conversation is conducted.


Licensed to the Apache Software Foundation