org.apache.qpid.client.protocol
Class AMQProtocolHandler

java.lang.Object
  extended by org.apache.qpid.client.protocol.AMQProtocolHandler
All Implemented Interfaces:
org.apache.qpid.protocol.ProtocolEngine, org.apache.qpid.transport.Receiver<ByteBuffer>

public class AMQProtocolHandler
extends Object
implements org.apache.qpid.protocol.ProtocolEngine

AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message received" and so on.

There is a 1:1 mapping between an AMQProtocolHandler and an AMQConnection. The connection class is exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public API calls through which an individual connection can be manipulated. This protocol handler talks to the network through MINA, in a behind the scenes role; it is not an exposed part of the client API.

There is a 1:many mapping between an AMQProtocolHandler and a set of AMQSessions. At the MINA level, there is one session per connection. At the AMQP level there can be many channels which are also called sessions in JMS parlance. The AMQSessions are managed through an AMQProtocolSession instance. The protocol session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions in the event of failover. See below for more information about this.

Mina provides a session container that can be used to store/retrieve arbitrary objects as String named attributes. A more convenient, type-safe, container for session data is provided in the form of AMQProtocolSession.

A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper as described above). This event handler is different, because dealing with failover complicates things. To the end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection and the protocol session data is held outside of the MINA IOSession.

