001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Random;
025    import java.util.concurrent.CountDownLatch;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicInteger;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.Connection;
032    import javax.jms.DeliveryMode;
033    import javax.jms.Destination;
034    import javax.jms.JMSException;
035    import javax.jms.Message;
036    import javax.jms.MessageConsumer;
037    import javax.jms.MessageProducer;
038    import javax.jms.Session;
039    
040    import junit.framework.TestCase;
041    import org.apache.activemq.ActiveMQConnectionFactory;
042    import org.apache.activemq.command.ActiveMQQueue;
043    
044    /**
045     * @version $Revision$
046     */
047    public class AcidTestTool extends TestCase {
048    
049    
050        // Worker configuration.
051        protected int recordSize = 1024;
052        protected int batchSize = 5;
053        protected int workerThinkTime = 500;
054        protected Destination target;
055    
056        private Random random = new Random();
057        private byte data[];
058        private int workerCount = 10;
059        private AtomicBoolean ignoreJMSErrors = new AtomicBoolean(false);
060        private ActiveMQConnectionFactory factory;
061        private Connection connection;
062        private AtomicInteger publishedBatches = new AtomicInteger(0);
063        private AtomicInteger consumedBatches = new AtomicInteger(0);
064        private List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>());
065    
066        private interface Worker extends Runnable {
067            boolean waitForExit(long i) throws InterruptedException;
068        }
069    
070        private final class ProducerWorker implements Worker {
071    
072            private Session session;
073            private MessageProducer producer;
074            private BytesMessage message;
075            private CountDownLatch doneLatch = new CountDownLatch(1);
076    
077            ProducerWorker(Session session, String workerId) throws JMSException {
078                this.session = session;
079                producer = session.createProducer(target);
080                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
081                message = session.createBytesMessage();
082                message.setStringProperty("workerId", workerId);
083                message.writeBytes(data);
084            }
085    
086            public void run() {
087                try {
088                    for (int batchId = 0; true; batchId++) {
089                        // System.out.println("Sending batch: "+workerId+"
090                        // "+batchId);
091                        for (int msgId = 0; msgId < batchSize; msgId++) {
092                            // Sleep some random amount of time less than
093                            // workerThinkTime
094                            try {
095                                Thread.sleep(random.nextInt(workerThinkTime));
096                            } catch (InterruptedException e1) {
097                                return;
098                            }
099    
100                            message.setIntProperty("batch-id", batchId);
101                            message.setIntProperty("msg-id", msgId);
102    
103                            producer.send(message);
104                        }
105                        session.commit();
106                        publishedBatches.incrementAndGet();
107                        // System.out.println("Commited send batch: "+workerId+"
108                        // "+batchId);
109                    }
110                } catch (JMSException e) {
111                    if (!ignoreJMSErrors.get()) {
112                        e.printStackTrace();
113                        errors.add(e);
114                    }
115                    return;
116                } catch (Throwable e) {
117                    e.printStackTrace();
118                    errors.add(e);
119                    return;
120                } finally {
121                    System.out.println("Producer exiting.");
122                    doneLatch.countDown();
123                }
124            }
125    
126            public boolean waitForExit(long i) throws InterruptedException {
127                return doneLatch.await(i, TimeUnit.MILLISECONDS);
128            }
129        }
130    
131        private final class ConsumerWorker implements Worker {
132    
133            private Session session;
134            private MessageConsumer consumer;
135            private final long timeout;
136            private CountDownLatch doneLatch = new CountDownLatch(1);
137    
138            ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
139                this.session = session;
140                this.timeout = timeout;
141                consumer = session.createConsumer(target, "workerId='" + workerId + "'");
142            }
143    
144            public void run() {
145    
146                try {
147                    int batchId = 0;
148                    while (true) {
149                        for (int msgId = 0; msgId < batchSize; msgId++) {
150    
151                            // Sleep some random amount of time less than
152                            // workerThinkTime
153                            try {
154                                Thread.sleep(random.nextInt(workerThinkTime));
155                            } catch (InterruptedException e1) {
156                                return;
157                            }
158    
159                            Message message = consumer.receive(timeout);
160                            if (msgId > 0) {
161                                assertNotNull(message);
162                                assertEquals(message.getIntProperty("batch-id"), batchId);
163                                assertEquals(message.getIntProperty("msg-id"), msgId);
164                            } else {
165                                if (message == null) {
166                                    System.out.println("At end of batch an don't have a next batch to process.  done.");
167                                    return;
168                                }
169                                assertEquals(msgId, message.getIntProperty("msg-id"));
170                                batchId = message.getIntProperty("batch-id");
171                                // System.out.println("Receiving batch: "+workerId+"
172                                // "+batchId);
173                            }
174    
175                        }
176                        session.commit();
177                        consumedBatches.incrementAndGet();
178                        // System.out.println("Commited receive batch: "+workerId+"
179                        // "+batchId);
180                    }
181                } catch (JMSException e) {
182                    if (!ignoreJMSErrors.get()) {
183                        e.printStackTrace();
184                        errors.add(e);
185                    }
186                    return;
187                } catch (Throwable e) {
188                    e.printStackTrace();
189                    errors.add(e);
190                    return;
191                } finally {
192                    System.out.println("Consumer exiting.");
193                    doneLatch.countDown();
194                }
195            }
196    
197            public boolean waitForExit(long i) throws InterruptedException {
198                return doneLatch.await(i, TimeUnit.MILLISECONDS);
199            }
200        }
201    
202        /**
203         * @see junit.framework.TestCase#setUp()
204         */
205        protected void setUp() throws Exception {
206            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
207            this.target = new ActiveMQQueue(getClass().getName());
208        }
209    
210        protected void tearDown() throws Exception {
211            if (connection != null) {
212                try {
213                    connection.close();
214                } catch (Throwable ignore) {
215                }
216                connection = null;
217            }
218        }
219    
220        /**
221         * @throws InterruptedException
222         * @throws JMSException
223         * @throws JMSException
224         */
225        private void reconnect() throws InterruptedException, JMSException {
226            if (connection != null) {
227                try {
228                    connection.close();
229                } catch (Throwable ignore) {
230                }
231                connection = null;
232            }
233    
234            long reconnectDelay = 1000;
235    
236            while (connection == null) {
237                if (reconnectDelay > 1000 * 10) {
238                    reconnectDelay = 1000 * 10;
239                }
240                try {
241                    connection = factory.createConnection();
242                    connection.start();
243                } catch (JMSException e) {
244                    Thread.sleep(reconnectDelay);
245                    reconnectDelay *= 2;
246                }
247            }
248        }
249    
250        /**
251         * @throws Throwable
252         * @throws IOException
253         */
254        public void testAcidTransactions() throws Throwable {
255    
256            System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " + batchSize + ", Worker Think Time: " + workerThinkTime);
257    
258            // Create the record and fill it with some values.
259            data = new byte[recordSize];
260            for (int i = 0; i < data.length; i++) {
261                data[i] = (byte)i;
262            }
263    
264            System.out.println("==============================================");
265            System.out.println("===> Start the server now.");
266            System.out.println("==============================================");
267            reconnect();
268    
269            System.out.println("Starting " + workerCount + " Workers...");
270            ArrayList<Worker> workers = new ArrayList<Worker>();
271            for (int i = 0; i < workerCount; i++) {
272                String workerId = "worker-" + i;
273    
274                Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 1000 * 5);
275                workers.add(w);
276                new Thread(w, "Consumer:" + workerId).start();
277    
278                w = new ProducerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId);
279                workers.add(w);
280                new Thread(w, "Producer:" + workerId).start();
281            }
282    
283            System.out.println("Waiting for " + (workerCount * 10) + " batches to be delivered.");
284    
285            //
286            // Wait for about 5 batches of messages per worker to be consumed before
287            // restart.
288            // 
289            while (publishedBatches.get() < workerCount * 5) {
290                System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
291                Thread.sleep(1000);
292            }
293    
294            System.out.println("==============================================");
295            System.out.println("===> Server is under load now.  Kill it!");
296            System.out.println("==============================================");
297            ignoreJMSErrors.set(true);
298    
299            // Wait for all the workers to finish.
300            System.out.println("Waiting for all workers to exit due to server shutdown.");
301            for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
302                Worker worker = iter.next();
303                while (!worker.waitForExit(1000)) {
304                    System.out.println("==============================================");
305                    System.out.println("===> Server is under load now.  Kill it!");
306                    System.out.println("==============================================");
307                    System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
308                }
309            }
310            workers.clear();
311    
312            // No errors should have occurred so far.
313            if (errors.size() > 0) {
314                throw errors.get(0);
315            }
316    
317            System.out.println("==============================================");
318            System.out.println("===> Start the server now.");
319            System.out.println("==============================================");
320            reconnect();
321    
322            System.out.println("Restarted.");
323    
324            // Validate the all transactions were commited as a uow. Looking for
325            // partial commits.
326            for (int i = 0; i < workerCount; i++) {
327                String workerId = "worker-" + i;
328                Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 5 * 1000);
329                workers.add(w);
330                new Thread(w, "Consumer:" + workerId).start();
331            }
332    
333            System.out.println("Waiting for restarted consumers to finish consuming all messages..");
334            for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
335                Worker worker = iter.next();
336                while (!worker.waitForExit(1000 * 5)) {
337                    System.out.println("Waiting for restarted consumers to finish consuming all messages..");
338                    System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
339                }
340            }
341            workers.clear();
342    
343            System.out.println("Workers finished..");
344            System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
345    
346            if (errors.size() > 0) {
347                throw errors.get(0);
348            }
349    
350        }
351    
352        public static void main(String[] args) {
353            try {
354                AcidTestTool tool = new AcidTestTool();
355                tool.setUp();
356                tool.testAcidTransactions();
357                tool.tearDown();
358            } catch (Throwable e) {
359                System.out.println("Test Failed: " + e.getMessage());
360                e.printStackTrace();
361            }
362        }
363    }