001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.tool; 019 020 import javax.jms.Connection; 021 import javax.jms.JMSException; 022 import javax.jms.Message; 023 import javax.jms.MessageConsumer; 024 import javax.jms.MessageListener; 025 import javax.jms.Session; 026 import javax.jms.TextMessage; 027 import javax.jms.Topic; 028 import java.io.IOException; 029 030 /** 031 * A simple tool for consuming messages 032 * 033 * @version $Revision$ 034 */ 035 public class ConsumerTool extends ToolSupport implements MessageListener { 036 037 protected int count = 0; 038 protected int dumpCount = 10; 039 protected boolean verbose = true; 040 protected int maxiumMessages = 0; 041 private boolean pauseBeforeShutdown; 042 043 044 public static void main(String[] args) { 045 ConsumerTool tool = new ConsumerTool(); 046 if (args.length > 0) { 047 tool.url = args[0]; 048 } 049 if (args.length > 1) { 050 tool.topic = args[1].equalsIgnoreCase("true"); 051 } 052 if (args.length > 2) { 053 tool.subject = args[2]; 054 } 055 if (args.length > 3) { 056 tool.durable = args[3].equalsIgnoreCase("true"); 057 } 058 if (args.length > 4) { 059 tool.maxiumMessages = Integer.parseInt(args[4]); 060 } 061 tool.run(); 062 } 063 064 public void run() { 065 try { 066 System.out.println("Connecting to URL: " + url); 067 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); 068 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription"); 069 070 Connection connection = createConnection(); 071 Session session = createSession(connection); 072 MessageConsumer consumer = null; 073 if (durable && topic) { 074 consumer = session.createDurableSubscriber((Topic) destination, consumerName); 075 } 076 else { 077 consumer = session.createConsumer(destination); 078 } 079 if (maxiumMessages <= 0) { 080 consumer.setMessageListener(this); 081 } 082 connection.start(); 083 084 if (maxiumMessages > 0) { 085 consumeMessagesAndClose(connection, session, consumer); 086 } 087 } 088 catch (Exception e) { 089 System.out.println("Caught: " + e); 090 e.printStackTrace(); 091 } 092 } 093 094 public void onMessage(Message message) { 095 try { 096 if (message instanceof TextMessage) { 097 TextMessage txtMsg = (TextMessage) message; 098 if (verbose) { 099 100 String msg = txtMsg.getText(); 101 if( msg.length() > 50 ) 102 msg = msg.substring(0, 50)+"..."; 103 104 System.out.println("Received: " + msg); 105 } 106 } 107 else { 108 if (verbose) { 109 System.out.println("Received: " + message); 110 } 111 } 112 /* 113 if (++count % dumpCount == 0) { 114 dumpStats(connection); 115 } 116 */ 117 } 118 catch (JMSException e) { 119 System.out.println("Caught: " + e); 120 e.printStackTrace(); 121 } 122 } 123 124 125 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException { 126 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); 127 128 for (int i = 0; i < maxiumMessages; i++) { 129 Message message = consumer.receive(); 130 onMessage(message); 131 } 132 System.out.println("Closing connection"); 133 consumer.close(); 134 session.close(); 135 connection.close(); 136 if (pauseBeforeShutdown) { 137 System.out.println("Press return to shut down"); 138 System.in.read(); 139 } 140 } 141 }