001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.tool; 018 019 import java.io.IOException; 020 021 import javax.jms.Connection; 022 import javax.jms.JMSException; 023 import javax.jms.Message; 024 import javax.jms.MessageConsumer; 025 import javax.jms.MessageListener; 026 import javax.jms.Session; 027 import javax.jms.TextMessage; 028 import javax.jms.Topic; 029 030 /** 031 * A simple tool for consuming messages 032 * 033 * @version $Revision$ 034 */ 035 public class ConsumerTool extends ToolSupport implements MessageListener { 036 037 protected int count; 038 protected int dumpCount = 10; 039 protected boolean verbose = true; 040 protected int maxiumMessages; 041 private boolean pauseBeforeShutdown; 042 043 public static void main(String[] args) { 044 ConsumerTool tool = new ConsumerTool(); 045 if (args.length > 0) { 046 tool.url = args[0]; 047 } 048 if (args.length > 1) { 049 tool.topic = args[1].equalsIgnoreCase("true"); 050 } 051 if (args.length > 2) { 052 tool.subject = args[2]; 053 } 054 if (args.length > 3) { 055 tool.durable = args[3].equalsIgnoreCase("true"); 056 } 057 if (args.length > 4) { 058 tool.maxiumMessages = Integer.parseInt(args[4]); 059 } 060 tool.run(); 061 } 062 063 public void run() { 064 try { 065 System.out.println("Connecting to URL: " + url); 066 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); 067 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription"); 068 069 Connection connection = createConnection(); 070 Session session = createSession(connection); 071 MessageConsumer consumer = null; 072 if (durable && topic) { 073 consumer = session.createDurableSubscriber((Topic)destination, consumerName); 074 } else { 075 consumer = session.createConsumer(destination); 076 } 077 if (maxiumMessages <= 0) { 078 consumer.setMessageListener(this); 079 } 080 connection.start(); 081 082 if (maxiumMessages > 0) { 083 consumeMessagesAndClose(connection, session, consumer); 084 } 085 } catch (Exception e) { 086 System.out.println("Caught: " + e); 087 e.printStackTrace(); 088 } 089 } 090 091 public void onMessage(Message message) { 092 try { 093 if (message instanceof TextMessage) { 094 TextMessage txtMsg = (TextMessage)message; 095 if (verbose) { 096 097 String msg = txtMsg.getText(); 098 if (msg.length() > 50) { 099 msg = msg.substring(0, 50) + "..."; 100 } 101 102 System.out.println("Received: " + msg); 103 } 104 } else { 105 if (verbose) { 106 System.out.println("Received: " + message); 107 } 108 } 109 /* 110 * if (++count % dumpCount == 0) { dumpStats(connection); } 111 */ 112 } catch (JMSException e) { 113 System.out.println("Caught: " + e); 114 e.printStackTrace(); 115 } 116 } 117 118 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException { 119 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); 120 121 for (int i = 0; i < maxiumMessages; i++) { 122 Message message = consumer.receive(); 123 onMessage(message); 124 } 125 System.out.println("Closing connection"); 126 consumer.close(); 127 session.close(); 128 connection.close(); 129 if (pauseBeforeShutdown) { 130 System.out.println("Press return to shut down"); 131 System.in.read(); 132 } 133 } 134 }