001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.tool; 018 019 import java.util.concurrent.atomic.AtomicInteger; 020 021 import javax.jms.ConnectionFactory; 022 import javax.jms.Destination; 023 import javax.jms.JMSException; 024 import javax.jms.Message; 025 import javax.jms.MessageConsumer; 026 import javax.jms.MessageListener; 027 import javax.jms.Topic; 028 029 import org.apache.activemq.tool.properties.JmsClientProperties; 030 import org.apache.activemq.tool.properties.JmsConsumerProperties; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 public class JmsConsumerClient extends AbstractJmsMeasurableClient { 035 private static final Log LOG = LogFactory.getLog(JmsConsumerClient.class); 036 037 protected MessageConsumer jmsConsumer; 038 protected JmsConsumerProperties client; 039 040 public JmsConsumerClient(ConnectionFactory factory) { 041 this(new JmsConsumerProperties(), factory); 042 } 043 044 public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) { 045 super(factory); 046 client = clientProps; 047 } 048 049 public void receiveMessages() throws JMSException { 050 if (client.isAsyncRecv()) { 051 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 052 receiveAsyncTimeBasedMessages(client.getRecvDuration()); 053 } else { 054 receiveAsyncCountBasedMessages(client.getRecvCount()); 055 } 056 } else { 057 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 058 receiveSyncTimeBasedMessages(client.getRecvDuration()); 059 } else { 060 receiveSyncCountBasedMessages(client.getRecvCount()); 061 } 062 } 063 } 064 065 public void receiveMessages(int destCount) throws JMSException { 066 this.destCount = destCount; 067 receiveMessages(); 068 } 069 070 public void receiveMessages(int destIndex, int destCount) throws JMSException { 071 this.destIndex = destIndex; 072 receiveMessages(destCount); 073 } 074 075 public void receiveSyncTimeBasedMessages(long duration) throws JMSException { 076 if (getJmsConsumer() == null) { 077 createJmsConsumer(); 078 } 079 080 try { 081 getConnection().start(); 082 083 LOG.info("Starting to synchronously receive messages for " + duration + " ms..."); 084 long endTime = System.currentTimeMillis() + duration; 085 while (System.currentTimeMillis() < endTime) { 086 getJmsConsumer().receive(); 087 incThroughput(); 088 } 089 } finally { 090 if (client.isDurable() && client.isUnsubscribe()) { 091 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 092 getJmsConsumer().close(); 093 getSession().unsubscribe(getClientName()); 094 } 095 getConnection().close(); 096 } 097 } 098 099 public void receiveSyncCountBasedMessages(long count) throws JMSException { 100 if (getJmsConsumer() == null) { 101 createJmsConsumer(); 102 } 103 104 try { 105 getConnection().start(); 106 LOG.info("Starting to synchronously receive " + count + " messages..."); 107 108 int recvCount = 0; 109 while (recvCount < count) { 110 getJmsConsumer().receive(); 111 incThroughput(); 112 recvCount++; 113 } 114 } finally { 115 if (client.isDurable() && client.isUnsubscribe()) { 116 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 117 getJmsConsumer().close(); 118 getSession().unsubscribe(getClientName()); 119 } 120 getConnection().close(); 121 } 122 } 123 124 public void receiveAsyncTimeBasedMessages(long duration) throws JMSException { 125 if (getJmsConsumer() == null) { 126 createJmsConsumer(); 127 } 128 129 getJmsConsumer().setMessageListener(new MessageListener() { 130 public void onMessage(Message msg) { 131 incThroughput(); 132 } 133 }); 134 135 try { 136 getConnection().start(); 137 LOG.info("Starting to asynchronously receive messages for " + duration + " ms..."); 138 try { 139 Thread.sleep(duration); 140 } catch (InterruptedException e) { 141 throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage()); 142 } 143 } finally { 144 if (client.isDurable() && client.isUnsubscribe()) { 145 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 146 getJmsConsumer().close(); 147 getSession().unsubscribe(getClientName()); 148 } 149 getConnection().close(); 150 } 151 } 152 153 public void receiveAsyncCountBasedMessages(long count) throws JMSException { 154 if (getJmsConsumer() == null) { 155 createJmsConsumer(); 156 } 157 158 final AtomicInteger recvCount = new AtomicInteger(0); 159 getJmsConsumer().setMessageListener(new MessageListener() { 160 public void onMessage(Message msg) { 161 incThroughput(); 162 recvCount.incrementAndGet(); 163 recvCount.notify(); 164 } 165 }); 166 167 try { 168 getConnection().start(); 169 LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages..."); 170 try { 171 while (recvCount.get() < count) { 172 recvCount.wait(); 173 } 174 } catch (InterruptedException e) { 175 throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage()); 176 } 177 } finally { 178 if (client.isDurable() && client.isUnsubscribe()) { 179 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 180 getJmsConsumer().close(); 181 getSession().unsubscribe(getClientName()); 182 } 183 getConnection().close(); 184 } 185 } 186 187 public MessageConsumer createJmsConsumer() throws JMSException { 188 Destination[] dest = createDestination(destIndex, destCount); 189 return createJmsConsumer(dest[0]); 190 } 191 192 public MessageConsumer createJmsConsumer(Destination dest) throws JMSException { 193 if (client.isDurable()) { 194 String clientName = getClientName(); 195 if (clientName == null) { 196 clientName = "JmsConsumer"; 197 setClientName(clientName); 198 } 199 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 200 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName); 201 } else { 202 LOG.info("Creating non-durable consumer to: " + dest.toString()); 203 jmsConsumer = getSession().createConsumer(dest); 204 } 205 return jmsConsumer; 206 } 207 208 public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException { 209 if (client.isDurable()) { 210 String clientName = getClientName(); 211 if (clientName == null) { 212 clientName = "JmsConsumer"; 213 setClientName(clientName); 214 } 215 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 216 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal); 217 } else { 218 LOG.info("Creating non-durable consumer to: " + dest.toString()); 219 jmsConsumer = getSession().createConsumer(dest, selector, noLocal); 220 } 221 return jmsConsumer; 222 } 223 224 public MessageConsumer getJmsConsumer() { 225 return jmsConsumer; 226 } 227 228 public JmsClientProperties getClient() { 229 return client; 230 } 231 232 public void setClient(JmsClientProperties clientProps) { 233 client = (JmsConsumerProperties)clientProps; 234 } 235 }