org.apache.qpid.client
Class BasicMessageConsumer<U>

java.lang.Object
  extended by org.apache.qpid.client.Closeable
      extended by org.apache.qpid.client.BasicMessageConsumer<U>
All Implemented Interfaces:
MessageConsumer
Direct Known Subclasses:
BasicMessageConsumer_0_10, BasicMessageConsumer_0_8

public abstract class BasicMessageConsumer<U>
extends Closeable
implements MessageConsumer


Field Summary
protected  int _acknowledgeMode
          The acknowledge mode in force for this consumer.
protected  int _channelId
          We need to know the channel id when constructing frames
protected  AMQConnection _connection
          The connection being used by this consumer
protected  int _consumerTag
          The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker
protected  AMQDestination _destination
           
protected  boolean _exclusive
          We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
protected  MessageFactoryRegistry _messageFactory
           
protected  String _messageSelector
           
protected  AMQProtocolHandler _protocolHandler
           
protected  AMQSession _session
           
protected  BlockingQueue _synchronousQueue
          Used in the blocking receive methods to receive a message from the Session thread.
 
Fields inherited from class org.apache.qpid.client.Closeable
_closed, _closing
 
Constructor Summary
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, org.apache.qpid.framing.FieldTable arguments, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
           
 
Method Summary
(package private)  void acknowledgeDelivered()
          Acknowledge up to last message delivered (if any).
 void addBindingKey(AMQDestination amqd, String routingKey)
           
 void clearReceiveQueue()
           
 void close()
          Closes this object.
 void close(boolean sendClose)
           
abstract  AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
           
 String debugIdentity()
           
 List<Long> drainReceiverQueueAndRetrieveDeliveryTags()
           
 void failedOverPost()
           
 void failedOverPre()
          to be called when a failover has occured
 int getAcknowledgeMode()
           
 org.apache.qpid.framing.FieldTable getArguments()
           
 int getConsumerTag()
           
 AMQDestination getDestination()
           
(package private)  Long getLastDelivered()
          Acknowledge up to last message delivered (if any).
 Object getMessageFromQueue(long l)
           
 MessageListener getMessageListener()
           
 String getMessageSelector()
           
 int getPrefetch()
           
 int getPrefetchHigh()
           
 int getPrefetchLow()
           
 org.apache.qpid.framing.AMQShortString getQueuename()
           
 AMQSession getSession()
           
 boolean isAutoClose()
           
 boolean isExclusive()
           
protected  boolean isMessageListenerSet()
           
 boolean isNoConsume()
           
 boolean isNoLocal()
           
 boolean isReceiving()
           
(package private)  void markClosed()
          Called when you need to invalidate a consumer.
 void notifyCloseMessage(CloseConsumerMessage closeMessage)
           
(package private)  void notifyError(Throwable cause)
           
 void notifyMessage(AbstractJMSMessage jmsMessage)
           
(package private)  void notifyMessage(U messageFrame)
          Called from the AMQSession when a message has arrived for this consumer.
(package private)  void postDeliver(AbstractJMSMessage msg)
           
protected  void preApplicationProcessing(AbstractJMSMessage jmsMsg)
           
(package private)  void preDeliver(AbstractJMSMessage msg)
           
 Message receive()
           
 Message receive(long l)
           
(package private) abstract  Message receiveBrowse()
           
 Message receiveNoWait()
           
 void rollback()
           
 void rollbackPendingMessages()
           
(package private) abstract  void sendCancel()
           
 void setConsumerTag(int consumerTag)
           
 void setMessageListener(MessageListener messageListener)
           
 void setQueuename(org.apache.qpid.framing.AMQShortString queuename)
           
 
Methods inherited from class org.apache.qpid.client.Closeable
checkNotClosed, isClosed, isClosing
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

_connection

protected final AMQConnection _connection
The connection being used by this consumer


_messageSelector

protected final String _messageSelector

_destination

protected AMQDestination _destination

_consumerTag

protected int _consumerTag
The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker


_channelId

protected final int _channelId
We need to know the channel id when constructing frames


_synchronousQueue

protected final BlockingQueue _synchronousQueue
Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors

Argument true indicates we want strict FIFO semantics


_messageFactory

protected final MessageFactoryRegistry _messageFactory

_session

protected final AMQSession _session

_protocolHandler

protected final AMQProtocolHandler _protocolHandler

_exclusive

protected boolean _exclusive
We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover


_acknowledgeMode

protected final int _acknowledgeMode
The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our implementation.

Constructor Detail

BasicMessageConsumer

protected BasicMessageConsumer(int channelId,
                               AMQConnection connection,
                               AMQDestination destination,
                               String messageSelector,
                               boolean noLocal,
                               MessageFactoryRegistry messageFactory,
                               AMQSession session,
                               AMQProtocolHandler protocolHandler,
                               org.apache.qpid.framing.FieldTable arguments,
                               int prefetchHigh,
                               int prefetchLow,
                               boolean exclusive,
                               int acknowledgeMode,
                               boolean noConsume,
                               boolean autoClose)
