org.apache.qpid.requestreply
Class PingPongProducer

java.lang.Object
  extended by org.apache.qpid.requestreply.PingPongProducer
All Implemented Interfaces:
Runnable, ExceptionListener
Direct Known Subclasses:
PingClient, PingDurableClient

public class PingPongProducer
extends Object
implements Runnable, ExceptionListener

PingPongProducer is a client that sends test messages and waits for replies to them. The replies may be generated by another client (see PingPongBouncer), or an extension of this class may be used that listens to its own messages and does not send replies (see PingClient). PingPongProducer is intended as a Swiss Army knife test client that makes almost every aspect of its behaviour configurable.

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This means that this class has to do some work to correlate pings with pongs; it expects the original message correlation id in the ping to be bounced back in the reply correlation id.
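The correlation work described above can be sketched without any JMS dependency. The following is a minimal illustration only, not the class's actual implementation: the name CorrelationTracker and its methods are hypothetical, and replies are represented by their correlation id string alone.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Qpid): counts replies per correlation id. */
final class CorrelationTracker {
    /** Book-keeping for one correlation id. */
    private static final class Pending {
        final int expected;
        final CountDownLatch latch;

        Pending(int expected) {
            this.expected = expected;
            this.latch = new CountDownLatch(expected);
        }
    }

    private final ConcurrentHashMap<String, Pending> pending = new ConcurrentHashMap<>();

    /** Registers a correlation id before the pings carrying it are sent. */
    void expect(String correlationId, int expectedReplies) {
        pending.put(correlationId, new Pending(expectedReplies));
    }

    /** Called for each reply on the shared reply queue; false if the id is unknown. */
    boolean onReply(String correlationId) {
        Pending p = pending.get(correlationId);
        if (p == null) {
            return false; // A reply to a ping nobody is waiting on any more.
        }
        p.latch.countDown();
        return true;
    }

    /** Waits up to the timeout, forgets the id, and returns how many replies arrived. */
    int awaitReplies(String correlationId, long timeoutMillis) {
        Pending p = pending.get(correlationId);
        if (p == null) {
            return 0;
        }
        try {
            p.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        } finally {
            pending.remove(correlationId);
        }
        return p.expected - (int) p.latch.getCount();
    }
}
```

Because every ping shares one reply queue, the correlation id is the only key that tells the replies of one batch from another; a reply whose id is not registered is simply reported as unknown.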

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover testing. A complete list of accepted parameters, default values and comments on their usage is provided here:

Parameters
Parameter Default Comments
messageSize 0 Message size in bytes. Not including any headers.
destinationName ping The root name to use to generate destination names to ping.
persistent false Determines whether persistent delivery is used.
transacted false Determines whether messages are sent/received in transactions.
broker tcp://localhost:5672 Determines the broker to connect to.
virtualHost test Determines the virtual host to send all pings over.
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit.
verbose false The verbose flag for debugging. Prints to console on every message.
pubsub false Whether to ping topics or queues. Uses p2p by default.
failAfterCommit false Whether to prompt user to kill broker after a commit batch.
failBeforeCommit false Whether to prompt user to kill broker before a commit batch.
failAfterSend false Whether to prompt user to kill broker after a send.
failBeforeSend false Whether to prompt user to kill broker before a send.
failOnce true Whether to prompt for failover only once.
username guest The username to access the broker with.
password guest The password to access the broker with.
selector null Not used. Defines a message selector to filter pings with.
destinationCount 1 The number of destinations to send pings to.
numConsumers 1 The number of consumers on each destination.
timeout 30000 In milliseconds. The timeout to stop waiting for replies.
commitBatchSize 1 The number of messages per transaction in transactional mode.
uniqueDests true Whether each receiver listens to only one ping destination or to all of them.
durableDests false Whether or not durable destinations are used.
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: 0 - SESSION_TRANSACTED, 1 - AUTO_ACKNOWLEDGE, 2 - CLIENT_ACKNOWLEDGE, 3 - DUPS_OK_ACKNOWLEDGE, 257 - NO_ACKNOWLEDGE, 258 - PRE_ACKNOWLEDGE.
consTransacted false Whether or not consumers use transactions. Defaults to the same value as the 'transacted' option if not separately defined.
consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same value as 'ackMode' if not separately defined.
maxPending 0 The maximum size, in bytes, of messages sent but not yet received. Limits the volume of messages currently buffered on the client or broker. Can help scale test clients by limiting the amount of buffered data, to avoid out-of-memory errors.
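As a rough illustration of how such options might be assembled, the sketch below builds a java.util.Properties object of overrides. It assumes that the property keys are the parameter names listed in the table; the actual key strings are held in the *_PROPNAME constants, whose values are not shown on this page. The class PingOverrides and its methods are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper (not part of Qpid) for assembling option overrides. */
final class PingOverrides {
    /** Overrides for a persistent, transacted pub/sub run; unset options keep their defaults. */
    static Properties transactedTopicRun() {
        Properties overrides = new Properties();
        // Keys are ASSUMED to match the parameter names in the table above.
        overrides.setProperty("pubsub", "true");          // Ping topics instead of queues.
        overrides.setProperty("persistent", "true");      // Use persistent delivery.
        overrides.setProperty("transacted", "true");      // Send within transactions.
        overrides.setProperty("commitBatchSize", "10");   // Ten messages per transaction.
        overrides.setProperty("messageSize", "1024");     // Payload bytes, excluding headers.
        overrides.setProperty("rate", "500");             // At most 500 messages per second.
        return overrides;
    }

    /** Reads an integer option, falling back to the documented default when it is not set. */
    static int intOption(Properties overrides, String key, int documentedDefault) {
        return Integer.parseInt(overrides.getProperty(key, Integer.toString(documentedDefault)));
    }
}
```

Given the constructor listed in the Constructor Summary, such an object would presumably be passed as new PingPongProducer(PingOverrides.transactedTopicRun()).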

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also registered to terminate the ping-pong loop cleanly.
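The loop structure described above can be sketched as follows. This is a simplified stand-in, not the Qpid source: the class LoopingPinger and its cycle counter are hypothetical, and the body of pingLoop only counts cycles where real code would send pings and wait for replies.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in (not the Qpid class) showing the loop, flag and hook. */
final class LoopingPinger implements Runnable {
    // The loop runs only while this is true; volatile so a stop() from another thread is seen.
    private volatile boolean publish = true;
    private final AtomicInteger cycles = new AtomicInteger();

    /** One ping cycle; callable directly, so no new thread is required. */
    void pingLoop() {
        cycles.incrementAndGet(); // Real code would send pings and await the replies here.
    }

    /** Repeats the ping cycle until the publish flag is cleared. */
    @Override
    public void run() {
        while (publish) {
            pingLoop();
        }
    }

    /** Clears the publish flag; the loop exits once its current cycle completes. */
    void stop() {
        publish = false;
    }

    int cyclesCompleted() {
        return cycles.get();
    }

    /** A thread that, when run as a shutdown hook, stops the loop cleanly. */
    Thread getShutdownHook() {
        return new Thread(this::stop);
    }
}
```

A command line entry point could register the hook with Runtime.getRuntime().addShutdownHook(pinger.getShutdownHook()) and then call pinger.run() on the main thread, while test code that wants a single cycle calls pingLoop() directly.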

CRC Card
Responsibilities Collaborations
Provide a ping and wait for all responses cycle.
Provide command line invocation to loop the ping cycle on a configurable broker url.

Todo:
Use a read/write lock in onMessage, not for reading and writing as such, but to make use of a shared and exclusive lock pair. Obtain the read lock for every message, before decrementing the message count. At the end of the onMessage method, add a block that obtains the write lock for the very last message and releases any waiting producer. This means that the last message waits until all other messages have been handled before releasing producers, but allows messages to be processed concurrently, unlike the current synchronized block.
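One possible reading of this proposal is sketched below. It is an interpretation of the Todo rather than existing code: the class SharedExclusiveReplyCounter is hypothetical, and the waiting producer is modelled with a CountDownLatch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the shared/exclusive locking scheme proposed in the Todo. */
final class SharedExclusiveReplyCounter {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger remaining;
    private final CountDownLatch producerGate = new CountDownLatch(1);

    SharedExclusiveReplyCounter(int expectedReplies) {
        this.remaining = new AtomicInteger(expectedReplies);
    }

    /** Handles one reply; the handler stands in for the per-message processing. */
    void onMessage(Runnable handler) {
        boolean last;
        lock.readLock().lock(); // Shared lock: many replies can be handled concurrently.
        try {
            last = remaining.decrementAndGet() == 0;
            handler.run();
        } finally {
            lock.readLock().unlock();
        }
        if (last) {
            // The read lock must be released first: ReentrantReadWriteLock cannot upgrade.
            lock.writeLock().lock(); // Exclusive lock: waits for all other handlers to finish.
            try {
                producerGate.countDown(); // Release any waiting producer.
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    /** True once the last reply has been handled and producers have been released. */
    boolean producersReleased() {
        return producerGate.getCount() == 0;
    }
}
```

Each handler decrements the count while holding the read lock, so by the time the last message requests the write lock, any handler still running is holding a read lock and delays the release until it finishes.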

Nested Class Summary
static interface PingPongProducer.ChainedMessageListener
          Defines a chained message listener interface that can be attached to this pinger.
protected static class PingPongProducer.PerCorrelationId
          Holds information on each correlation id.
 
Field Summary
protected  int _ackMode
          Holds the acknowledgement mode used for the producers.
protected  String _brokerDetails
          Holds the broker url.
protected  PingPongProducer.ChainedMessageListener _chainedMessageListener
          Holds a message listener that this message listener chains all its messages to.
protected  Connection _connection
          Holds the connection for the message producer.
protected  int _consAckMode
          Holds the acknowledgement mode setting for the consumers.
protected  boolean _consTransacted
          Holds the consumers transactional mode flag.
protected  MessageConsumer[] _consumer
          Holds the message consumer to receive the ping replies through.
protected  Connection[] _consumerConnection
          Holds the consumer connections.
protected  Session[] _consumerSession
          Holds the controlSession on which ping replies are received.
protected  String _destinationName
          Holds the root name from which to generate test destination names.
protected  String _factoryName
          Holds the JNDI name of the JMS connection factory.
protected  boolean _failAfterCommit
          Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit.
protected  boolean _failAfterSend
          Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send.
protected  boolean _failBeforeCommit
          Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit.
protected  boolean _failBeforeSend
          Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send.
protected  boolean _failOnce
          Flag used to indicate that failover prompting should only be done on the first commit, not on every commit.
protected  String _fileProperties
          Holds the name of the properties file to configure JNDI with.
protected  boolean _isDurable
          Flag used to indicate that durable destination should be used.
protected  boolean _isPubSub
          Flag used to indicate if this is a point to point or pub/sub ping client.
protected  boolean _isUnique
          Flag used to indicate if the destinations should be unique.
protected  int _maxPendingSize
          Holds the maximum amount of pending data that the client should buffer; sending is suspended if this limit is breached.
protected  int _messageSize
          Determines what size of messages this producer sends.
protected  int _noOfConsumers
          Holds the number of consumers per destination.
protected  int _noOfDestinations
          Holds the number of destinations to ping.
protected  boolean _overrideClientId
          Allows setting of client ID on the connection, rather than through the connection URL.
protected  String _password
          Holds the password to access the broker with.
protected  boolean _persistent
          Determines whether this producer sends persistent messages.
protected  List<Destination> _pingDestinations
          Holds the set of destinations that this ping producer pings.
protected  MessageProducer _producer
          Holds the message producer to send the pings through.
protected  Session _producerSession
          Holds the producer controlSession, needed to create ping messages.
protected  boolean _publish
          Used to tell the ping loop when to terminate; it only runs while this is true.
protected static AtomicInteger _queueJVMSequenceID
          This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when creating multiple ping producers in the same JVM.
protected  String _queueNamePostfix
          Holds the default queue name postfix value.
protected  AtomicInteger _queueSharedID
          This id generator is used to generate ids that are only unique within this pinger.
protected  int _rate
          Holds the maximum send rate in hertz.
protected  org.apache.qpid.junit.extensions.Throttle _rateLimiter
          Used to restrict the sending rate to a specified limit.
protected  Destination _replyDestination
          Holds the destination where the response messages will arrive.
protected  String _selector
          Holds the message selector to filter the pings with.
(package private) static SynchronousQueue _sendPauseMonitor
          Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected to wait until the number of unreceived messages is reduced before continuing to send.
protected  boolean _transacted
          Holds the producers transactional mode flag.
protected  int _txBatchSize
          Holds the number of sends that should be performed in every transaction when using transactions.
(package private) static AtomicInteger _unreceived
          Keeps a count of the number of messages currently sent but not received.
protected  String _username
          Holds the username to access the broker with.
protected  boolean _verbose
          Used to indicate that the ping loop should print out whenever it pings.
protected  String _virtualpath
          Holds the virtual host on the broker to run the tests through.
static int ACK_MODE_DEFAULT
          Defines the default message acknowledgement mode.
static String ACK_MODE_PROPNAME
          Holds the name of the property to get the message acknowledgement mode from.
static String BROKER_DEFAULT
          Holds the default broker url for the test.
static String BROKER_PROPNAME
          Holds the name of the property to get the test broker url from.
static boolean CONSUME_ONLY_DEFAULT
          Defines the default value of the consumeOnly flag to use when publishing messages is not desired.
static String CONSUME_ONLY_PROPNAME
          Holds the name of the property to get when no messages should be sent.
static int CONSUMER_ACK_MODE_DEFAULT
          Defines the default consumers message acknowledgement mode.
static String CONSUMER_ACK_MODE_PROPNAME
          Holds the name of the property to get the consumers message acknowledgement mode from.
static boolean CONSUMER_TRANSACTED_DEFAULT
          Holds the consumer transactional mode default setting.
static String CONSUMER_TRANSACTED_PROPNAME
          Holds the name of the property to get the test consumer transacted mode from.
static org.apache.qpid.junit.extensions.util.ParsedProperties defaults
          Holds the default configuration properties.
static long DELAY_BEFORE_CONSUME
          Defines the default value for the delay, in ms, to wait before starting the test run.
static String DELAY_BEFORE_CONSUME_PROPNAME
          Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled.
static int DESTINATION_COUNT_DEFAULT
          Defines the default number of destinations to ping.
static String DESTINATION_COUNT_PROPNAME
          Holds the name of the property to get the destination count from.
static boolean DURABLE_DESTS_DEFAULT
          Defines the default value of the durable destinations flag.
static String DURABLE_DESTS_PROPNAME
          Holds the name of the property to get the durable destinations flag from.
static boolean EXCLUSIVE_DEFAULT
          Defines the default value of the exclusive flag to use when consuming messages.
static String FACTORY_NAME_DEAFULT
          Holds the default JNDI name of the connection factory.
static String FACTORY_NAME_PROPNAME
          Holds the name of the property to define the JNDI factory name with.
static boolean FAIL_AFTER_COMMIT_DEFAULT
          Holds the default failover after commit test flag.
static String FAIL_AFTER_COMMIT_PROPNAME
          Holds the name of the property to get the fail after commit flag from.
static boolean FAIL_AFTER_SEND_DEFAULT
          Holds the default failover after send test flag.
static String FAIL_AFTER_SEND_PROPNAME
          Holds the name of the property to get the fail after send flag from.
static boolean FAIL_BEFORE_COMMIT_DEFAULT
          Holds the default failover before commit test flag.
static String FAIL_BEFORE_COMMIT_PROPNAME
          Holds the name of the property to get the fail before commit flag from.
static boolean FAIL_BEFORE_SEND_DEFAULT
          Holds the default failover before send test flag.
static String FAIL_BEFORE_SEND_PROPNAME
          Holds the name of the property to get the fail before send flag from.
static boolean FAIL_ONCE_DEFAULT
          The default failover once flag, true means only do one failover, false means failover on every commit cycle.
static String FAIL_ONCE_PROPNAME
          Holds the name of the property to get the fail once flag from.
static String FILE_PROPERTIES_DEAFULT
          Holds the default file name of the JNDI initial context properties.
static String FILE_PROPERTIES_PROPNAME
          Holds the name of the property to set the JNDI initial context properties with.
static int MAX_PENDING_DEFAULT
          Defines the default value for the maximum pending message size setting.
static String MAX_PENDING_PROPNAME
          Holds the name of the property to get the maximum pending message size setting from.
static int MESSAGE_SIZE_DEAFULT
          Used to set up a default message size.
static String MESSAGE_SIZE_PROPNAME
          Holds the name of the property to get the test message size from.
static String MESSAGE_TIMESTAMP_PROPNAME
          Holds the name of the property to store nanosecond timestamps in ping messages with.
static boolean NO_LOCAL_DEFAULT
          Defines the default value of the no local flag to use when consuming messages.
static int NUM_CONSUMERS_DEFAULT
          Defines the default number of consumers per destination.
static String NUM_CONSUMERS_PROPNAME
          Holds the name of the property to get the number of consumers per destination from.
static String OVERRIDE_CLIENT_ID_DEAFULT
          Holds the default value of the override client id flag.
static String OVERRIDE_CLIENT_ID_PROPNAME
          Holds the name of the property to determine whether or not client id is overridden at connection time.
static String PASSWORD_DEFAULT
          Holds the default broker log on password.
static String PASSWORD_PROPNAME
          Holds the name of the property to get the broker access password from.
static boolean PERSISTENT_MODE_DEFAULT
          Holds the message delivery mode to use for the test.
static String PERSISTENT_MODE_PROPNAME
          Holds the name of the property to get the test delivery mode from.
static String PING_QUEUE_NAME_DEFAULT
          Holds the name of the default destination to send pings on.
static String PING_QUEUE_NAME_PROPNAME
          Holds the name of the property to get the ping queue name from.
static int PREFETCH_DEFAULT
          Defines the default prefetch size to use when consuming messages.
static int PREFILL_DEFAULT
          Defines the default value for the number of messages to prefill.
static String PREFILL_PROPNAME
          Holds the name of the property to get the number of messages to prefill the broker with before starting the main test.
static boolean PUBSUB_DEFAULT
          Holds the pub/sub mode default, true means ping a topic, false means ping a queue.
static String PUBSUB_PROPNAME
          Holds the name of the property to get the p2p or pub/sub messaging mode from.
static String QUEUE_NAME_POSTFIX_DEFAULT
          Holds the default queue name postfix value.
static String QUEUE_NAME_POSTFIX_PROPNAME
          Holds the name of the property to get the queue name postfix from.
static int RATE_DEFAULT
          Defines the default rate (in pings per second) to send pings at.
static String RATE_PROPNAME
          Holds the name of the property to get the message rate from.
static String SELECTOR_DEFAULT
          Holds the default message selector.
static String SELECTOR_PROPNAME
          Holds the name of the property to get the message selector from.
static boolean SEND_ONLY_DEFAULT
          Defines the default value of the consumeOnly flag to use when publishing messages is not desired.
static String SEND_ONLY_PROPNAME
          Holds the name of the property to get when no messages should be sent.
static long TIMEOUT_DEFAULT
          Default time to wait before assuming that a ping has timed out.
static String TIMEOUT_PROPNAME
          Holds the name of the property to get the waiting timeout for response messages.
protected static DateFormat timestampFormatter
          A convenient formatter to use when time stamping output.
static boolean TRANSACTED_DEFAULT
          Holds the transactional mode to use for the test.
static String TRANSACTED_PROPNAME
          Holds the name of the property to get the test transactional mode from.
static int TX_BATCH_SIZE_DEFAULT
          Defines the default number of pings to send in each transaction when running transactionally.
static String TX_BATCH_SIZE_PROPNAME
          Holds the name of the property to get the commit batch size from.
static boolean UNIQUE_DESTS_DEFAULT
          Defines the default value for the unique destinations property.
static String UNIQUE_DESTS_PROPNAME
          Holds the name of the property to get the unique destinations flag from.
static String USERNAME_DEFAULT
          Holds the default broker log on username.
static String USERNAME_PROPNAME
          Holds the name of the property to get the broker access username from.
static boolean VERBOSE_DEFAULT
          Holds the default verbose mode.
static String VERBOSE_PROPNAME
          Holds the name of the property to get the verbose mode property from.
static String VIRTUAL_HOST_DEFAULT
          Holds the default virtual path for the test.
static String VIRTUAL_HOST_PROPNAME
          Holds the name of the property to get the test broker virtual path.
 
Constructor Summary
PingPongProducer(Properties overrides)
          Creates a ping producer with the specified parameters, of which there are many.
 
Method Summary
 void close()
          Closes all of the producer and consumer connections.
protected  boolean commitTx(Session session)
          Convenience method to commit the transaction on the specified controlSession.
protected  void createConnection(String clientID)
          Establishes a connection to the broker, based on the configuration parameters that this ping client was created with.
 void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, boolean durable)
          Creates consumers for the specified number of destinations.
 void createProducer()
          Creates the producer to send the pings on.
 void createReplyConsumers(Collection<Destination> destinations, String selector)
          Creates consumers for the specified destinations and registers this pinger to listen to their messages.
 void establishConnection(boolean producer, boolean consumer)
          Establishes a connection to the broker and creates message consumers and producers based on the parameters that this ping client was created with.
 int getConsumersPerDestination()
          Gets the number of consumers that are listening to each destination in the test.
 int getExpectedNumPings(int numpings)
          Calculates how many pings are expected to be received for the given number sent.
 List<Destination> getReplyDestinations()
          Gets all the reply destinations (to listen for replies on).
 Thread getShutdownHook()
          Gets a shutdown hook that will cleanly shut this down when it is running the ping loop.
 Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent)
          Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
protected  long getTimestamp(Message msg)
          Extracts the nanosecond timestamp from a message.
static void main(String[] args)
          Starts a ping-pong loop running from the command line.
 void onException(JMSException e)
          Callback method, implementing ExceptionListener.
 void onMessageWithConsumerNo(Message message, int consumerNo)
          Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating reply may be waiting on.
static void pause(long sleepTime)
          Convenience method for a short pause.
 int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
           
 int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
          Sends the specified number of ping messages and then waits for all correlating replies.
 void pingLoop()
          Implements a single iteration of the ping loop.
 void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId)
          Sends the specified number of ping messages and does not wait for correlating replies.
 void removeChainedMessageListener()
          Removes any chained message listeners from this pinger.
 void run()
          Implements a ping loop that repeatedly pings until the publish flag becomes false.
protected  boolean sendMessage(int i, Message message)
          Sends the specified message, applies rate limiting and possibly commits the current transaction.
 void setChainedMessageListener(PingPongProducer.ChainedMessageListener messageListener)
          Sets a chained message listener.
protected  void setTimestamp(Message msg)
          Sets the current time in nanoseconds as the timestamp on the message.
 void setupCorrelationID(String correlationId, int expectedCount)
           
 void start()
          Starts the producer and consumer connections.
 void stop()
          Stops the ping loop by clearing the publish flag.
 void waitForUser(String prompt)
          Outputs a prompt to the console and waits for the user to press return.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

OVERRIDE_CLIENT_ID_PROPNAME

public static final String OVERRIDE_CLIENT_ID_PROPNAME
Holds the name of the property to determine whether or not client id is overridden at connection time.

See Also:
Constant Field Values

OVERRIDE_CLIENT_ID_DEAFULT

public static final String OVERRIDE_CLIENT_ID_DEAFULT
Holds the default value of the override client id flag.

See Also:
Constant Field Values

FACTORY_NAME_PROPNAME

public static final String FACTORY_NAME_PROPNAME
Holds the name of the property to define the JNDI factory name with.

See Also:
Constant Field Values

FACTORY_NAME_DEAFULT

public static final String FACTORY_NAME_DEAFULT
Holds the default JNDI name of the connection factory.

See Also:
Constant Field Values

FILE_PROPERTIES_PROPNAME

public static final String FILE_PROPERTIES_PROPNAME
Holds the name of the property to set the JNDI initial context properties with.

See Also:
Constant Field Values

FILE_PROPERTIES_DEAFULT

public static final String FILE_PROPERTIES_DEAFULT
Holds the default file name of the JNDI initial context properties.

See Also:
Constant Field Values

MESSAGE_SIZE_PROPNAME

public static final String MESSAGE_SIZE_PROPNAME
Holds the name of the property to get the test message size from.

See Also:
Constant Field Values

MESSAGE_SIZE_DEAFULT

public static final int MESSAGE_SIZE_DEAFULT
Used to set up a default message size.

See Also:
Constant Field Values

PING_QUEUE_NAME_PROPNAME

public static final String PING_QUEUE_NAME_PROPNAME
Holds the name of the property to get the ping queue name from.

See Also:
Constant Field Values

PING_QUEUE_NAME_DEFAULT

public static final String PING_QUEUE_NAME_DEFAULT
Holds the name of the default destination to send pings on.

See Also:
Constant Field Values

QUEUE_NAME_POSTFIX_PROPNAME

public static final String QUEUE_NAME_POSTFIX_PROPNAME
Holds the name of the property to get the queue name postfix from.

See Also:
Constant Field Values

QUEUE_NAME_POSTFIX_DEFAULT

public static final String QUEUE_NAME_POSTFIX_DEFAULT
Holds the default queue name postfix value.

See Also:
Constant Field Values

PERSISTENT_MODE_PROPNAME

public static final String PERSISTENT_MODE_PROPNAME
Holds the name of the property to get the test delivery mode from.

See Also:
Constant Field Values

PERSISTENT_MODE_DEFAULT

public static final boolean PERSISTENT_MODE_DEFAULT
Holds the message delivery mode to use for the test.

See Also:
Constant Field Values

TRANSACTED_PROPNAME

public static final String TRANSACTED_PROPNAME
Holds the name of the property to get the test transactional mode from.

See Also:
Constant Field Values

TRANSACTED_DEFAULT

public static final boolean TRANSACTED_DEFAULT
Holds the transactional mode to use for the test.

See Also:
Constant Field Values

CONSUMER_TRANSACTED_PROPNAME

public static final String CONSUMER_TRANSACTED_PROPNAME
Holds the name of the property to get the test consumer transacted mode from.

See Also:
Constant Field Values

CONSUMER_TRANSACTED_DEFAULT

public static final boolean CONSUMER_TRANSACTED_DEFAULT
Holds the consumer transactional mode default setting.

See Also:
Constant Field Values

BROKER_PROPNAME

public static final String BROKER_PROPNAME
Holds the name of the property to get the test broker url from.

See Also:
Constant Field Values

BROKER_DEFAULT

public static final String BROKER_DEFAULT
Holds the default broker url for the test.

See Also:
Constant Field Values

VIRTUAL_HOST_PROPNAME

public static final String VIRTUAL_HOST_PROPNAME
Holds the name of the property to get the test broker virtual path.

See Also:
Constant Field Values

VIRTUAL_HOST_DEFAULT

public static final String VIRTUAL_HOST_DEFAULT
Holds the default virtual path for the test.

See Also:
Constant Field Values

RATE_PROPNAME

public static final String RATE_PROPNAME
Holds the name of the property to get the message rate from.

See Also:
Constant Field Values

RATE_DEFAULT

public static final int RATE_DEFAULT
Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction.

See Also:
Constant Field Values

VERBOSE_PROPNAME

public static final String VERBOSE_PROPNAME
Holds the name of the property to get the verbose mode property from.

See Also:
Constant Field Values

VERBOSE_DEFAULT

public static final boolean VERBOSE_DEFAULT
Holds the default verbose mode.

See Also:
Constant Field Values

PUBSUB_PROPNAME

public static final String PUBSUB_PROPNAME
Holds the name of the property to get the p2p or pub/sub messaging mode from.

See Also:
Constant Field Values

PUBSUB_DEFAULT

public static final boolean PUBSUB_DEFAULT
Holds the pub/sub mode default, true means ping a topic, false means ping a queue.

See Also:
Constant Field Values

FAIL_AFTER_COMMIT_PROPNAME

public static final String FAIL_AFTER_COMMIT_PROPNAME
Holds the name of the property to get the fail after commit flag from.

See Also:
Constant Field Values

FAIL_AFTER_COMMIT_DEFAULT

public static final boolean FAIL_AFTER_COMMIT_DEFAULT
Holds the default failover after commit test flag.

See Also:
Constant Field Values

FAIL_BEFORE_COMMIT_PROPNAME

public static final String FAIL_BEFORE_COMMIT_PROPNAME
Holds the name of the property to get the fail before commit flag from.

See Also:
Constant Field Values

FAIL_BEFORE_COMMIT_DEFAULT

public static final boolean FAIL_BEFORE_COMMIT_DEFAULT
Holds the default failover before commit test flag.

See Also:
Constant Field Values

FAIL_AFTER_SEND_PROPNAME

public static final String FAIL_AFTER_SEND_PROPNAME
Holds the name of the property to get the fail after send flag from.

See Also:
Constant Field Values

FAIL_AFTER_SEND_DEFAULT

public static final boolean FAIL_AFTER_SEND_DEFAULT
Holds the default failover after send test flag.

See Also:
Constant Field Values

FAIL_BEFORE_SEND_PROPNAME

public static final String FAIL_BEFORE_SEND_PROPNAME
Holds the name of the property to get the fail before send flag from.

See Also:
Constant Field Values

FAIL_BEFORE_SEND_DEFAULT

public static final boolean FAIL_BEFORE_SEND_DEFAULT
Holds the default failover before send test flag.

See Also:
Constant Field Values

FAIL_ONCE_PROPNAME

public static final String FAIL_ONCE_PROPNAME
Holds the name of the property to get the fail once flag from.

See Also:
Constant Field Values

FAIL_ONCE_DEFAULT

public static final boolean FAIL_ONCE_DEFAULT
The default failover once flag, true means only do one failover, false means failover on every commit cycle.

See Also:
Constant Field Values

USERNAME_PROPNAME

public static final String USERNAME_PROPNAME
Holds the name of the property to get the broker access username from.

See Also:
Constant Field Values

USERNAME_DEFAULT

public static final String USERNAME_DEFAULT
Holds the default broker log on username.

See Also:
Constant Field Values

PASSWORD_PROPNAME

public static final String PASSWORD_PROPNAME
Holds the name of the property to get the broker access password from.

See Also:
Constant Field Values

PASSWORD_DEFAULT

public static final String PASSWORD_DEFAULT
Holds the default broker log on password.

See Also:
Constant Field Values

SELECTOR_PROPNAME

public static final String SELECTOR_PROPNAME
Holds the name of the property to get the message selector from.

See Also:
Constant Field Values

SELECTOR_DEFAULT

public static final String SELECTOR_DEFAULT
Holds the default message selector.

See Also:
Constant Field Values

DESTINATION_COUNT_PROPNAME

public static final String DESTINATION_COUNT_PROPNAME
Holds the name of the property to get the destination count from.

See Also:
Constant Field Values

DESTINATION_COUNT_DEFAULT

public static final int DESTINATION_COUNT_DEFAULT
Defines the default number of destinations to ping.

See Also:
Constant Field Values

NUM_CONSUMERS_PROPNAME

public static final String NUM_CONSUMERS_PROPNAME
Holds the name of the property to get the number of consumers per destination from.

See Also:
Constant Field Values

NUM_CONSUMERS_DEFAULT

public static final int NUM_CONSUMERS_DEFAULT
Defines the default number of consumers per destination.

See Also:
Constant Field Values

TIMEOUT_PROPNAME

public static final String TIMEOUT_PROPNAME
Holds the name of the property to get the waiting timeout for response messages.

See Also:
Constant Field Values

TIMEOUT_DEFAULT

public static final long TIMEOUT_DEFAULT
Default time to wait before assuming that a ping has timed out.

See Also:
Constant Field Values

TX_BATCH_SIZE_PROPNAME

public static final String TX_BATCH_SIZE_PROPNAME
Holds the name of the property to get the commit batch size from.

See Also:
Constant Field Values

TX_BATCH_SIZE_DEFAULT

public static final int TX_BATCH_SIZE_DEFAULT
Defines the default number of pings to send in each transaction when running transactionally.

See Also:
Constant Field Values

UNIQUE_DESTS_PROPNAME

public static final String UNIQUE_DESTS_PROPNAME
Holds the name of the property to get the unique destinations flag from.

See Also:
Constant Field Values

UNIQUE_DESTS_DEFAULT

public static final boolean UNIQUE_DESTS_DEFAULT
Defines the default value for the unique destinations property.

See Also:
Constant Field Values

DURABLE_DESTS_PROPNAME

public static final String DURABLE_DESTS_PROPNAME
Holds the name of the property to get the durable destinations flag from.

See Also:
Constant Field Values

DURABLE_DESTS_DEFAULT

public static final boolean DURABLE_DESTS_DEFAULT
Defines the default value of the durable destinations flag.

See Also:
Constant Field Values

ACK_MODE_PROPNAME

public static final String ACK_MODE_PROPNAME
Holds the name of the property to get the message acknowledgement mode from.

See Also:
Constant Field Values

ACK_MODE_DEFAULT

public static final int ACK_MODE_DEFAULT
Defines the default message acknowledgement mode.

See Also:
Constant Field Values

CONSUMER_ACK_MODE_PROPNAME

public static final String CONSUMER_ACK_MODE_PROPNAME
Holds the name of the property to get the consumers' message acknowledgement mode from.

See Also:
Constant Field Values

CONSUMER_ACK_MODE_DEFAULT

public static final int CONSUMER_ACK_MODE_DEFAULT
Defines the default consumers' message acknowledgement mode.

See Also:
Constant Field Values

MAX_PENDING_PROPNAME

public static final String MAX_PENDING_PROPNAME
Holds the name of the property to get the maximum pending message size setting from.

See Also:
Constant Field Values

MAX_PENDING_DEFAULT

public static final int MAX_PENDING_DEFAULT
Defines the default value for the maximum pending message size setting. 0 means no limit.

See Also:
Constant Field Values

PREFETCH_DEFAULT

public static final int PREFETCH_DEFAULT
Defines the default prefetch size to use when consuming messages.

See Also:
Constant Field Values

NO_LOCAL_DEFAULT

public static final boolean NO_LOCAL_DEFAULT
Defines the default value of the no local flag to use when consuming messages.

See Also:
Constant Field Values

EXCLUSIVE_DEFAULT

public static final boolean EXCLUSIVE_DEFAULT
Defines the default value of the exclusive flag to use when consuming messages.

See Also:
Constant Field Values

MESSAGE_TIMESTAMP_PROPNAME

public static final String MESSAGE_TIMESTAMP_PROPNAME
Holds the name of the property to store nanosecond timestamps in ping messages with.

See Also:
Constant Field Values

PREFILL_PROPNAME

public static final String PREFILL_PROPNAME
Holds the name of the property to get the number of messages to prefill the broker with before starting the main test.

See Also:
Constant Field Values

PREFILL_DEFAULT

public static final int PREFILL_DEFAULT
Defines the default value for the number of messages to prefill. The default of 0 means no messages are prefilled.

See Also:
Constant Field Values

DELAY_BEFORE_CONSUME_PROPNAME

public static final String DELAY_BEFORE_CONSUME_PROPNAME
Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled.

See Also:
Constant Field Values

DELAY_BEFORE_CONSUME

public static final long DELAY_BEFORE_CONSUME
Defines the default value for the delay in ms to wait before starting the test run. The default of 0 means no delay.

See Also:
Constant Field Values

CONSUME_ONLY_PROPNAME

public static final String CONSUME_ONLY_PROPNAME
Holds the name of the property to get the consume-only flag from. When this flag is set, no messages are sent.

See Also:
Constant Field Values

CONSUME_ONLY_DEFAULT

public static final boolean CONSUME_ONLY_DEFAULT
Defines the default value of the consumeOnly flag to use when publishing messages is not desired.

See Also:
Constant Field Values

SEND_ONLY_PROPNAME

public static final String SEND_ONLY_PROPNAME
Holds the name of the property to get the send-only flag from. When this flag is set, no messages are consumed.

See Also:
Constant Field Values

SEND_ONLY_DEFAULT

public static final boolean SEND_ONLY_DEFAULT
Defines the default value of the sendOnly flag to use when consuming messages is not desired.

See Also:
Constant Field Values

defaults

public static org.apache.qpid.junit.extensions.util.ParsedProperties defaults
Holds the default configuration properties.


_overrideClientId

protected boolean _overrideClientId
Allows setting of client ID on the connection, rather than through the connection URL.


_factoryName

protected String _factoryName
Holds the JNDI name of the JMS connection factory.


_fileProperties

protected String _fileProperties
Holds the name of the properties file to configure JNDI with.


_brokerDetails

protected String _brokerDetails
Holds the broker url.


_username

protected String _username
Holds the username to access the broker with.


_password

protected String _password
Holds the password to access the broker with.


_virtualpath

protected String _virtualpath
Holds the virtual host on the broker to run the tests through.


_destinationName

protected String _destinationName
Holds the root name from which to generate test destination names.


_queueNamePostfix

protected String _queueNamePostfix
Holds the default queue name postfix value.


_selector

protected String _selector
Holds the message selector to filter the pings with.


_transacted

protected boolean _transacted
Holds the producer's transactional mode flag.


_consTransacted

protected boolean _consTransacted
Holds the consumers' transactional mode flag.


_persistent

protected boolean _persistent
Determines whether this producer sends persistent messages.


_ackMode

protected int _ackMode
Holds the acknowledgement mode used for the producers.


_consAckMode

protected int _consAckMode
Holds the acknowledgement mode setting for the consumers.


_messageSize

protected int _messageSize
Determines what size of messages this producer sends.


_verbose

protected boolean _verbose
Used to indicate that the ping loop should print out whenever it pings.


_isPubSub

protected boolean _isPubSub
Flag used to indicate if this is a point to point or pub/sub ping client.


_isUnique

protected boolean _isUnique
Flag used to indicate if the destinations should be unique to this client.


_isDurable

protected boolean _isDurable
Flag used to indicate that durable destinations should be used.


_failBeforeCommit

protected boolean _failBeforeCommit
Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit.


_failAfterCommit

protected boolean _failAfterCommit
Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit.


_failBeforeSend

protected boolean _failBeforeSend
Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send.


_failAfterSend

protected boolean _failAfterSend
Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send.


_failOnce

protected boolean _failOnce
Flag used to indicate that failover prompting should only be done on the first commit, not on every commit.


_txBatchSize

protected int _txBatchSize
Holds the number of sends that should be performed in every transaction when using transactions.


_noOfDestinations

protected int _noOfDestinations
Holds the number of destinations to ping.


_noOfConsumers

protected int _noOfConsumers
Holds the number of consumers per destination.


_rate

protected int _rate
Holds the maximum send rate in hertz.


_maxPendingSize

protected int _maxPendingSize
Holds the maximum amount of pending data that the client should buffer. Sending is suspended if this limit is breached.


timestampFormatter

protected static final DateFormat timestampFormatter
A convenient formatter to use when time stamping output.


_connection

protected Connection _connection
Holds the connection for the message producer.


_consumerConnection

protected Connection[] _consumerConnection
Holds the consumer connections.


_consumerSession

protected Session[] _consumerSession
Holds the controlSession on which ping replies are received.


_producerSession

protected Session _producerSession
Holds the producer controlSession, needed to create ping messages.


_replyDestination

protected Destination _replyDestination
Holds the destination where the response messages will arrive.


_pingDestinations

protected List<Destination> _pingDestinations
Holds the set of destinations that this ping producer pings.


_rateLimiter

protected org.apache.qpid.junit.extensions.Throttle _rateLimiter
Used to restrict the sending rate to a specified limit.
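
The idea of restricting the send rate to a limit in hertz, with 0 meaning no limit, can be sketched roughly as follows. This is not the org.apache.qpid.junit.extensions.Throttle implementation, whose API is not shown on this page; the class ThrottleSketch and its methods are hypothetical and only illustrate one simple way to space out sends.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the Throttle class used by PingPongProducer.
final class ThrottleSketch {
    private final long intervalNanos;
    private long nextSendTime = System.nanoTime();

    ThrottleSketch(int rateHz) {
        // A rate of 0 means no limit, so no interval is enforced.
        this.intervalNanos = rateHz > 0 ? 1_000_000_000L / rateHz : 0L;
    }

    // Minimum spacing between sends, in nanoseconds (0 when unlimited).
    long intervalNanos() {
        return intervalNanos;
    }

    // Blocks the caller until the next send slot is due.
    void throttle() {
        if (intervalNanos == 0L) {
            return;
        }
        long wait = nextSendTime - System.nanoTime();
        if (wait > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(wait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Schedule the next slot one interval after this send.
        long now = System.nanoTime();
        nextSendTime = (now - nextSendTime > 0 ? now : nextSendTime) + intervalNanos;
    }
}
```

A sender would call throttle() immediately before each send, so that at a rate of 10 Hz consecutive sends are at least 100 ms apart.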


_chainedMessageListener

protected PingPongProducer.ChainedMessageListener _chainedMessageListener
Holds a message listener that this message listener chains all its messages to.


_queueJVMSequenceID

protected static AtomicInteger _queueJVMSequenceID
This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when creating multiple ping producers in the same JVM.


_queueSharedID

protected AtomicInteger _queueSharedID
This id generator is used to generate ids that are only unique within this pinger. Creating multiple pingers on the same JVM using this id generator will allow them to ping on the same queues.
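
The difference between the two id generators can be illustrated with a minimal sketch. The class QueueNameSketch and the naming scheme (root name, underscore, number) are assumptions made for illustration, not the naming that PingPongProducer actually uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; class and method names are not part of PingPongProducer.
final class QueueNameSketch {
    // Shared by every instance in the JVM, in the manner of _queueJVMSequenceID.
    static final AtomicInteger JVM_SEQUENCE = new AtomicInteger();

    // One counter per instance, in the manner of _queueSharedID.
    final AtomicInteger sharedId = new AtomicInteger();

    // Names are distinct across all instances in the JVM.
    String nextUniqueName(String root) {
        return root + "_" + JVM_SEQUENCE.incrementAndGet();
    }

    // Names repeat across instances, so separate pingers address the same queues.
    String nextSharedName(String root) {
        return root + "_" + sharedId.incrementAndGet();
    }
}
```

With two instances, the first call to nextSharedName("ping") yields the same name on both, while nextUniqueName("ping") never yields the same name twice in one JVM.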


_publish

protected boolean _publish
Used to tell the ping loop when to terminate. The loop only runs while this is true.


_producer

protected MessageProducer _producer
Holds the message producer to send the pings through.


_consumer

protected MessageConsumer[] _consumer
Holds the message consumer to receive the ping replies through.


_sendPauseMonitor

static final SynchronousQueue _sendPauseMonitor
Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected to wait until the number of unreceived messages is reduced before continuing to send. This monitor is a fair SynchronousQueue because that provides fair scheduling, to ensure that all producer threads get an equal chance to produce messages.


_unreceived

static AtomicInteger _unreceived
Keeps a count of the number of messages currently sent but not received.
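
The interplay between a fair SynchronousQueue and a count of unreceived messages might look roughly like the sketch below. The threshold check, the timed wait, and all names in SendPauseSketch are assumptions for illustration; the real pausing logic is not documented on this page.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of pausing a sender until receivers catch up.
final class SendPauseSketch {
    // Passing 'true' requests fair (FIFO) ordering among waiting threads.
    static final SynchronousQueue<Object> MONITOR = new SynchronousQueue<>(true);
    static final AtomicInteger UNRECEIVED = new AtomicInteger();

    // Sender side: count the send, then wait while too many messages are outstanding.
    // Returns false if no receiver released the sender within the wait time.
    static boolean recordSend(int maxPending, long waitMillis) {
        if (UNRECEIVED.incrementAndGet() > maxPending) {
            try {
                return MONITOR.poll(waitMillis, TimeUnit.MILLISECONDS) != null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Receiver side: count the receipt and hand a token to one waiting sender, if any.
    static void recordReceive() {
        UNRECEIVED.decrementAndGet();
        MONITOR.offer(new Object()); // succeeds only if a sender is currently waiting
    }
}
```

Because the queue has no capacity, a token is only handed over when a sender is actually blocked, and the fairness setting releases blocked senders in arrival order.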

Constructor Detail

PingPongProducer

public PingPongProducer(Properties overrides)
                 throws Exception
Creates a ping producer with the specified parameters, of which there are many. See the class level comments for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send and receive its pings and replies on.

Parameters:
overrides - Properties containing any desired overrides to the defaults.
Throws:
Exception - Any exceptions are allowed to fall through.
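
The way overrides are layered on top of defaults can be sketched with plain java.util.Properties. The real class uses ParsedProperties from the Qpid JUnit extensions, so this is only an approximation; OverridesSketch is a hypothetical name, and the keys and default values are taken from the parameter table in the class description.

```java
import java.util.Properties;

// Hypothetical illustration of layering overrides on defaults with java.util.Properties.
final class OverridesSketch {
    static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("messageSize", "0");
        d.setProperty("destinationName", "ping");
        d.setProperty("persistent", "false");
        d.setProperty("rate", "0");
        return d;
    }

    // Lookups fall back to the defaults for any key the overrides do not set.
    static Properties effective(Properties overrides) {
        Properties p = new Properties(defaults());
        p.putAll(overrides);
        return p;
    }
}
```

A caller that sets only messageSize in the overrides would still see destinationName resolve to ping.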
Method Detail

establishConnection

public void establishConnection(boolean producer,
                                boolean consumer)
                         throws Exception
Establishes a connection to the broker and creates message consumers and producers based on the parameters that this ping client was created with.

Parameters:
producer - Flag to indicate whether or not the producer should be set up.
consumer - Flag to indicate whether or not the consumers should be set up.
Throws:
Exception - Any exceptions are allowed to fall through.

createConnection

protected void createConnection(String clientID)
                         throws JMSException,
                                NamingException,
                                IOException
Establishes a connection to the broker, based on the configuration parameters that this ping client was created with.

Parameters:
clientID - The client's identifier.
Throws:
JMSException - Underlying exceptions allowed to fall through.
NamingException - Underlying exceptions allowed to fall through.
IOException - Underlying exceptions allowed to fall through.

main

public static void main(String[] args)
Starts a ping-pong loop running from the command line. The bounce back client PingPongBouncer also needs to be started to bounce the pings back again.

Parameters:
args - The command line arguments.

pause

public static void pause(long sleepTime)
Convenience method for a short pause.

Parameters:
sleepTime - The time in milliseconds to pause for.

getReplyDestinations

public List<Destination> getReplyDestinations()
Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to destination of this pinger.

Returns:
The single reply to destination of this pinger, wrapped in a list.

createProducer

public void createProducer()
                    throws JMSException
Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag is set according to the ping producer creation options.

Throws:
JMSException - Any JMSExceptions are allowed to fall through.

createPingDestinations

public void createPingDestinations(int noOfDestinations,
                                   String selector,
                                   String rootName,
                                   boolean unique,
                                   boolean durable)
                            throws JMSException
Creates consumers for the specified number of destinations. The destinations themselves are also created by this method.

Parameters:
noOfDestinations - The number of destinations to create consumers for.
selector - The message selector to filter the consumers with.
rootName - The root of the name, or actual name if only one is being created.
unique - true to make the destinations unique to this pinger, false to share the numbering with all pingers on the same JVM.
durable - If the destinations are durable topics.
Throws:
JMSException - Any JMSExceptions are allowed to fall through.

createReplyConsumers

public void createReplyConsumers(Collection<Destination> destinations,
                                 String selector)
                          throws JMSException
Creates consumers for the specified destinations and registers this pinger to listen to their messages.

Parameters:
destinations - The destinations to listen to.
selector - A selector to filter the messages with.
Throws:
JMSException - Any JMSExceptions are allowed to fall through.

onMessageWithConsumerNo

public void onMessageWithConsumerNo(Message message,
                                    int consumerNo)
Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.

Parameters:
message - The received message.
consumerNo - The consumer number within this test pinger instance.
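
Correlating replies with outstanding pings, and ignoring replies whose correlation id is not expected, can be sketched as below. The sketch swaps in a java.util.concurrent.CountDownLatch for the boolean latch mentioned above and does not store the messages themselves; CorrelationSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of correlating replies with outstanding pings.
final class CorrelationSketch {
    private final Map<String, CountDownLatch> pending = new ConcurrentHashMap<>();

    // Register a correlation id together with the number of replies expected for it.
    void expect(String correlationId, int expectedCount) {
        pending.put(correlationId, new CountDownLatch(expectedCount));
    }

    // Called for each reply; replies carrying an unknown correlation id are ignored.
    boolean onReply(String correlationId) {
        CountDownLatch latch = pending.get(correlationId);
        if (latch == null) {
            return false;
        }
        latch.countDown();
        return true;
    }

    // Number of replies still outstanding for the id, or -1 if the id is unknown.
    long remaining(String correlationId) {
        CountDownLatch latch = pending.get(correlationId);
        return latch == null ? -1 : latch.getCount();
    }
}
```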

setupCorrelationID

public void setupCorrelationID(String correlationId,
                               int expectedCount)

pingAndWaitForReply

public int pingAndWaitForReply(Message message,
                               int numPings,
                               long timeout,
                               String messageCorrelationId)
                        throws JMSException,
                               InterruptedException
Sends the specified number of ping messages and then waits for all correlating replies. If the wait times out before all replies arrive, the number of replies received so far is returned. This method allows the caller to specify the correlation id. The run can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that populates the destination with a set number of messages, so the total number of pings sent, and therefore expected, is PREFILL + numPings. If pre-fill is specified, the consumers start paused to allow the prefilling to occur.

Parameters:
message - The message to send. If this is null, one is generated.
numPings - The number of ping messages to send.
timeout - The timeout in milliseconds.
messageCorrelationId - The message correlation id. If this is null, one is generated.
Returns:
The number of replies received. This may be less than the number sent if the timeout terminated the wait prematurely. When running with no consumers (noConsumer=0, i.e. send-only mode), the number of messages sent is returned.
Throws:
JMSException - All underlying JMSExceptions are allowed to fall through.
InterruptedException - When interrupted by a timeout
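
The relationship between pre-fill, the timeout and the returned count can be sketched as follows. The class WaitForRepliesSketch is hypothetical, and the use of a CountDownLatch to track outstanding replies is an assumption rather than a description of the actual implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of waiting for replies with a timeout.
final class WaitForRepliesSketch {
    // Total replies expected when the destination is prefilled before the test pings.
    static int expectedReplies(int preFill, int numPings) {
        return preFill + numPings;
    }

    // Waits up to timeoutMillis for the latch to reach zero, then reports how many
    // of the expected replies arrived. The result is lower than 'expected' on timeout.
    static int awaitReplies(CountDownLatch latch, int expected, long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return expected - (int) latch.getCount();
    }
}
```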

pingAndWaitForReply

public int pingAndWaitForReply(Message message,
                               int numPings,
                               int preFill,
                               long timeout,
                               String messageCorrelationId)
                        throws JMSException,
                               InterruptedException
Throws:
JMSException
InterruptedException

pingNoWaitForReply

public void pingNoWaitForReply(Message message,
                               int numPings,
                               String messageCorrelationId)
                        throws JMSException
Sends the specified number of ping messages and does not wait for correlating replies.

Parameters:
message - The message to send.
numPings - The number of pings to send.
messageCorrelationId - A correlation id to place on all messages sent.
Throws:
JMSException - All underlying JMSExceptions are allowed to fall through.

sendMessage

protected boolean sendMessage(int i,
                              Message message)
                       throws JMSException
Sends the specified message, applies rate limiting and possibly commits the current transaction. The count of messages sent so far must be specified and is used to round robin the ping destinations (where there is more than one), and to determine if the transaction batch size has been reached and the sent messages should be committed.

Parameters:
i - The count of messages sent so far in a loop of multiple calls to this send method.
message - The message to send.
Returns:
true if the messages were committed, false otherwise.
Throws:
JMSException - All underlying JMSExceptions are allowed to fall through.
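
How a running send count can drive both the round robin over destinations and the batch commit decision is sketched below. SendSketch is a hypothetical class, and whether the real count starts at 0 or 1 is not stated here; the sketch assumes the count of the current send starts at 1 for the batch check.

```java
import java.util.List;

// Hypothetical illustration of using the send count i for two decisions.
final class SendSketch {
    // Picks the destination for the i-th send by cycling through the list.
    static <T> T destinationFor(int i, List<T> destinations) {
        return destinations.get(i % destinations.size());
    }

    // True when the i-th send (assumed to count from 1) completes a transaction batch.
    static boolean batchComplete(int i, int txBatchSize) {
        return txBatchSize > 0 && i % txBatchSize == 0;
    }
}
```

With three destinations and a batch size of 5, send number 4 goes to the second destination and sends 5, 10, 15 and so on trigger a commit.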

pingLoop

public void pingLoop()
Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will terminate the pinger.


setChainedMessageListener

public void setChainedMessageListener(PingPongProducer.ChainedMessageListener messageListener)
Sets a chained message listener. The message listener on this pinger chains all its messages to the one set here.

Parameters:
messageListener - The chained message listener.

removeChainedMessageListener

public void removeChainedMessageListener()
Removes any chained message listeners from this pinger.


getTestMessage

public Message getTestMessage(Destination replyQueue,
                              int messageSize,
                              boolean persistent)
                       throws JMSException
Generates a test message of the specified size, with the specified reply-to destination and persistence flag.

Parameters:
replyQueue - The reply-to destination for the message.
messageSize - The desired size of the message in bytes.
persistent - true if the message should use persistent delivery, false otherwise.
Returns:
A freshly generated test message.
Throws:
JMSException - All underlying JMSExceptions are allowed to fall through.

setTimestamp

protected void setTimestamp(Message msg)
                     throws JMSException
Sets the current time in nanoseconds as the timestamp on the message.

Parameters:
msg - The message to timestamp.
Throws:
JMSException - Any JMSExceptions are allowed to fall through.

getTimestamp

protected long getTimestamp(Message msg)
                     throws JMSException
Extracts the nanosecond timestamp from a message.

Parameters:
msg - The message to extract the time stamp from.
Returns:
The timestamp in nanos.
Throws:
JMSException - Any JMSExceptions are allowed to fall through.
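
Storing a nanosecond timestamp as a message property and reading it back to compute latency can be sketched with a plain map standing in for the JMS message properties. The class TimestampSketch and the key "timestamp" are hypothetical; the actual property name is the value of MESSAGE_TIMESTAMP_PROPNAME, which is not shown on this page.

```java
import java.util.Map;

// Hypothetical illustration; a Map stands in for the JMS message properties.
final class TimestampSketch {
    static final String TIMESTAMP_KEY = "timestamp"; // assumed name, for illustration only

    // Records the current System.nanoTime() value on the "message".
    static void setTimestamp(Map<String, Object> props) {
        props.put(TIMESTAMP_KEY, System.nanoTime());
    }

    // Reads the stored nanosecond timestamp back.
    static long getTimestamp(Map<String, Object> props) {
        return (Long) props.get(TIMESTAMP_KEY);
    }

    // Elapsed nanoseconds since the message was stamped.
    static long latencyNanos(Map<String, Object> props) {
        return System.nanoTime() - getTimestamp(props);
    }
}
```

System.nanoTime() values are only comparable within a single JVM, so a latency computed this way is meaningful only when the stamping and the reading happen in the same process.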

stop

public void stop()
Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag has been cleared.


start

public void start()
           throws JMSException
Starts the producer and consumer connections.

Throws:
JMSException - Any JMSExceptions are allowed to fall through.

run

public void run()
Implements a ping loop that repeatedly pings until the publish flag becomes false.

Specified by:
run in interface Runnable
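
The relationship between run, pingLoop and stop can be sketched as a loop guarded by a flag. PingLoopSketch is hypothetical: its pingLoop only counts iterations, and declaring the flag volatile is an assumption about how visibility across threads might be ensured.

```java
// Hypothetical illustration of a run loop controlled by a publish flag.
final class PingLoopSketch implements Runnable {
    private volatile boolean publish = true;
    private final int stopAfter;
    private int iterations;

    PingLoopSketch(int stopAfter) {
        this.stopAfter = stopAfter;
    }

    // Repeats single iterations until the publish flag is cleared.
    @Override
    public void run() {
        while (publish) {
            pingLoop();
        }
    }

    // Stand-in for one ping iteration; stops itself after a fixed number of rounds.
    void pingLoop() {
        iterations++;
        if (iterations >= stopAfter) {
            stop();
        }
    }

    // Clears the flag; the current iteration completes before the loop exits.
    void stop() {
        publish = false;
    }

    int iterations() {
        return iterations;
    }
}
```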

onException

public void onException(JMSException e)
Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the connection. It clears the publish flag, which in turn halts the ping loop.

Specified by:
onException in interface ExceptionListener
Parameters:
e - The exception that triggered this callback method.

getShutdownHook

public Thread getShutdownHook()
Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with the runtime system as a shutdown hook.

Returns:
A shutdown hook for the ping loop.
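
Registering such a hook with the runtime might look like the sketch below. ShutdownHookSketch is a hypothetical stand-in for the pinger, and its hook only clears a flag; what the real hook does beyond stopping the loop is not described here.

```java
// Hypothetical illustration of a shutdown hook that stops a running loop.
final class ShutdownHookSketch {
    private volatile boolean publish = true;

    boolean isPublishing() {
        return publish;
    }

    void stop() {
        publish = false;
    }

    // Returns an unstarted thread; the JVM starts it when shutdown begins.
    Thread getShutdownHook() {
        return new Thread(this::stop, "ping-shutdown-hook");
    }
}
```

The hook would be registered with Runtime.getRuntime().addShutdownHook(pinger.getShutdownHook()), where pinger is an instance of the class that provides the hook.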

close

public void close()
           throws JMSException
Closes all of the producer and consumer connections.

Throws:
JMSException - All JMSExceptions are allowed to fall through.

commitTx

protected boolean commitTx(Session session)
                    throws JMSException
Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a transactional controlSession, this method does nothing (unless the failover after send flag is set).

If the _failAfterSend flag is set, this will prompt the user to kill the broker before the commit is applied. This flag applies whether the pinger is transactional or not.

If the _failBeforeCommit flag is set, this will prompt the user to kill the broker before the commit is applied. If the _failAfterCommit flag is set, this will prompt the user to kill the broker after the commit is applied. These flags will only apply if using a transactional pinger.

Parameters:
session - The controlSession to commit
Returns:
true if the controlSession was committed, false if it was not.
Throws:
JMSException - If the commit fails and then the rollback fails.
Todo:
Consider moving the fail after send logic into the send method. It is confusing to have it in this commit method, because commits only apply to transactional pingers, but fail after send applies to transactional and non-transactional pingers alike.

waitForUser

public void waitForUser(String prompt)
Outputs a prompt to the console and waits for the user to press return.

Parameters:
prompt - The prompt to display on the console.

getConsumersPerDestination

public int getConsumersPerDestination()
Gets the number of consumers that are listening to each destination in the test.

Returns:
The number of consumers subscribing to each destination.

getExpectedNumPings

public int getExpectedNumPings(int numpings)
Calculates how many pings are expected to be received for the given number sent. Note that if noConsumers is set to 0, this also returns 0 when testing pub/sub. This is correct because, without consumers, there is no one to receive the sent messages, so no replies can be generated.

Parameters:
numpings - The number of pings that will be sent.
Returns:
The number that should be received, for the test to pass.
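
One calculation consistent with the description above is sketched here. It assumes that in pub/sub every consumer receives a copy of each ping, while in point-to-point each ping is delivered once; ExpectedPingsSketch and its parameters are hypothetical, and the actual formula may differ.

```java
// Hypothetical illustration; the formula is an assumption consistent with the text above.
final class ExpectedPingsSketch {
    static int expectedNumPings(int numPings, boolean isPubSub, int consumersPerDestination) {
        // Pub/sub fans each ping out to every consumer; point-to-point delivers it once.
        return isPubSub ? numPings * consumersPerDestination : numPings;
    }
}
```

Under this assumption, 10 pings with 3 consumers yield 30 expected messages in pub/sub and 10 in point-to-point, and 0 consumers in pub/sub yield 0.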


Licensed to the Apache Software Foundation