001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.benchmark; 018 019 import java.io.BufferedReader; 020 import java.io.File; 021 import java.io.FileReader; 022 import java.io.IOException; 023 024 import javax.jms.DeliveryMode; 025 import javax.jms.Destination; 026 import javax.jms.JMSException; 027 import javax.jms.Message; 028 import javax.jms.MessageProducer; 029 import javax.jms.Session; 030 031 /** 032 * @author James Strachan 033 * @version $Revision$ 034 */ 035 public class Producer extends BenchmarkSupport { 036 037 int loops = -1; 038 int loopSize = 1000; 039 private int messageSize = 1000; 040 041 public Producer() { 042 } 043 044 public static void main(String[] args) { 045 Producer tool = new Producer(); 046 if (args.length > 0) { 047 tool.setUrl(args[0]); 048 } 049 if (args.length > 1) { 050 tool.setTopic(parseBoolean(args[1])); 051 } 052 if (args.length > 2) { 053 tool.setSubject(args[2]); 054 } 055 if (args.length > 3) { 056 tool.setDurable(parseBoolean(args[3])); 057 } 058 if (args.length > 4) { 059 tool.setMessageSize(Integer.parseInt(args[4])); 060 } 061 if (args.length > 5) { 062 tool.setConnectionCount(Integer.parseInt(args[5])); 063 } 064 try { 065 tool.run(); 066 } catch (Exception e) { 067 System.out.println("Caught: " + e); 068 e.printStackTrace(); 069 } 070 } 071 072 public void run() throws Exception { 073 start(); 074 publish(); 075 } 076 077 // Properties 078 // ------------------------------------------------------------------------- 079 public int getMessageSize() { 080 return messageSize; 081 } 082 083 public void setMessageSize(int messageSize) { 084 this.messageSize = messageSize; 085 } 086 087 public int getLoopSize() { 088 return loopSize; 089 } 090 091 public void setLoopSize(int loopSize) { 092 this.loopSize = loopSize; 093 } 094 095 // Implementation methods 096 // ------------------------------------------------------------------------- 097 098 protected void publish() throws Exception { 099 final String text = getMessage(); 100 101 System.out.println("Publishing to: " + subjects.length + " subject(s)"); 102 103 for (int i = 0; i < subjects.length; i++) { 104 final String subject = subjects[i]; 105 Thread thread = new Thread() { 106 public void run() { 107 try { 108 publish(text, subject); 109 } catch (JMSException e) { 110 System.out.println("Caught: " + e); 111 e.printStackTrace(); 112 } 113 } 114 }; 115 thread.start(); 116 } 117 118 } 119 120 protected String getMessage() { 121 StringBuffer buffer = new StringBuffer(); 122 for (int i = 0; i < messageSize; i++) { 123 char ch = 'X'; 124 buffer.append(ch); 125 } 126 return buffer.toString(); 127 } 128 129 protected void publish(String text, String subject) throws JMSException { 130 Session session = createSession(); 131 132 Destination destination = createDestination(session, subject); 133 134 MessageProducer publisher = session.createProducer(destination); 135 if (isDurable()) { 136 publisher.setDeliveryMode(DeliveryMode.PERSISTENT); 137 } else { 138 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 139 } 140 141 System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName()); 142 System.out.println("Message length: " + text.length()); 143 144 if (loops <= 0) { 145 while (true) { 146 publishLoop(session, publisher, text); 147 } 148 } else { 149 for (int i = 0; i < loops; i++) { 150 publishLoop(session, publisher, text); 151 } 152 } 153 } 154 155 protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException { 156 for (int i = 0; i < loopSize; i++) { 157 Message message = session.createTextMessage(text); 158 159 publisher.send(message); 160 count(1); 161 } 162 } 163 164 protected String loadFile(String file) throws IOException { 165 System.out.println("Loading file: " + file); 166 167 StringBuffer buffer = new StringBuffer(); 168 BufferedReader in = new BufferedReader(new FileReader(file)); 169 while (true) { 170 String line = in.readLine(); 171 if (line == null) { 172 break; 173 } 174 buffer.append(line); 175 buffer.append(File.separator); 176 } 177 in.close(); 178 return buffer.toString(); 179 } 180 181 public int getLoops() { 182 return loops; 183 } 184 185 public void setLoops(int loops) { 186 this.loops = loops; 187 } 188 }