Method Detail

getDestination

public AMQDestination getDestination()

getMessageSelector

public String getMessageSelector()
                          throws JMSException
Specified by:
getMessageSelector in interface MessageConsumer
Throws:
JMSException

getMessageListener

public MessageListener getMessageListener()
                                   throws JMSException
Specified by:
getMessageListener in interface MessageConsumer
Throws:
JMSException

getAcknowledgeMode

public int getAcknowledgeMode()

isMessageListenerSet

protected boolean isMessageListenerSet()

setMessageListener

public void setMessageListener(MessageListener messageListener)
                        throws JMSException
Specified by:
setMessageListener in interface MessageConsumer
Throws:
JMSException

preApplicationProcessing

protected void preApplicationProcessing(AbstractJMSMessage jmsMsg)
                                 throws JMSException
Throws:
JMSException

getArguments

public org.apache.qpid.framing.FieldTable getArguments()

getPrefetch

public int getPrefetch()

getPrefetchHigh

public int getPrefetchHigh()

getPrefetchLow

public int getPrefetchLow()

isNoLocal

public boolean isNoLocal()

isExclusive

public boolean isExclusive()

isReceiving

public boolean isReceiving()

receive

public Message receive()
                throws JMSException
Specified by:
receive in interface MessageConsumer
Throws:
JMSException

receive

public Message receive(long l)
                throws JMSException
Specified by:
receive in interface MessageConsumer
Throws:
JMSException

getMessageFromQueue

public Object getMessageFromQueue(long l)
                           throws InterruptedException
Throws:
InterruptedException

receiveBrowse

abstract Message receiveBrowse()
                        throws JMSException
Throws:
JMSException

receiveNoWait

public Message receiveNoWait()
                      throws JMSException
Specified by:
receiveNoWait in interface MessageConsumer
Throws:
JMSException

close

public void close()
           throws JMSException
Description copied from class: Closeable
Closes this object.

Specified by:
close in interface MessageConsumer
Specified by:
close in class Closeable
Throws:
JMSException - If this cannot be closed for any reason.

close

public void close(boolean sendClose)
           throws JMSException
Throws:
JMSException

sendCancel

abstract void sendCancel()
                  throws org.apache.qpid.AMQException,
                         FailoverException
Throws:
org.apache.qpid.AMQException
FailoverException

markClosed

void markClosed()
Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has vetoed automatic resubscription. The caller must hold the failover mutex.


notifyCloseMessage

public void notifyCloseMessage(CloseConsumerMessage closeMessage)
Parameters:
closeMessage - this message signals that we should close the browser

notifyMessage

void notifyMessage(U messageFrame)
Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a message listener or a synchronous receive() caller.

Parameters:
messageFrame - the raw unprocessed mesage

createJMSMessageFromUnprocessedMessage

public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory,
                                                                          U messageFrame)
                                                                   throws Exception
Throws:
Exception

notifyMessage

public void notifyMessage(AbstractJMSMessage jmsMessage)
Parameters:
jmsMessage - this message has already been processed so can't redo preDeliver

preDeliver

void preDeliver(AbstractJMSMessage msg)

postDeliver

void postDeliver(AbstractJMSMessage msg)
           throws JMSException
Throws:
JMSException

getLastDelivered

Long getLastDelivered()
Acknowledge up to last message delivered (if any). Used when commiting.

Returns:
the lastDeliveryTag to acknowledge

acknowledgeDelivered

void acknowledgeDelivered()
Acknowledge up to last message delivered (if any). Used when commiting.


notifyError

void notifyError(Throwable cause)

getConsumerTag

public int getConsumerTag()

setConsumerTag

public void setConsumerTag(int consumerTag)

getSession

public AMQSession getSession()

isAutoClose

public boolean isAutoClose()

isNoConsume

public boolean isNoConsume()

rollback

public void rollback()

rollbackPendingMessages

public void rollbackPendingMessages()

debugIdentity

public String debugIdentity()

clearReceiveQueue

public void clearReceiveQueue()

drainReceiverQueueAndRetrieveDeliveryTags

public List<Long> drainReceiverQueueAndRetrieveDeliveryTags()

getQueuename

public org.apache.qpid.framing.AMQShortString getQueuename()

setQueuename

public void setQueuename(org.apache.qpid.framing.AMQShortString queuename)

addBindingKey

public void addBindingKey(AMQDestination amqd,
                          String routingKey)
                   throws org.apache.qpid.AMQException
Throws:
org.apache.qpid.AMQException

failedOverPre

public void failedOverPre()
to be called when a failover has occured


failedOverPost

public void failedOverPost()


Licensed to the Apache Software Foundation