001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.tool;
020    
021    import java.io.IOException;
022    import java.io.PrintWriter;
023    import java.util.ArrayList;
024    import java.util.Collections;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Random;
028    
029    import javax.jms.BytesMessage;
030    import javax.jms.Connection;
031    import javax.jms.DeliveryMode;
032    import javax.jms.Destination;
033    import javax.jms.JMSException;
034    import javax.jms.Message;
035    import javax.jms.MessageConsumer;
036    import javax.jms.MessageProducer;
037    import javax.jms.Session;
038    
039    import junit.framework.TestCase;
040    
041    import org.activemq.ActiveMQConnectionFactory;
042    import org.activemq.message.ActiveMQQueue;
043    
044    import EDU.oswego.cs.dl.util.concurrent.Latch;
045    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
046    import EDU.oswego.cs.dl.util.concurrent.WaitableInt;
047    
048    /**
049     * @version $Revision$
050     */
051    public class AcidTestTool extends TestCase {
052    
053        private Random random = new Random();
054        private byte data[];
055        private int workerCount = 10;
056        private PrintWriter statWriter;
057    
058        // Worker configuration.
059        protected int recordSize = 1024;
060        protected int batchSize = 5;
061        protected int workerThinkTime = 500;
062        SynchronizedBoolean ignoreJMSErrors = new SynchronizedBoolean(false);
063    
064        protected Destination target;
065        private ActiveMQConnectionFactory factory;
066        private Connection connection;
067        
068        WaitableInt publishedBatches = new WaitableInt(0);
069        WaitableInt consumedBatches = new WaitableInt(0);
070        
071        List errors = Collections.synchronizedList(new ArrayList());
072    
073        private interface Worker extends Runnable {
074            public boolean waitForExit(long i) throws InterruptedException;
075        }
076        
077        private final class ProducerWorker implements Worker {
078    
079            Session session;
080            private MessageProducer producer;
081            private BytesMessage message;
082            Latch doneLatch = new Latch();
083            private final String workerId;
084    
085            ProducerWorker(Session session, String workerId) throws JMSException {
086                this.session = session;
087                this.workerId = workerId;
088                producer = session.createProducer(target);
089                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
090                message = session.createBytesMessage();
091                message.setStringProperty("workerId", workerId);
092                message.writeBytes(data);
093            }
094    
095            public void run() {
096                try {
097                    for( int batchId=0; true; batchId++ ) {
098    //                                  System.out.println("Sending batch: "+workerId+" "+batchId);
099                        for( int msgId=0; msgId < batchSize; msgId++ ) {
100                            // Sleep some random amount of time less than workerThinkTime
101                            try {
102                                Thread.sleep(random.nextInt(workerThinkTime));
103                            } catch (InterruptedException e1) {
104                                return;
105                            }                       
106                            
107                                                message.setIntProperty("batch-id",batchId);
108                                                message.setIntProperty("msg-id",msgId);
109            
110                                                
111                                                producer.send(message);                             
112                        } 
113                        session.commit();
114                        publishedBatches.increment();                           
115    //                                  System.out.println("Commited send batch: "+workerId+" "+batchId);
116                    }
117                            } catch (JMSException e) {
118                                if( !ignoreJMSErrors.get() ) {
119                                        e.printStackTrace();
120                                        errors.add(e);
121                                }
122                                    return;
123                            } catch (Throwable e) {
124                                e.printStackTrace();
125                                errors.add(e);
126                                    return;
127                } finally {
128                    System.out.println("Producer exiting.");
129                    doneLatch.release();
130                }
131            }
132    
133            public boolean waitForExit(long i) throws InterruptedException {
134                return doneLatch.attempt(i);
135            }
136        }
137        
138        private final class ConsumerWorker implements Worker {
139    
140            Session session;
141            private MessageConsumer consumer;
142            private final long timeout;
143            Latch doneLatch = new Latch();
144            private final String workerId;
145            
146            ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
147                this.session = session;
148                this.workerId = workerId;
149                this.timeout = timeout;
150                consumer = session.createConsumer(target,"workerId='"+workerId+"'");
151            }
152    
153            public void run() {
154                
155                try {
156                    int batchId=0;
157                    while( true ) {
158                        for( int msgId=0; msgId < batchSize; msgId++ ) {
159    
160                            // Sleep some random amount of time less than workerThinkTime
161                            try {
162                                Thread.sleep(random.nextInt(workerThinkTime));
163                            } catch (InterruptedException e1) {
164                                return;
165                            }                       
166                            
167                                Message message = consumer.receive(timeout);                            
168                                if( msgId > 0 ) {
169                                    assertNotNull(message);                         
170                                    assertEquals(message.getIntProperty("batch-id"), batchId);
171                                    assertEquals(message.getIntProperty("msg-id"), msgId);
172                                } else {
173                                    if( message==null ) {
174                                        System.out.println("At end of batch an don't have a next batch to process.  done.");
175                                            return;
176                                    }
177                                    assertEquals(msgId, message.getIntProperty("msg-id") );
178                                    batchId = message.getIntProperty("batch-id");
179    //                                          System.out.println("Receiving batch: "+workerId+" "+batchId);
180                                }        
181                                
182                        } 
183                        session.commit();
184                        consumedBatches.increment();
185    //                                  System.out.println("Commited receive batch: "+workerId+" "+batchId);
186                    }
187                            } catch (JMSException e) {
188                                if( !ignoreJMSErrors.get() ) {
189                                        e.printStackTrace();
190                                        errors.add(e);
191                                }
192                                    return;
193                            } catch (Throwable e) {
194                                e.printStackTrace();
195                                errors.add(e);
196                                    return;
197                } finally {
198                    System.out.println("Consumer exiting.");
199                    doneLatch.release();
200                }
201            }
202    
203            public boolean waitForExit(long i) throws InterruptedException {
204                return doneLatch.attempt(i);
205            }
206        }
207        
208        /**
209         * @see junit.framework.TestCase#setUp()
210         */
211        protected void setUp() throws Exception {
212            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
213            this.target = new ActiveMQQueue(getClass().getName());
214        }
215    
216        protected void tearDown() throws Exception {
217            if( connection!=null ) {
218                try { connection.close(); } catch (Throwable ignore) {}
219                connection = null;
220            }
221        }
222        
223        /**
224         * @throws InterruptedException
225         * @throws JMSException
226         * @throws JMSException
227         * 
228         */
229        private void reconnect() throws InterruptedException, JMSException {
230            if( connection!=null ) {
231                try { connection.close(); } catch (Throwable ignore) {}
232                connection = null;
233            }
234            
235            long reconnectDelay=1000;
236            JMSException lastError=null;
237            
238            while( connection == null) {
239                if( reconnectDelay > 1000*10 ) {
240                    reconnectDelay = 1000*10;
241                }
242                    try {
243                        connection = factory.createConnection();
244                        connection.start();
245                    } catch (JMSException e) {
246                    lastError = e;
247                        Thread.sleep(reconnectDelay);
248                        reconnectDelay*=2;
249                    }
250            }
251        }
252    
253        /**
254         * @throws Throwable
255         * @throws IOException
256         * 
257         */
258        public void testAcidTransactions() throws Throwable {
259    
260            System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: "
261                    + batchSize + ", Worker Think Time: " + workerThinkTime);
262    
263            // Create the record and fill it with some values.
264            data = new byte[recordSize];
265            for (int i = 0; i < data.length; i++) {
266                data[i] = (byte) i;
267            }
268    
269            System.out.println("==============================================");
270            System.out.println("===> Start the server now.");
271            System.out.println("==============================================");
272            reconnect();
273            
274            System.out.println("Starting " + workerCount + " Workers...");
275            ArrayList workers = new ArrayList();
276            for( int i=0; i< workerCount; i++ ){        
277                String workerId = "worker-"+i;
278                
279                Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5);
280                workers.add(w);
281                new Thread(w,"Consumer:"+workerId).start();
282    
283                w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId);
284                workers.add(w);
285                new Thread(w,"Producer:"+workerId).start();
286            }        
287    
288            System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered.");
289    
290            //
291            // Wait for about 5 batches of messages per worker to be consumed before restart. 
292            // 
293            while( publishedBatches.get() <  workerCount*5) {
294                System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
295                Thread.sleep(1000);
296            }
297            
298            System.out.println("==============================================");
299            System.out.println("===> Server is under load now.  Kill it!");
300            System.out.println("==============================================");
301            ignoreJMSErrors.set(true);
302    
303            // Wait for all the workers to finish.
304            System.out.println("Waiting for all workers to exit due to server shutdown.");
305            for (Iterator iter = workers.iterator(); iter.hasNext();) {
306                Worker worker = (Worker) iter.next();
307                while( !worker.waitForExit(1000) ) {
308                    System.out.println("==============================================");
309                    System.out.println("===> Server is under load now.  Kill it!");
310                    System.out.println("==============================================");                
311                    System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());            
312                }
313            }
314            workers.clear();
315            
316            // No errors should have occured so far.
317            if( errors.size()>0 )
318                throw (Throwable) errors.get(0);
319            
320            System.out.println("==============================================");
321            System.out.println("===> Start the server now.");
322            System.out.println("==============================================");
323            reconnect();
324    
325            System.out.println("Restarted.");
326            
327            // Validate the all transactions were commited as a uow.  Looking for partial commits.
328            for( int i=0; i< workerCount; i++ ){
329                String workerId = "worker-"+i;
330                Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000);
331                workers.add(w);
332                new Thread(w, "Consumer:"+workerId).start();
333            }
334    
335            System.out.println("Waiting for restarted consumers to finish consuming all messages..");
336            for (Iterator iter = workers.iterator(); iter.hasNext();) {
337                Worker worker = (Worker) iter.next();
338                while( !worker.waitForExit(1000*5) ) {
339                    System.out.println("Waiting for restarted consumers to finish consuming all messages..");
340                    System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());            
341                }
342            }
343            workers.clear();
344    
345            System.out.println("Workers finished..");
346            System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());                    
347            
348            if( errors.size()>0 )
349                throw (Throwable) errors.get(0);
350            
351        }
352        
353        public static void main(String[] args) {
354            try {
355                AcidTestTool tool = new AcidTestTool();
356                tool.setUp();
357                tool.testAcidTransactions();
358                tool.tearDown();
359            } catch (Throwable e) {
360                System.out.println("Test Failed: "+e.getMessage());
361                e.printStackTrace();
362            }
363        }
364    }