This handler is responsibile for setting up the filter chain to filter all events for this handler through. The filter chain is set up as a stack of event handers that perform the following functions (working upwards from the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.

CRC Card
Responsibilities Collaborations
Maintain fail-over state.

Todo:
Use a single handler instance, by shifting everything to do with the 'protocol session' state, including failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there is sense in keeping the session model seperate. Will clarify things by having data held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so that lifecycles of the fields match lifecycles of their containing objects.

Constructor Summary
AMQProtocolHandler(AMQConnection con)
          Creates a new protocol handler, associated with the specified client connection instance.
 
Method Summary
 void blockUntilNotFailingOver()
           
 void closeConnection(long timeout)
          Closes the connection.
 void closed()
          Called when the network connection is closed.
 void closeSession(AMQSession session)
           
 void createIoTransportSession(BrokerDetails brokerDetail)
          Called when we want to create a new IoTransport session
 StateWaiter createWaiter(Set<AMQState> states)
           
 void exception(Throwable cause)
          Invoked when any exception is thrown by the NetworkDriver
 void failover(String host, int port)
           
 void failoverInProgress()
           
 org.apache.qpid.framing.AMQShortString generateQueueName()
           
 AMQConnection getConnection()
           
 CountDownLatch getFailoverLatch()
           
(package private)  FailoverState getFailoverState()
           
 SocketAddress getLocalAddress()
           
 org.apache.qpid.framing.MethodRegistry getMethodRegistry()
           
 org.apache.qpid.transport.NetworkDriver getNetworkDriver()
           
 byte getProtocolMajorVersion()
           
 byte getProtocolMinorVersion()
           
 AMQProtocolSession getProtocolSession()
           
 org.apache.qpid.framing.ProtocolVersion getProtocolVersion()
           
 long getReadBytes()
           
 SocketAddress getRemoteAddress()
           
 AMQStateManager getStateManager()
           
 org.apache.qpid.framing.ProtocolVersion getSuggestedProtocolVersion()
           
 long getWrittenBytes()
           
(package private)  void initHeartbeats(int delay)
           
 void methodBodyReceived(int channelId, org.apache.qpid.framing.AMQBody bodyFrame)
           
 void notifyFailoverStarting()
           
 void propagateExceptionToAllWaiters(Exception e)
          There are two cases where we have other threads potentially blocking for events to be handled by this class.
 void propagateExceptionToFrameListeners(Exception e)
          This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any protocol level waits.
 void readerIdle()
           
 void received(ByteBuffer msg)
           
 void setFailoverLatch(CountDownLatch failoverLatch)
           
 void setFailoverState(FailoverState failoverState)
           
 void setNetworkDriver(org.apache.qpid.transport.NetworkDriver driver)
           
 void setStateManager(AMQStateManager stateManager)
           
 org.apache.qpid.protocol.AMQMethodEvent syncWrite(org.apache.qpid.framing.AMQFrame frame, Class responseClass)
          More convenient method to write a frame and wait for it's response.
 org.apache.qpid.protocol.AMQMethodEvent syncWrite(org.apache.qpid.framing.AMQFrame frame, Class responseClass, long timeout)
          More convenient method to write a frame and wait for it's response.
 org.apache.qpid.protocol.AMQMethodEvent writeCommandFrameAndWaitForReply(org.apache.qpid.framing.AMQFrame frame, BlockingMethodFrameListener listener)
          Convenience method that writes a frame to the protocol session and waits for a particular response.
 org.apache.qpid.protocol.AMQMethodEvent writeCommandFrameAndWaitForReply(org.apache.qpid.framing.AMQFrame frame, BlockingMethodFrameListener listener, long timeout)
          Convenience method that writes a frame to the protocol session and waits for a particular response.
 void writeFrame(org.apache.qpid.framing.AMQDataBlock frame)
          Convenience method that writes a frame to the protocol session.
 void writeFrame(org.apache.qpid.framing.AMQDataBlock frame, boolean wait)
           
 void writerIdle()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AMQProtocolHandler

public AMQProtocolHandler(AMQConnection con)
Creates a new protocol handler, associated with the specified client connection instance.

Parameters:
con - The client connection that this is the event handler for.
Method Detail

createIoTransportSession

public void createIoTransportSession(BrokerDetails brokerDetail)
Called when we want to create a new IoTransport session

Parameters:
brokerDetail -

closed

public void closed()
Called when the network connection is closed. This can happen, either because the client explicitly requested that the connection be closed, in which case nothing is done, or because the connection died. In the case where the connection died, an attempt to failover automatically to a new connection may be started. The failover process will be started, provided that it is the clients policy to allow failover, and provided that a failover has not already been started or failed.

Specified by:
closed in interface org.apache.qpid.protocol.ProtocolEngine
Specified by:
closed in interface org.apache.qpid.transport.Receiver<ByteBuffer>
Todo:
Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and not otherwise? The above comment doesn't make that clear.

readerIdle

public void readerIdle()
Specified by:
readerIdle in interface org.apache.qpid.protocol.ProtocolEngine

writerIdle

public void writerIdle()
Specified by:
writerIdle in interface org.apache.qpid.protocol.ProtocolEngine

exception

public void exception(Throwable cause)
Invoked when any exception is thrown by the NetworkDriver

Specified by:
exception in interface org.apache.qpid.transport.Receiver<ByteBuffer>

propagateExceptionToAllWaiters

public void propagateExceptionToAllWaiters(Exception e)
There are two cases where we have other threads potentially blocking for events to be handled by this class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. This should be called only when the exception is fatal for the connection.

Parameters:
e - the exception to propagate
See Also:
propagateExceptionToFrameListeners(java.lang.Exception)

propagateExceptionToFrameListeners

public void propagateExceptionToFrameListeners(Exception e)
This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any protocol level waits. This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should stop waiting and relinquish the Failover lock FailoverHandler. Once the FailoverHandler has re-established the connection then the listeners will be able to re-attempt their protocol request and so listen again for the correct frame.

Parameters:
e - the exception to propagate

notifyFailoverStarting

public void notifyFailoverStarting()

failoverInProgress

public void failoverInProgress()

received

public void received(ByteBuffer msg)
Specified by:
received in interface org.apache.qpid.transport.Receiver<ByteBuffer>

methodBodyReceived

public void methodBodyReceived(int channelId,
                               org.apache.qpid.framing.AMQBody bodyFrame)
                        throws org.apache.qpid.AMQException
Throws:
org.apache.qpid.AMQException

createWaiter

public StateWaiter createWaiter(Set<AMQState> states)
                         throws org.apache.qpid.AMQException
Throws:
org.apache.qpid.AMQException

writeFrame

public void writeFrame(org.apache.qpid.framing.AMQDataBlock frame)
Convenience method that writes a frame to the protocol session. Equivalent to calling getProtocolSession().write().

Parameters:
frame - the frame to write

writeFrame

public void writeFrame(org.apache.qpid.framing.AMQDataBlock frame,
                       boolean wait)

writeCommandFrameAndWaitForReply

public org.apache.qpid.protocol.AMQMethodEvent writeCommandFrameAndWaitForReply(org.apache.qpid.framing.AMQFrame frame,
                                                                                BlockingMethodFrameListener listener)
                                                                         throws org.apache.qpid.AMQException,
                                                                                FailoverException
Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to calling getProtocolSession().write() then waiting for the response.

Parameters:
frame -
listener - the blocking listener. Note the calling thread will block.
Throws:
org.apache.qpid.AMQException
FailoverException

writeCommandFrameAndWaitForReply

public org.apache.qpid.protocol.AMQMethodEvent writeCommandFrameAndWaitForReply(org.apache.qpid.framing.AMQFrame frame,
                                                                                BlockingMethodFrameListener listener,
                                                                                long timeout)
                                                                         throws org.apache.qpid.AMQException,
                                                                                FailoverException
Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to calling getProtocolSession().write() then waiting for the response.

Parameters:
frame -
listener - the blocking listener. Note the calling thread will block.
Throws:
org.apache.qpid.AMQException
FailoverException

syncWrite

public org.apache.qpid.protocol.AMQMethodEvent syncWrite(org.apache.qpid.framing.AMQFrame frame,
                                                         Class responseClass)
                                                  throws org.apache.qpid.AMQException,
                                                         FailoverException
More convenient method to write a frame and wait for it's response.

Throws:
org.apache.qpid.AMQException
FailoverException

syncWrite

public org.apache.qpid.protocol.AMQMethodEvent syncWrite(org.apache.qpid.framing.AMQFrame frame,
                                                         Class responseClass,
                                                         long timeout)
                                                  throws org.apache.qpid.AMQException,
                                                         FailoverException
More convenient method to write a frame and wait for it's response.

Throws:
org.apache.qpid.AMQException
FailoverException

closeSession

public void closeSession(AMQSession session)
                  throws org.apache.qpid.AMQException
Throws:
org.apache.qpid.AMQException

closeConnection

public void closeConnection(long timeout)
                     throws org.apache.qpid.AMQException
Closes the connection.

If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed anyway.

Parameters:
timeout - The timeout to wait for an acknowledgement to the close request.
Throws:
org.apache.qpid.AMQException - If the close fails for any reason.

getReadBytes

public long getReadBytes()
Specified by:
getReadBytes in interface org.apache.qpid.protocol.ProtocolEngine
Returns:
the number of bytes read from this protocol session

getWrittenBytes

public long getWrittenBytes()
Specified by:
getWrittenBytes in interface org.apache.qpid.protocol.ProtocolEngine
Returns:
the number of bytes written to this protocol session

failover

public void failover(String host,
                     int port)

blockUntilNotFailingOver

public void blockUntilNotFailingOver()
                              throws InterruptedException
Throws:
InterruptedException

generateQueueName

public org.apache.qpid.framing.AMQShortString generateQueueName()

getFailoverLatch

public CountDownLatch getFailoverLatch()

setFailoverLatch

public void setFailoverLatch(CountDownLatch failoverLatch)

getConnection

public AMQConnection getConnection()

getStateManager

public AMQStateManager getStateManager()

setStateManager

public void setStateManager(AMQStateManager stateManager)

getProtocolSession

public AMQProtocolSession getProtocolSession()

getFailoverState

FailoverState getFailoverState()

setFailoverState

public void setFailoverState(FailoverState failoverState)

getProtocolMajorVersion

public byte getProtocolMajorVersion()

getProtocolMinorVersion

public byte getProtocolMinorVersion()

getMethodRegistry

public org.apache.qpid.framing.MethodRegistry getMethodRegistry()

getProtocolVersion

public org.apache.qpid.framing.ProtocolVersion getProtocolVersion()

getRemoteAddress

public SocketAddress getRemoteAddress()
Specified by:
getRemoteAddress in interface org.apache.qpid.protocol.ProtocolEngine

getLocalAddress

public SocketAddress getLocalAddress()
Specified by:
getLocalAddress in interface org.apache.qpid.protocol.ProtocolEngine

setNetworkDriver

public void setNetworkDriver(org.apache.qpid.transport.NetworkDriver driver)
Specified by:
setNetworkDriver in interface org.apache.qpid.protocol.ProtocolEngine

initHeartbeats

void initHeartbeats(int delay)
Parameters:
delay - delay in seconds (not ms)

getNetworkDriver

public org.apache.qpid.transport.NetworkDriver getNetworkDriver()

getSuggestedProtocolVersion

public org.apache.qpid.framing.ProtocolVersion getSuggestedProtocolVersion()


Licensed to the Apache Software Foundation