001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import javax.jms.ConnectionFactory;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.Topic;
028    
029    import org.apache.activemq.tool.properties.JmsClientProperties;
030    import org.apache.activemq.tool.properties.JmsConsumerProperties;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    public class JmsConsumerClient extends AbstractJmsMeasurableClient {
035        private static final Log LOG = LogFactory.getLog(JmsConsumerClient.class);
036    
037        protected MessageConsumer jmsConsumer;
038        protected JmsConsumerProperties client;
039    
040        public JmsConsumerClient(ConnectionFactory factory) {
041            this(new JmsConsumerProperties(), factory);
042        }
043    
044        public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
045            super(factory);
046            client = clientProps;
047        }
048    
049        public void receiveMessages() throws JMSException {
050            if (client.isAsyncRecv()) {
051                if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
052                    receiveAsyncTimeBasedMessages(client.getRecvDuration());
053                } else {
054                    receiveAsyncCountBasedMessages(client.getRecvCount());
055                }
056            } else {
057                if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
058                    receiveSyncTimeBasedMessages(client.getRecvDuration());
059                } else {
060                    receiveSyncCountBasedMessages(client.getRecvCount());
061                }
062            }
063        }
064    
065        public void receiveMessages(int destCount) throws JMSException {
066            this.destCount = destCount;
067            receiveMessages();
068        }
069    
070        public void receiveMessages(int destIndex, int destCount) throws JMSException {
071            this.destIndex = destIndex;
072            receiveMessages(destCount);
073        }
074    
075        public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
076            if (getJmsConsumer() == null) {
077                createJmsConsumer();
078            }
079    
080            try {
081                getConnection().start();
082    
083                LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
084                long endTime = System.currentTimeMillis() + duration;
085                while (System.currentTimeMillis() < endTime) {
086                    getJmsConsumer().receive();
087                    incThroughput();
088                }
089            } finally {
090                if (client.isDurable() && client.isUnsubscribe()) {
091                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
092                    getJmsConsumer().close();
093                    getSession().unsubscribe(getClientName());
094                }
095                getConnection().close();
096            }
097        }
098    
099        public void receiveSyncCountBasedMessages(long count) throws JMSException {
100            if (getJmsConsumer() == null) {
101                createJmsConsumer();
102            }
103    
104            try {
105                getConnection().start();
106                LOG.info("Starting to synchronously receive " + count + " messages...");
107    
108                int recvCount = 0;
109                while (recvCount < count) {
110                    getJmsConsumer().receive();
111                    incThroughput();
112                    recvCount++;
113                }
114            } finally {
115                if (client.isDurable() && client.isUnsubscribe()) {
116                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
117                    getJmsConsumer().close();
118                    getSession().unsubscribe(getClientName());
119                }
120                getConnection().close();
121            }
122        }
123    
124        public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
125            if (getJmsConsumer() == null) {
126                createJmsConsumer();
127            }
128    
129            getJmsConsumer().setMessageListener(new MessageListener() {
130                public void onMessage(Message msg) {
131                    incThroughput();
132                }
133            });
134    
135            try {
136                getConnection().start();
137                LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
138                try {
139                    Thread.sleep(duration);
140                } catch (InterruptedException e) {
141                    throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
142                }
143            } finally {
144                if (client.isDurable() && client.isUnsubscribe()) {
145                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
146                    getJmsConsumer().close();
147                    getSession().unsubscribe(getClientName());
148                }
149                getConnection().close();
150            }
151        }
152    
153        public void receiveAsyncCountBasedMessages(long count) throws JMSException {
154            if (getJmsConsumer() == null) {
155                createJmsConsumer();
156            }
157    
158            final AtomicInteger recvCount = new AtomicInteger(0);
159            getJmsConsumer().setMessageListener(new MessageListener() {
160                public void onMessage(Message msg) {
161                    incThroughput();
162                    recvCount.incrementAndGet();
163                    recvCount.notify();
164                }
165            });
166    
167            try {
168                getConnection().start();
169                LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages...");
170                try {
171                    while (recvCount.get() < count) {
172                        recvCount.wait();
173                    }
174                } catch (InterruptedException e) {
175                    throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
176                }
177            } finally {
178                if (client.isDurable() && client.isUnsubscribe()) {
179                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
180                    getJmsConsumer().close();
181                    getSession().unsubscribe(getClientName());
182                }
183                getConnection().close();
184            }
185        }
186    
187        public MessageConsumer createJmsConsumer() throws JMSException {
188            Destination[] dest = createDestination(destIndex, destCount);
189            return createJmsConsumer(dest[0]);
190        }
191    
192        public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
193            if (client.isDurable()) {
194                String clientName = getClientName();
195                if (clientName == null) {
196                    clientName = "JmsConsumer";
197                    setClientName(clientName);
198                }
199                LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
200                jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
201            } else {
202                LOG.info("Creating non-durable consumer to: " + dest.toString());
203                jmsConsumer = getSession().createConsumer(dest);
204            }
205            return jmsConsumer;
206        }
207    
208        public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
209            if (client.isDurable()) {
210                String clientName = getClientName();
211                if (clientName == null) {
212                    clientName = "JmsConsumer";
213                    setClientName(clientName);
214                }
215                LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
216                jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
217            } else {
218                LOG.info("Creating non-durable consumer to: " + dest.toString());
219                jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
220            }
221            return jmsConsumer;
222        }
223    
224        public MessageConsumer getJmsConsumer() {
225            return jmsConsumer;
226        }
227    
228        public JmsClientProperties getClient() {
229            return client;
230        }
231    
232        public void setClient(JmsClientProperties clientProps) {
233            client = (JmsConsumerProperties)clientProps;
234        }
235    }