/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.tool;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * A simple tool for publishing messages.
 * <p>
 * The tool publishes {@link #messageCount} fixed-size text messages followed
 * by one empty message. The fields <code>url</code>, <code>topic</code>,
 * <code>subject</code>, <code>durable</code> and <code>destination</code> are
 * not declared in this class (presumably inherited from {@link ToolSupport}).
 *
 * @version $Revision$
 */
public class ProducerTool extends ToolSupport {

    /** Number of text messages sent by {@link #sendLoop}. */
    protected int messageCount = 10;

    /** Pause, in milliseconds, after each message is sent. */
    protected long sleepTime;

    /** When true, a (truncated) copy of every message is printed before sending. */
    protected boolean verbose = true;

    /** Length, in characters, that every message body is cut or padded to. */
    protected int messageSize = 255;

    /**
     * Command-line entry point; see {@link #runTool} for the argument layout.
     *
     * @param args the command-line arguments
     */
    public static void main(String[] args) {
        runTool(args, new ProducerTool());
    }

    /**
     * Configures the given tool from positional arguments and runs it.
     * Arguments that are absent leave the corresponding setting untouched.
     * <ol start="0">
     * <li>connection URL</li>
     * <li><code>true</code> to publish to a topic, anything else for a queue</li>
     * <li>subject (destination name)</li>
     * <li><code>true</code> for durable (persistent) publishing</li>
     * <li>message count</li>
     * <li>message size</li>
     * </ol>
     *
     * @param args the positional arguments
     * @param tool the tool instance to configure and run
     * @throws NumberFormatException if the message count or size is not an integer
     */
    protected static void runTool(String[] args, ProducerTool tool) {
        if (args.length > 0) {
            tool.url = args[0];
        }
        if (args.length > 1) {
            tool.topic = args[1].equalsIgnoreCase("true");
        }
        if (args.length > 2) {
            tool.subject = args[2];
        }
        if (args.length > 3) {
            tool.durable = args[3].equalsIgnoreCase("true");
        }
        if (args.length > 4) {
            tool.messageCount = Integer.parseInt(args[4]);
        }
        if (args.length > 5) {
            tool.messageSize = Integer.parseInt(args[5]);
        }
        tool.run();
    }

    /**
     * Connects, publishes all messages and releases the JMS resources.
     * Any failure is reported on standard output; the resources are released
     * whether or not publishing succeeded.
     */
    public void run() {
        Connection connection = null;
        Session session = null;
        try {
            System.out.println("Connecting to URL: " + url);
            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");

            connection = createConnection();
            session = createSession(connection);
            MessageProducer producer = createProducer(session);
            // NOTE(review): the connection is deliberately not started here;
            // presumably because this tool only produces and never consumes.

            sendLoop(session, producer);

            System.out.println("Done.");
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            // Release on every path so a failed send does not leak the connection.
            release(connection, session);
        }
    }

    /**
     * Best-effort cleanup of whatever {@link #run()} managed to create.
     * Failures while closing are reported in the same way as other errors.
     *
     * @param connection the connection to close, or null if none was created
     * @param session the session to close, or null if none was created
     */
    private void release(Connection connection, Session session) {
        if (connection == null) {
            return;
        }
        try {
            if (session != null) {
                close(connection, session);
            } else {
                // Session creation failed: only the connection exists.
                connection.close();
            }
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Creates a producer for <code>destination</code> whose delivery mode
     * follows the <code>durable</code> flag.
     *
     * @param session the session used to create the producer
     * @return a producer set to PERSISTENT delivery when durable, NON_PERSISTENT otherwise
     * @throws JMSException if the producer cannot be created or configured
     */
    protected MessageProducer createProducer(Session session) throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        if (durable) {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } else {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        return producer;
    }

    /**
     * Sends {@link #messageCount} text messages, sleeping {@link #sleepTime}
     * milliseconds after each one, then sends one empty message.
     *
     * @param session the session used to create the messages
     * @param producer the producer used to send them
     * @throws Exception if creating or sending a message fails, or the sleep is interrupted
     */
    protected void sendLoop(Session session, MessageProducer producer) throws Exception {

        for (int i = 0; i < messageCount; i++) {

            TextMessage message = session.createTextMessage(createMessageText(i));

            if (verbose) {
                String msg = message.getText();
                // Keep console output readable for large payloads.
                if (msg.length() > 50) {
                    msg = msg.substring(0, 50) + "...";
                }
                System.out.println("Sending message: " + msg);
            }

            producer.send(message);
            Thread.sleep(sleepTime);
        }
        // NOTE(review): presumably an end-of-stream marker for the consumer - confirm.
        producer.send(session.createMessage());
    }

    /**
     * Builds the body of one message: a short header containing the index and
     * the current date, truncated or right-padded with spaces so that the
     * result is exactly {@link #messageSize} characters long. A negative
     * message size is treated as zero.
     *
     * @param index the sequence number of the message
     * @return the message text
     */
    private String createMessageText(int index) {
        // Clamp so a negative size from the command line cannot crash the buffer.
        int size = Math.max(messageSize, 0);
        StringBuffer buffer = new StringBuffer(size);
        buffer.append("Message: " + index + " sent at: " + new Date());
        if (buffer.length() > size) {
            return buffer.substring(0, size);
        }
        for (int i = buffer.length(); i < size; i++) {
            buffer.append(' ');
        }
        return buffer.toString();
    }
}