001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.tool; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.Collections; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Random; 025 import java.util.concurrent.CountDownLatch; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.atomic.AtomicBoolean; 028 import java.util.concurrent.atomic.AtomicInteger; 029 030 import javax.jms.BytesMessage; 031 import javax.jms.Connection; 032 import javax.jms.DeliveryMode; 033 import javax.jms.Destination; 034 import javax.jms.JMSException; 035 import javax.jms.Message; 036 import javax.jms.MessageConsumer; 037 import javax.jms.MessageProducer; 038 import javax.jms.Session; 039 040 import junit.framework.TestCase; 041 import org.apache.activemq.ActiveMQConnectionFactory; 042 import org.apache.activemq.command.ActiveMQQueue; 043 044 /** 045 * @version $Revision$ 046 */ 047 public class AcidTestTool extends TestCase { 048 049 050 // Worker configuration. 051 protected int recordSize = 1024; 052 protected int batchSize = 5; 053 protected int workerThinkTime = 500; 054 protected Destination target; 055 056 private Random random = new Random(); 057 private byte data[]; 058 private int workerCount = 10; 059 private AtomicBoolean ignoreJMSErrors = new AtomicBoolean(false); 060 private ActiveMQConnectionFactory factory; 061 private Connection connection; 062 private AtomicInteger publishedBatches = new AtomicInteger(0); 063 private AtomicInteger consumedBatches = new AtomicInteger(0); 064 private List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>()); 065 066 private interface Worker extends Runnable { 067 boolean waitForExit(long i) throws InterruptedException; 068 } 069 070 private final class ProducerWorker implements Worker { 071 072 private Session session; 073 private MessageProducer producer; 074 private BytesMessage message; 075 private CountDownLatch doneLatch = new CountDownLatch(1); 076 077 ProducerWorker(Session session, String workerId) throws JMSException { 078 this.session = session; 079 producer = session.createProducer(target); 080 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 081 message = session.createBytesMessage(); 082 message.setStringProperty("workerId", workerId); 083 message.writeBytes(data); 084 } 085 086 public void run() { 087 try { 088 for (int batchId = 0; true; batchId++) { 089 // System.out.println("Sending batch: "+workerId+" 090 // "+batchId); 091 for (int msgId = 0; msgId < batchSize; msgId++) { 092 // Sleep some random amount of time less than 093 // workerThinkTime 094 try { 095 Thread.sleep(random.nextInt(workerThinkTime)); 096 } catch (InterruptedException e1) { 097 return; 098 } 099 100 message.setIntProperty("batch-id", batchId); 101 message.setIntProperty("msg-id", msgId); 102 103 producer.send(message); 104 } 105 session.commit(); 106 publishedBatches.incrementAndGet(); 107 // System.out.println("Commited send batch: "+workerId+" 108 // "+batchId); 109 } 110 } catch (JMSException e) { 111 if (!ignoreJMSErrors.get()) { 112 e.printStackTrace(); 113 errors.add(e); 114 } 115 return; 116 } catch (Throwable e) { 117 e.printStackTrace(); 118 errors.add(e); 119 return; 120 } finally { 121 System.out.println("Producer exiting."); 122 doneLatch.countDown(); 123 } 124 } 125 126 public boolean waitForExit(long i) throws InterruptedException { 127 return doneLatch.await(i, TimeUnit.MILLISECONDS); 128 } 129 } 130 131 private final class ConsumerWorker implements Worker { 132 133 private Session session; 134 private MessageConsumer consumer; 135 private final long timeout; 136 private CountDownLatch doneLatch = new CountDownLatch(1); 137 138 ConsumerWorker(Session session, String workerId, long timeout) throws JMSException { 139 this.session = session; 140 this.timeout = timeout; 141 consumer = session.createConsumer(target, "workerId='" + workerId + "'"); 142 } 143 144 public void run() { 145 146 try { 147 int batchId = 0; 148 while (true) { 149 for (int msgId = 0; msgId < batchSize; msgId++) { 150 151 // Sleep some random amount of time less than 152 // workerThinkTime 153 try { 154 Thread.sleep(random.nextInt(workerThinkTime)); 155 } catch (InterruptedException e1) { 156 return; 157 } 158 159 Message message = consumer.receive(timeout); 160 if (msgId > 0) { 161 assertNotNull(message); 162 assertEquals(message.getIntProperty("batch-id"), batchId); 163 assertEquals(message.getIntProperty("msg-id"), msgId); 164 } else { 165 if (message == null) { 166 System.out.println("At end of batch an don't have a next batch to process. done."); 167 return; 168 } 169 assertEquals(msgId, message.getIntProperty("msg-id")); 170 batchId = message.getIntProperty("batch-id"); 171 // System.out.println("Receiving batch: "+workerId+" 172 // "+batchId); 173 } 174 175 } 176 session.commit(); 177 consumedBatches.incrementAndGet(); 178 // System.out.println("Commited receive batch: "+workerId+" 179 // "+batchId); 180 } 181 } catch (JMSException e) { 182 if (!ignoreJMSErrors.get()) { 183 e.printStackTrace(); 184 errors.add(e); 185 } 186 return; 187 } catch (Throwable e) { 188 e.printStackTrace(); 189 errors.add(e); 190 return; 191 } finally { 192 System.out.println("Consumer exiting."); 193 doneLatch.countDown(); 194 } 195 } 196 197 public boolean waitForExit(long i) throws InterruptedException { 198 return doneLatch.await(i, TimeUnit.MILLISECONDS); 199 } 200 } 201 202 /** 203 * @see junit.framework.TestCase#setUp() 204 */ 205 protected void setUp() throws Exception { 206 factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 207 this.target = new ActiveMQQueue(getClass().getName()); 208 } 209 210 protected void tearDown() throws Exception { 211 if (connection != null) { 212 try { 213 connection.close(); 214 } catch (Throwable ignore) { 215 } 216 connection = null; 217 } 218 } 219 220 /** 221 * @throws InterruptedException 222 * @throws JMSException 223 * @throws JMSException 224 */ 225 private void reconnect() throws InterruptedException, JMSException { 226 if (connection != null) { 227 try { 228 connection.close(); 229 } catch (Throwable ignore) { 230 } 231 connection = null; 232 } 233 234 long reconnectDelay = 1000; 235 236 while (connection == null) { 237 if (reconnectDelay > 1000 * 10) { 238 reconnectDelay = 1000 * 10; 239 } 240 try { 241 connection = factory.createConnection(); 242 connection.start(); 243 } catch (JMSException e) { 244 Thread.sleep(reconnectDelay); 245 reconnectDelay *= 2; 246 } 247 } 248 } 249 250 /** 251 * @throws Throwable 252 * @throws IOException 253 */ 254 public void testAcidTransactions() throws Throwable { 255 256 System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " + batchSize + ", Worker Think Time: " + workerThinkTime); 257 258 // Create the record and fill it with some values. 259 data = new byte[recordSize]; 260 for (int i = 0; i < data.length; i++) { 261 data[i] = (byte)i; 262 } 263 264 System.out.println("=============================================="); 265 System.out.println("===> Start the server now."); 266 System.out.println("=============================================="); 267 reconnect(); 268 269 System.out.println("Starting " + workerCount + " Workers..."); 270 ArrayList<Worker> workers = new ArrayList<Worker>(); 271 for (int i = 0; i < workerCount; i++) { 272 String workerId = "worker-" + i; 273 274 Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 1000 * 5); 275 workers.add(w); 276 new Thread(w, "Consumer:" + workerId).start(); 277 278 w = new ProducerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId); 279 workers.add(w); 280 new Thread(w, "Producer:" + workerId).start(); 281 } 282 283 System.out.println("Waiting for " + (workerCount * 10) + " batches to be delivered."); 284 285 // 286 // Wait for about 5 batches of messages per worker to be consumed before 287 // restart. 288 // 289 while (publishedBatches.get() < workerCount * 5) { 290 System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get()); 291 Thread.sleep(1000); 292 } 293 294 System.out.println("=============================================="); 295 System.out.println("===> Server is under load now. Kill it!"); 296 System.out.println("=============================================="); 297 ignoreJMSErrors.set(true); 298 299 // Wait for all the workers to finish. 300 System.out.println("Waiting for all workers to exit due to server shutdown."); 301 for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) { 302 Worker worker = iter.next(); 303 while (!worker.waitForExit(1000)) { 304 System.out.println("=============================================="); 305 System.out.println("===> Server is under load now. Kill it!"); 306 System.out.println("=============================================="); 307 System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get()); 308 } 309 } 310 workers.clear(); 311 312 // No errors should have occurred so far. 313 if (errors.size() > 0) { 314 throw errors.get(0); 315 } 316 317 System.out.println("=============================================="); 318 System.out.println("===> Start the server now."); 319 System.out.println("=============================================="); 320 reconnect(); 321 322 System.out.println("Restarted."); 323 324 // Validate the all transactions were commited as a uow. Looking for 325 // partial commits. 326 for (int i = 0; i < workerCount; i++) { 327 String workerId = "worker-" + i; 328 Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 5 * 1000); 329 workers.add(w); 330 new Thread(w, "Consumer:" + workerId).start(); 331 } 332 333 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 334 for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) { 335 Worker worker = iter.next(); 336 while (!worker.waitForExit(1000 * 5)) { 337 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 338 System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get()); 339 } 340 } 341 workers.clear(); 342 343 System.out.println("Workers finished.."); 344 System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get()); 345 346 if (errors.size() > 0) { 347 throw errors.get(0); 348 } 349 350 } 351 352 public static void main(String[] args) { 353 try { 354 AcidTestTool tool = new AcidTestTool(); 355 tool.setUp(); 356 tool.testAcidTransactions(); 357 tool.tearDown(); 358 } catch (Throwable e) { 359 System.out.println("Test Failed: " + e.getMessage()); 360 e.printStackTrace(); 361 } 362 } 363 }