|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Objectorg.apache.qpid.test.utils.ConversationFactory
public class ConversationFactory
A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded conversation (the conversation methods can be called many times in parallel):class Initiator { ConversationHelper conversation = new ConversationHelper(connection, null, java.util.concurrent.LinkedBlockingQueue.class); initiateConversation() { try { // Exchange greetings. conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello.")); Message greeting = conversation.receive(); // Exchange goodbyes. conversation.send(conversation.getSession().createTextMessage("Goodbye.")); Message goodbye = conversation.receive(); } finally { conversation.end(); } } } class Responder { ConversationHelper conversation = new ConversationHelper(connection, receiveDestination, java.util.concurrent.LinkedBlockingQueue.class); respondToConversation() { try { // Exchange greetings. Message greeting = conversation.receive(); conversation.send(conversation.getSession().createTextMessage("Hello.")); // Exchange goodbyes. Message goodbye = conversation.receive(); conversation.send(conversation.getSession().createTextMessage("Goodbye.")); } finally { conversation.end(); } } }Conversation correlation id's are generated on a per thread basis. The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS sessions are not multi-threaded.
Responsibilities | Collaborations |
---|---|
Associate messages to an ongoing conversation using correlation ids. | |
Auto manage sessions for conversations. | |
Store messages not in a conversation in dead letter box. |
Nested Class Summary | |
---|---|
class |
ConversationFactory.Conversation
Used to hold a conversation context. |
protected class |
ConversationFactory.Receiver
Implements the message listener for this conversation handler. |
Field Summary | |
---|---|
(package private) MessageConsumer |
consumer
The message consumer for incoming messages. |
(package private) AtomicLong |
conversationIdGenerator
Generates new coversation id's as needed. |
(package private) BlockingQueue<Message> |
deadLetterBox
Used to hold any replies that are received outside of the context of a conversation. |
(package private) MessageProducer |
producer
The message producer for outgoing messages. |
(package private) Class<? extends BlockingQueue> |
queueClass
Holds the queue implementation class for the reply queue. |
(package private) Destination |
receiveDestination
The well-known or temporary destination to receive replies on. |
Constructor Summary | |
---|---|
ConversationFactory(Connection connection,
Destination receiveDestination,
Class<? extends BlockingQueue> queueClass)
Creates a conversation helper on the specified connection with the default sending destination, and listening to the specified receiving destination. |
Method Summary | |
---|---|
Collection<Message> |
emptyDeadLetterBox()
Clears the dead letter box, returning all messages that were in it. |
Session |
getSession()
Gets the controlSession over which the conversation is conducted. |
ConversationFactory.Conversation |
startConversation()
Creates a new conversation context. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
MessageConsumer consumer
MessageProducer producer
Destination receiveDestination
Class<? extends BlockingQueue> queueClass
BlockingQueue<Message> deadLetterBox
AtomicLong conversationIdGenerator
Constructor Detail |
---|
public ConversationFactory(Connection connection, Destination receiveDestination, Class<? extends BlockingQueue> queueClass) throws JMSException
connection
- The connection to build the conversation helper on.receiveDestination
- The destination to listen to for incoming messages. This may be null to use a temporary
queue.queueClass
- The queue implementation class.
JMSException
- All underlying JMSExceptions are allowed to fall through.Method Detail |
---|
public ConversationFactory.Conversation startConversation()
public Collection<Message> emptyDeadLetterBox()
public Session getSession()
|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |