/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.tool;

import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Command-line driver that pushes a fixed bytes payload through a set of
 * {@link MemProducer}s while {@link MemConsumer}s are attached to the same
 * destination and a {@link MemoryMonitoringTool} runs alongside.
 * <p>
 * Settings are supplied as <code>key=value</code> program arguments (see
 * {@link #main(String[])}) and read in the constructor.
 */
public class JMSMemtest {

    private static final Log LOG = LogFactory.getLog(JMSMemtest.class);
    private static final int DEFAULT_MESSAGECOUNT = 5000;
    /** Payload size, in bytes, used when no "messageSize" setting is given (10 KB). */
    private static final int DEFAULT_MESSAGESIZE = 10 * 1024;
    /** Value of "connectionCheckpointSize" that disables connection checkpoints. */
    private static final int NO_CONNECTION_CHECKPOINT = -1;

    protected BrokerService broker;
    protected boolean topic = true;
    protected boolean durable;
    protected long messageCount;

    // Checkpoint size, in kb, after which the producers/consumers are closed
    // and re-created on new connections. -1 means no connection checkpoint.
    protected int connectionCheckpointSize;
    // connectionCheckpointSize * 1024; compared against counter * 1024 in resetConnection().
    protected long connectionInterval;

    protected int consumerCount;
    protected int producerCount;
    // Stored as the "checkpointInterval" setting multiplied by 1000.
    protected int checkpointInterval;
    protected int prefetchSize;
    // Payload size in bytes; 10 kb when the "messageSize" setting is absent.
    protected int messageSize;

    protected String reportDirectory;
    protected String reportName;

    protected String url = "";
    protected MemProducer[] producers;
    protected MemConsumer[] consumers;
    protected String destinationName;
    protected boolean allMessagesConsumed = true;
    protected MemConsumer allMessagesList = new MemConsumer();

    protected Message payload;

    protected ActiveMQConnectionFactory connectionFactory;
    protected Connection connection;
    protected Destination destination;

    protected boolean createConnectionPerClient = true;

    protected boolean transacted;
    protected boolean useEmbeddedBroker = true;
    protected MemoryMonitoringTool memoryMonitoringTool;

    /**
     * Reads the test configuration from <code>settings</code>.
     * <p>
     * A numeric setting that is absent falls back to a default (0 unless
     * noted on the field); a setting that is present but not a valid integer
     * still raises {@link NumberFormatException}. An absent boolean setting
     * is treated as <code>false</code>.
     *
     * @param settings the key/value configuration; must not be null
     * @throws NumberFormatException if a numeric setting is present but malformed
     */
    public JMSMemtest(Properties settings) {
        url = settings.getProperty("url");
        topic = Boolean.valueOf(settings.getProperty("topic")).booleanValue();
        durable = Boolean.valueOf(settings.getProperty("durable")).booleanValue();
        connectionCheckpointSize = readInt(settings, "connectionCheckpointSize", NO_CONNECTION_CHECKPOINT);
        producerCount = readInt(settings, "producerCount", 0);
        consumerCount = readInt(settings, "consumerCount", 0);
        // 0 is replaced by DEFAULT_MESSAGECOUNT in start().
        messageCount = readInt(settings, "messageCount", 0);
        messageSize = readInt(settings, "messageSize", DEFAULT_MESSAGESIZE);
        prefetchSize = readInt(settings, "prefetchSize", 0);
        checkpointInterval = readInt(settings, "checkpointInterval", 0) * 1000;
        reportName = settings.getProperty("reportName");
        destinationName = settings.getProperty("destinationName");
        reportDirectory = settings.getProperty("reportDirectory");
        // Multiply as long so a large checkpoint size cannot overflow int.
        connectionInterval = connectionCheckpointSize * 1024L;
    }

    /**
     * Returns the integer value of <code>key</code>, or <code>fallback</code>
     * when the key is not set.
     */
    private static int readInt(Properties settings, String key, int fallback) {
        String raw = settings.getProperty(key);
        if (raw == null) {
            return fallback;
        }
        return Integer.parseInt(raw);
    }

    /**
     * Entry point: turns each <code>key=value</code> argument into a setting,
     * builds a {@link JMSMemtest} from them and runs it.
     *
     * @param args program arguments in <code>key=value</code> form
     */
    public static void main(String[] args) {
        Properties sysSettings = new Properties();

        for (int i = 0; i < args.length; i++) {
            int index = args[i].indexOf('=');
            if (index < 0) {
                // Without a separator there is no key/value pair to record.
                LOG.warn("Ignoring argument that is not in key=value form: " + args[i]);
                continue;
            }
            String key = args[i].substring(0, index);
            String val = args[i].substring(index + 1);
            sysSettings.setProperty(key, val);
        }

        JMSMemtest memtest = new JMSMemtest(sysSettings);
        try {
            memtest.start();
        } catch (Exception e) {
            LOG.error("JMSMemtest failed", e);
        }
    }

    /**
     * Runs the test: starts the memory monitor, optionally starts an embedded
     * broker, opens a connection, builds the payload, publishes the messages
     * and finally releases everything via {@link #close()}.
     *
     * @throws Exception if any of the setup, publish or close steps fails
     */
    protected void start() throws Exception {
        // Apply the default before the settings snapshot is handed to the
        // monitor so the reported message count matches what is sent.
        if (messageCount == 0) {
            messageCount = DEFAULT_MESSAGECOUNT;
        }

        LOG.info("Starting Monitor");
        memoryMonitoringTool = new MemoryMonitoringTool();
        memoryMonitoringTool.setTestSettings(getSysTestSettings());
        Thread monitorThread = memoryMonitoringTool.startMonitor();

        try {
            if (useEmbeddedBroker && broker == null) {
                broker = createBroker();
            }

            connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
            if (prefetchSize > 0) {
                connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
                connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
            }

            connection = connectionFactory.createConnection();
            Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

            if (topic) {
                destination = session.createTopic(destinationName);
            } else {
                destination = session.createQueue(destinationName);
            }

            createPayload(session);

            publishAndConsume();
        } finally {
            // Release the connection, broker and monitor even when the run fails.
            LOG.info("Closing resources");
            this.close();
        }

        monitorThread.join();
    }

    /**
     * Tells whether the connection checkpoint has been passed.
     *
     * @param counter number of messages sent so far
     * @return true when a positive {@link #connectionInterval} is smaller
     *         than <code>counter * 1024</code>; false otherwise
     */
    protected boolean resetConnection(int counter) {
        if (connectionInterval <= 0) {
            return false;
        }
        // Long arithmetic: counter * 1024 overflows int for large counters.
        long totalMsgSizeConsumed = counter * 1024L;
        return connectionInterval < totalMsgSizeConsumed;
    }

    /**
     * Creates the consumers and producers, then performs
     * {@link #messageCount} rounds in which each producer sends the payload
     * once. When {@link #resetConnection(int)} reports true the round is cut
     * short and all consumers and producers are re-created before the next one.
     *
     * @throws Exception if creating, closing or sending fails
     */
    protected void publishAndConsume() throws Exception {
        createConsumers();
        createProducers();
        int counter = 0;
        boolean resetCon = false;
        LOG.info("Start sending messages ");
        for (int i = 0; i < messageCount; i++) {
            if (resetCon) {
                closeConsumers();
                closeProducers();
                createConsumers();
                createProducers();
                resetCon = false;
            }

            for (int k = 0; k < producers.length; k++) {
                producers[k].sendMessage(payload, "counter", counter);
                counter++;
                // NOTE(review): counter is never reset, so once the checkpoint
                // is crossed every later round sends one message and then
                // reconnects - confirm this is the intended behaviour.
                if (resetConnection(counter)) {
                    resetCon = true;
                    break;
                }
            }
        }
    }

    /**
     * Closes the connection, stops the embedded broker and stops the memory
     * monitor. Parts that were never created are skipped, and a failure in
     * one step does not prevent the following steps from running.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    protected void close() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                // broker stays null when no embedded broker was started.
                if (broker != null) {
                    broker.stop();
                }
            } finally {
                if (memoryMonitoringTool != null) {
                    memoryMonitoringTool.stopMonitor();
                }
            }
        }
    }

    /**
     * Builds the shared payload: a bytes message of {@link #messageSize}
     * bytes where byte <code>i</code> holds <code>(byte) i</code>.
     *
     * @param session the session used to create the message
     * @throws JMSException if the message cannot be created or written
     */
    protected void createPayload(Session session) throws JMSException {
        byte[] array = new byte[messageSize];
        for (int i = 0; i < array.length; i++) {
            array[i] = (byte) i;
        }

        BytesMessage bytesPayload = session.createBytesMessage();
        bytesPayload.writeBytes(array);
        payload = bytesPayload;
    }

    /**
     * Creates and starts {@link #producerCount} producers on the destination,
     * using persistent delivery when {@link #durable} is set and
     * non-persistent delivery otherwise.
     *
     * @throws JMSException if a producer cannot be created or started
     */
    protected void createProducers() throws JMSException {
        producers = new MemProducer[producerCount];
        int deliveryMode = durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
        for (int i = 0; i < producerCount; i++) {
            producers[i] = new MemProducer(connectionFactory, destination);
            producers[i].setDeliveryMode(deliveryMode);
            producers[i].start();
        }
    }

    /**
     * Creates and starts {@link #consumerCount} consumers on the destination,
     * each with {@link #allMessagesList} as its parent.
     *
     * @throws JMSException if a consumer cannot be created or started
     */
    protected void createConsumers() throws JMSException {
        consumers = new MemConsumer[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new MemConsumer(connectionFactory, destination);
            consumers[i].setParent(allMessagesList);
            consumers[i].start();
        }
    }

    /**
     * Shuts down every producer that has been created; does nothing when
     * none exist.
     *
     * @throws JMSException if a producer fails to shut down
     */
    protected void closeProducers() throws JMSException {
        if (producers == null) {
            return;
        }
        for (int i = 0; i < producers.length; i++) {
            if (producers[i] != null) {
                producers[i].shutDown();
            }
        }
    }

    /**
     * Shuts down every consumer that has been created; does nothing when
     * none exist.
     *
     * @throws JMSException if a consumer fails to shut down
     */
    protected void closeConsumers() throws JMSException {
        if (consumers == null) {
            return;
        }
        for (int i = 0; i < consumers.length; i++) {
            if (consumers[i] != null) {
                consumers[i].shutDown();
            }
        }
    }

    /**
     * Creates the connection factory for {@link #url}. A null, blank or
     * literal "null" url selects <code>vm://localhost</code>.
     *
     * @return a new {@link ActiveMQConnectionFactory}
     * @throws JMSException declared for subclasses; not thrown here
     */
    protected ConnectionFactory createConnectionFactory() throws JMSException {
        String trimmed = url == null ? "" : url.trim();
        if (trimmed.equals("") || trimmed.equals("null")) {
            return new ActiveMQConnectionFactory("vm://localhost");
        }
        return new ActiveMQConnectionFactory(url);
    }

    /**
     * Creates, configures and starts a new broker.
     *
     * @return the started broker
     * @throws Exception if configuring or starting the broker fails
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService newBroker = new BrokerService();
        configureBroker(newBroker);
        newBroker.start();
        return newBroker;
    }

    /**
     * Adds the <code>vm://localhost</code> connector and makes the broker
     * delete all messages on startup.
     *
     * @param broker the broker to configure
     * @throws Exception if the connector cannot be added
     */
    protected void configureBroker(BrokerService broker) throws Exception {
        broker.addConnector("vm://localhost");
        broker.setDeleteAllMessagesOnStartup(true);
    }

    /**
     * Builds the settings snapshot handed to the memory monitor. String
     * settings that are unset (null) are left out, because
     * {@link Properties} does not accept null values.
     *
     * @return a new {@link Properties} describing the current configuration
     */
    protected Properties getSysTestSettings() {
        Properties settings = new Properties();
        settings.setProperty("domain", topic ? "topic" : "queue");
        settings.setProperty("durable", durable ? "durable" : "non-durable");
        settings.setProperty("connection_checkpoint_size_kb", Integer.toString(connectionCheckpointSize));
        settings.setProperty("producer_count", Integer.toString(producerCount));
        settings.setProperty("consumer_count", Integer.toString(consumerCount));
        settings.setProperty("message_count", Long.toString(messageCount));
        settings.setProperty("message_size", Integer.toString(messageSize));
        settings.setProperty("prefetchSize", Integer.toString(prefetchSize));
        settings.setProperty("checkpoint_interval", Integer.toString(checkpointInterval));
        setIfPresent(settings, "destination_name", destinationName);
        setIfPresent(settings, "report_name", reportName);
        setIfPresent(settings, "report_directory", reportDirectory);
        settings.setProperty("connection_checkpoint_size", Integer.toString(connectionCheckpointSize));
        return settings;
    }

    /** Stores <code>value</code> under <code>key</code> unless it is null. */
    private static void setIfPresent(Properties settings, String key, String value) {
        if (value != null) {
            settings.setProperty(key, value);
        }
    }

}