org.apache.qpid.server.failover
Class MessageDisappearWithIOExceptionTest

java.lang.Object
  extended by junit.framework.Assert
      extended by junit.framework.TestCase
          extended by org.apache.qpid.test.utils.QpidTestCase
              extended by org.apache.qpid.test.utils.QpidBrokerTestCase
                  extended by org.apache.qpid.test.utils.FailoverBaseCase
                      extended by org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest
All Implemented Interfaces:
junit.framework.Test, org.apache.qpid.jms.ConnectionListener

public class MessageDisappearWithIOExceptionTest
extends FailoverBaseCase
implements org.apache.qpid.jms.ConnectionListener

Test case based on a user-reported error.

Summary: A user has reported message loss from their application. When the broker is bounced, the 'lost' messages are delivered to the broker.

Note: The client was using Spring, which may influence the situation.

Issue: The log files show 7 instances of the following, which result in 7 missing messages. The client log files show the 'without a handler' entry and the broker log file shows the 'message is null' entry discussed below.

The 7 missing messages have delivery tags 5-11, which means they are sequentially the next messages from the broker.

The only way for the 'without a handler' log to occur is if the consumer has been removed from the lookup table of the dispatcher. The only way for the 'null message' log to occur on the broker is if the message does not exist in the unacked map.

The consumer is only removed from the list during session closure and failover. If the session was closed, the broker would requeue the unacked messages, so the potential exists to have an empty map, but the broker will not send a message out after the unacked map has been cleared.

When failover occurs, the _consumer map is cleared and the consumers are resubscribed. This is done without first stopping any existing dispatcher, so there is the potential to receive a message after the _consumer map has been cleared, which is how the 'without a handler' log statement occurs.

Scenario: Looking over the logs, the sequence that best fits the events is as follows:
- Something causes Mina to be delayed, causing the WriteTimeoutException.
- This exception is received by AMQProtocolHandler#exceptionCaught.
- As the WriteTimeoutException is an IOException, this causes sessionClosed to be called to start failover.
  + This is potentially the issue here: all IOExceptions are treated as connection failure events.
- Failover runs.
  + Failover assumes that the previous connection has been closed.
  + Failover binds the existing objects (AMQConnection/Session) to the new connection objects.
- Everything is reported as being successfully failed over. However, what is neglected is that the original connection has not been closed.
  + So the broker sends a message to the consumer on the original connection, as it was not notified of the client failing over. Because the client failover reuses the original AMQSession and Dispatcher, the new messages that the broker sends to the old consumer arrive at the client and are processed by the same AMQSession and Dispatcher. However, as the failover process cleared the _consumer map and resubscribed the consumers, the Dispatcher does not recognise the delivery tag and so logs the 'without a handler' message.
- The Dispatcher then attempts to reject the message, however:
  + The AMQSession/Dispatcher pair has been swapped to use a new Mina ProtocolSession as part of the failover process, so the reject is sent down the second connection. The broker receives the Reject request, but as the message was sent on a different connection, the unacknowledged map is empty and a 'message is null' log message is produced.

Test Strategy: It should be easy to demonstrate if we can send an IOException to AMQProtocolHandler#exceptionCaught and then try sending a message.

The current unknowns here are the type of consumers that are in use. If it was an exclusive queue (Durable Subscription), then why did the resubscribe not fail? If it was not exclusive, then why did the messages not round robin?
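The scenario above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: BrokerConnection, ClientSession, and the consumer-tag lookup are hypothetical stand-ins written for illustration, not Qpid classes, and the real dispatcher is considerably more involved. The model assumes that resubscription registers the consumer under a new tag, so messages still flowing on the unclosed original connection no longer find a handler.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the failover race; every type here is hypothetical, not Qpid code. */
final class FailoverRaceSketch {

    /** Broker-side state for one connection: delivery tag -> unacked message. */
    static final class BrokerConnection {
        final Map<Long, String> unacked = new HashMap<>();
        final List<String> log = new ArrayList<>();

        void ack(long deliveryTag) {
            unacked.remove(deliveryTag);
        }

        void reject(long deliveryTag) {
            // A reject for a tag this connection never delivered finds nothing.
            if (unacked.remove(deliveryTag) == null) {
                log.add("message is null: " + deliveryTag);
            }
        }
    }

    /** Client-side session and dispatcher; failover reuses this same object. */
    static final class ClientSession {
        final Map<Integer, String> consumers = new HashMap<>();
        final List<String> log = new ArrayList<>();
        BrokerConnection current;
        int nextConsumerTag = 1;

        ClientSession(BrokerConnection initial) {
            current = initial;
            subscribe();
        }

        void subscribe() {
            consumers.put(nextConsumerTag++, "listener");
        }

        /** Every IOException is treated as a connection failure. */
        void exceptionCaught(Throwable cause, BrokerConnection replacement) {
            if (cause instanceof IOException) {
                consumers.clear();      // cleared without stopping the dispatcher
                current = replacement;  // the old connection is never closed
                subscribe();            // assumption: resubscribe gets a new consumer tag
            }
        }

        /** Handles a message from any connection still wired to this session. */
        void dispatch(int consumerTag, long deliveryTag) {
            if (consumers.containsKey(consumerTag)) {
                current.ack(deliveryTag);
            } else {
                log.add("without a handler: " + deliveryTag);
                current.reject(deliveryTag); // sent down the new connection
            }
        }
    }

    /** The broker sends on a connection and records the message as unacked there. */
    static void deliver(BrokerConnection from, ClientSession to, int consumerTag, long deliveryTag) {
        from.unacked.put(deliveryTag, "message-" + deliveryTag);
        to.dispatch(consumerTag, deliveryTag);
    }

    public static void main(String[] args) {
        BrokerConnection oldConnection = new BrokerConnection();
        BrokerConnection newConnection = new BrokerConnection();
        ClientSession session = new ClientSession(oldConnection);

        for (long tag = 1; tag <= 4; tag++) {
            deliver(oldConnection, session, 1, tag);
        }
        // Stand-in for the write timeout reaching exceptionCaught.
        session.exceptionCaught(new IOException("write timeout"), newConnection);
        // The broker was never told about failover and keeps using the old connection.
        for (long tag = 5; tag <= 11; tag++) {
            deliver(oldConnection, session, 1, tag);
        }

        System.out.println("client 'without a handler' entries: " + session.log.size());
        System.out.println("broker 'message is null' entries: " + newConnection.log.size());
        System.out.println("still unacked on old connection: " + oldConnection.unacked.size());
    }
}
```

In this model, delivery tags 5-11 each produce one client-side and one broker-side log entry, and all seven messages remain unacked on the original connection, which matches the pattern of seven missing messages described above.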


Nested Class Summary
 
Nested classes/interfaces inherited from class org.apache.qpid.test.utils.QpidBrokerTestCase
QpidBrokerTestCase.MessageType
 
Field Summary
(package private)  org.apache.qpid.client.AMQConnection _connection
           
(package private)  MessageConsumer _consumer
           
(package private)  Queue _queue
           
(package private)  Session _session
           
 
Fields inherited from class org.apache.qpid.test.utils.FailoverBaseCase
_logger, DEFAULT_FAILOVER_TIME, FAILING_PORT, FAILING_VM_PORT, failingPort
 
Fields inherited from class org.apache.qpid.test.utils.QpidBrokerTestCase
_broker, _brokerLanguage, _brokerLogPrefix, _brokerOutputStream, _brokerPersistent, _brokers, _configFile, _connectionFactory, _connections, _initialContext, _interleaveBrokerLog, _output, _outputFile, _testName, BROKER_READY, CONTENT, CPP, DEFAULT_MANAGEMENT_PORT, DEFAULT_MESSAGE_SIZE, DEFAULT_PORT, DEFAULT_SSL_PORT, DEFAULT_VM_PORT, EXTERNAL, INDEX, JAVA, LOGMONITOR_TIMEOUT, QPID_HOME, QpidHome, QUEUE, RECEIVE_TIMEOUT, TOPIC, VM
 
Fields inherited from class org.apache.qpid.test.utils.QpidTestCase
MEMORY_STORE_CLASS_NAME, MS_CLASS_NAME_KEY
 
Constructor Summary
MessageDisappearWithIOExceptionTest()
           
 
Method Summary
 void bytesReceived(long count)
           
 void bytesSent(long count)
           
 void failoverComplete()
           
 boolean preFailover(boolean redirect)
           
 boolean preResubscribe()
           
 List<Message> sendNumberedBytesMessage(Session session, Destination destination, int count)
          Back port of the QpidTestCase method to this release.
 void setUp()
           
 void test()
          Test Summary: Create a queue consumer and send 10 messages to the broker.
 
Methods inherited from class org.apache.qpid.test.utils.FailoverBaseCase
failBroker, getConnectionFactory, getFailingPort, tearDown
 
Methods inherited from class org.apache.qpid.test.utils.QpidBrokerTestCase
cleanBroker, createMessage, createNextMessage, drainQueue, getBroker, getBrokerCommand, getClientConnection, getConfigurationStringProperty, getConnection, getConnection, getConnection, getConnectionFactory, getConnectionURL, getInitialContext, getLogger, getManagementPort, getMessageSize, getPort, getPort, getTestConfigFile, getTestQueue, getTestQueueName, getTestVirtualhostsFile, isBroker010, isBroker08, isBrokerStorePersistent, isCppBroker, isExternalBroker, isJavaBroker, makeVirtualHostPersistent, reloadBrokerSecurityConfig, restartBroker, restartBroker, revertLoggingLevels, revertSystemProperties, runBare, saveTestConfiguration, saveTestVirtualhosts, sendMessage, sendMessage, sendMessage, setBrokerEnvironment, setBrokerOnlySystemProperty, setConfigurationProperty, setLoggerLevel, setMessageSize, setSystemProperty, setSystemProperty, setTestClientSystemProperty, startBroker, startBroker, stopBroker, stopBroker
 
Methods inherited from class org.apache.qpid.test.utils.QpidTestCase
getTestProfileMessageStoreClassName, run
 
Methods inherited from class junit.framework.TestCase
countTestCases, createResult, getName, run, runTest, setName, toString
 
Methods inherited from class junit.framework.Assert
assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertEquals, assertFalse, assertFalse, assertNotNull, assertNotNull, assertNotSame, assertNotSame, assertNull, assertNull, assertSame, assertSame, assertTrue, assertTrue, fail, fail, failNotEquals, failNotSame, failSame
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

_connection

org.apache.qpid.client.AMQConnection _connection

_session

Session _session

_queue

Queue _queue

_consumer

MessageConsumer _consumer
Constructor Detail

MessageDisappearWithIOExceptionTest

public MessageDisappearWithIOExceptionTest()
Method Detail

setUp

public void setUp()
           throws Exception
Overrides:
setUp in class FailoverBaseCase
Throws:
Exception

test

public void test()
          throws Exception
Test Summary: Create a queue consumer and send 10 messages to the broker. Consume the first message. This will pull the rest into the prefetch. Send an IOException to the MinaProtocolHandler. This will force failover to occur. 9 messages would normally be expected, but it is expected that none will arrive, as they are still in the prefetch of the first session.

To free the messages we need to close all connections.
- Simply doing connection.close() and retesting will not be enough, as the original connection's IO layer will still exist and is no longer connected to the connection object as a result of failover.
- The test will need to retain a reference to the original connection IO so that it can be closed, releasing the messages, to validate that the messages have indeed been 'lost' on that session.

Throws:
Exception
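
The reasoning in the test summary can be sketched with a toy prefetch model. Everything below is an assumption made for illustration: BrokerQueue, ConnectionIo, and Connection are hypothetical classes, not the Qpid client or the actual test body. The sketch only shows why closing the connection object after failover does not free the messages, while closing the retained original IO layer does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of prefetch and failover; all types are hypothetical, not Qpid code. */
final class PrefetchLossSketch {

    /** Broker-side queue of message indices. */
    static final class BrokerQueue {
        final Deque<Integer> messages = new ArrayDeque<>();
    }

    /** IO layer of one connection, holding the prefetched messages. */
    static final class ConnectionIo {
        final BrokerQueue queue;
        final Deque<Integer> prefetched = new ArrayDeque<>();

        ConnectionIo(BrokerQueue queue) {
            this.queue = queue;
        }

        /** Pulls everything available into the prefetch buffer, then hands out one message. */
        Integer receive() {
            while (!queue.messages.isEmpty()) {
                prefetched.addLast(queue.messages.pollFirst());
            }
            return prefetched.pollFirst(); // null when nothing is available
        }

        /** Closing the IO layer requeues unconsumed messages in their original order. */
        void close() {
            while (!prefetched.isEmpty()) {
                queue.messages.addFirst(prefetched.pollLast());
            }
        }
    }

    /** Connection object whose IO layer is swapped by failover. */
    static final class Connection {
        ConnectionIo io;

        Connection(ConnectionIo io) {
            this.io = io;
        }

        void failover(ConnectionIo replacement) {
            io = replacement; // the previous IO layer is left open
        }

        Integer receive() {
            return io.receive();
        }

        void close() {
            io.close(); // only reaches the current IO layer
        }
    }

    public static void main(String[] args) {
        BrokerQueue queue = new BrokerQueue();
        for (int i = 0; i < 10; i++) {
            queue.messages.addLast(i);
        }

        ConnectionIo originalIo = new ConnectionIo(queue); // reference retained by the test
        Connection connection = new Connection(originalIo);
        System.out.println("first message: " + connection.receive());

        connection.failover(new ConnectionIo(queue));
        System.out.println("after failover: " + connection.receive());

        connection.close();
        System.out.println("after connection.close(): " + queue.messages.size());

        originalIo.close();
        System.out.println("after closing original IO: " + queue.messages.size());
    }
}
```

The design point is the retained originalIo reference: once failover has swapped the IO layer, nothing reachable from the connection object can release the prefetched messages.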

sendNumberedBytesMessage

public List<Message> sendNumberedBytesMessage(Session session,
                                              Destination destination,
                                              int count)
                                       throws Exception
Back port of the QpidTestCase method to this release.

Throws:
Exception

bytesSent

public void bytesSent(long count)
Specified by:
bytesSent in interface org.apache.qpid.jms.ConnectionListener

bytesReceived

public void bytesReceived(long count)
Specified by:
bytesReceived in interface org.apache.qpid.jms.ConnectionListener

preFailover

public boolean preFailover(boolean redirect)
Specified by:
preFailover in interface org.apache.qpid.jms.ConnectionListener

preResubscribe

public boolean preResubscribe()
Specified by:
preResubscribe in interface org.apache.qpid.jms.ConnectionListener

failoverComplete

public void failoverComplete()
Specified by:
failoverComplete in interface org.apache.qpid.jms.ConnectionListener
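
A test that implements these callbacks typically needs some way to wait until failover has finished. The following is a minimal sketch of one plausible approach using a latch; it is not taken from this class. The nested ConnectionListener interface is a local stand-in that mirrors the five method signatures listed above so the example compiles on its own, and the return values assume that true from preFailover and preResubscribe allows failover and resubscription to proceed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch of a listener that lets a test block until failover completes. */
final class FailoverListenerSketch {

    /** Local stand-in mirroring the callbacks listed above; not the Qpid interface itself. */
    interface ConnectionListener {
        void bytesSent(long count);
        void bytesReceived(long count);
        boolean preFailover(boolean redirect);
        boolean preResubscribe();
        void failoverComplete();
    }

    static final class LatchingListener implements ConnectionListener {
        private final CountDownLatch failoverDone = new CountDownLatch(1);

        @Override public void bytesSent(long count) { }

        @Override public void bytesReceived(long count) { }

        // Assumption: returning true allows failover to proceed.
        @Override public boolean preFailover(boolean redirect) { return true; }

        // Assumption: returning true allows consumers to be resubscribed.
        @Override public boolean preResubscribe() { return true; }

        @Override public void failoverComplete() { failoverDone.countDown(); }

        /** Returns true if failoverComplete() was called before the timeout elapsed. */
        boolean awaitFailover(long timeout, TimeUnit unit) {
            try {
                return failoverDone.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatchingListener listener = new LatchingListener();

        // Stand-in for the client library's failover thread invoking the callbacks.
        Thread failoverThread = new Thread(() -> {
            if (listener.preFailover(false) && listener.preResubscribe()) {
                listener.failoverComplete();
            }
        });
        failoverThread.start();

        boolean completed = listener.awaitFailover(5, TimeUnit.SECONDS);
        failoverThread.join();
        System.out.println("failover completed: " + completed);
    }
}
```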


Licensed to the Apache Software Foundation