001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.util.Arrays;
020    
021    import javax.jms.ConnectionFactory;
022    import javax.jms.DeliveryMode;
023    import javax.jms.Destination;
024    import javax.jms.JMSException;
025    import javax.jms.MessageProducer;
026    import javax.jms.TextMessage;
027    
028    import org.apache.activemq.tool.properties.JmsClientProperties;
029    import org.apache.activemq.tool.properties.JmsProducerProperties;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    public class JmsProducerClient extends AbstractJmsMeasurableClient {
034        private static final Log LOG = LogFactory.getLog(JmsProducerClient.class);
035    
036        protected JmsProducerProperties client;
037        protected MessageProducer jmsProducer;
038        protected TextMessage jmsTextMessage;
039    
040        public JmsProducerClient(ConnectionFactory factory) {
041            this(new JmsProducerProperties(), factory);
042        }
043    
044        public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) {
045            super(factory);
046            this.client = clientProps;
047        }
048    
049        public void sendMessages() throws JMSException {
050            // Send a specific number of messages
051            if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) {
052                sendCountBasedMessages(client.getSendCount());
053    
054            // Send messages for a specific duration
055            } else {
056                sendTimeBasedMessages(client.getSendDuration());
057            }
058        }
059    
060        public void sendMessages(int destCount) throws JMSException {
061            this.destCount = destCount;
062            sendMessages();
063        }
064    
065        public void sendMessages(int destIndex, int destCount) throws JMSException {
066            this.destIndex = destIndex;
067            sendMessages(destCount);
068        }
069    
070        public void sendCountBasedMessages(long messageCount) throws JMSException {
071            // Parse through different ways to send messages
072            // Avoided putting the condition inside the loop to prevent effect on performance
073            Destination[] dest = createDestination(destIndex, destCount);
074    
075            // Create a producer, if none is created.
076            if (getJmsProducer() == null) {
077                if (dest.length == 1) {
078                    createJmsProducer(dest[0]);
079                } else {
080                    createJmsProducer();
081                }
082            }
083            try {
084                getConnection().start();
085                LOG.info("Starting to publish " + client.getMessageSize() + " byte(s) of " + messageCount + " messages...");
086    
087                // Send one type of message only, avoiding the creation of different messages on sending
088                if (!client.isCreateNewMsg()) {
089                    // Create only a single message
090                    createJmsTextMessage();
091    
092                    // Send to more than one actual destination
093                    if (dest.length > 1) {
094                        for (int i = 0; i < messageCount; i++) {
095                            for (int j = 0; j < dest.length; j++) {
096                                getJmsProducer().send(dest[j], getJmsTextMessage());
097                                incThroughput();
098                            }
099                        }
100                        // Send to only one actual destination
101                    } else {
102                        for (int i = 0; i < messageCount; i++) {
103                            getJmsProducer().send(getJmsTextMessage());
104                            incThroughput();
105                        }
106                    }
107    
108                    // Send different type of messages using indexing to identify each one.
109                    // Message size will vary. Definitely slower, since messages properties
110                    // will be set individually each send.
111                } else {
112                    // Send to more than one actual destination
113                    if (dest.length > 1) {
114                        for (int i = 0; i < messageCount; i++) {
115                            for (int j = 0; j < dest.length; j++) {
116                                getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
117                                incThroughput();
118                            }
119                        }
120    
121                        // Send to only one actual destination
122                    } else {
123                        for (int i = 0; i < messageCount; i++) {
124                            getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
125                            incThroughput();
126                        }
127                    }
128                }
129            } finally {
130                getConnection().close();
131            }
132        }
133    
134        public void sendTimeBasedMessages(long duration) throws JMSException {
135            long endTime = System.currentTimeMillis() + duration;
136            // Parse through different ways to send messages
137            // Avoided putting the condition inside the loop to prevent effect on performance
138    
139            Destination[] dest = createDestination(destIndex, destCount);
140    
141            // Create a producer, if none is created.
142            if (getJmsProducer() == null) {
143                if (dest.length == 1) {
144                    createJmsProducer(dest[0]);
145                } else {
146                    createJmsProducer();
147                }
148            }
149    
150            try {
151                getConnection().start();
152                LOG.info("Starting to publish " + client.getMessageSize() + " byte(s) messages for " + duration + " ms");
153    
154                // Send one type of message only, avoiding the creation of different messages on sending
155                if (!client.isCreateNewMsg()) {
156                    // Create only a single message
157                    createJmsTextMessage();
158    
159                    // Send to more than one actual destination
160                    if (dest.length > 1) {
161                        while (System.currentTimeMillis() < endTime) {
162                            for (int j = 0; j < dest.length; j++) {
163                                getJmsProducer().send(dest[j], getJmsTextMessage());
164                                incThroughput();
165                            }
166                        }
167                        // Send to only one actual destination
168                    } else {
169                        while (System.currentTimeMillis() < endTime) {
170                            getJmsProducer().send(getJmsTextMessage());
171                            incThroughput();
172                        }
173                    }
174    
175                    // Send different type of messages using indexing to identify each one.
176                    // Message size will vary. Definitely slower, since messages properties
177                    // will be set individually each send.
178                } else {
179                    // Send to more than one actual destination
180                    long count = 1;
181                    if (dest.length > 1) {
182                        while (System.currentTimeMillis() < endTime) {
183                            for (int j = 0; j < dest.length; j++) {
184                                getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
185                                incThroughput();
186                            }
187                        }
188    
189                        // Send to only one actual destination
190                    } else {
191                        while (System.currentTimeMillis() < endTime) {
192    
193                            getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
194                            incThroughput();
195                        }
196                    }
197                }
198            } finally {
199                getConnection().close();
200            }
201        }
202    
203        public MessageProducer createJmsProducer() throws JMSException {
204            jmsProducer = getSession().createProducer(null);
205            if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
206                LOG.info("Creating producer to possible multiple destinations with persistent delivery.");
207                jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
208            } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
209                LOG.info("Creating producer to possible multiple destinations with non-persistent delivery.");
210                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
211            } else {
212                LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
213                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
214            }
215            return jmsProducer;
216        }
217    
218        public MessageProducer createJmsProducer(Destination dest) throws JMSException {
219            jmsProducer = getSession().createProducer(dest);
220            if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
221                LOG.info("Creating producer to: " + dest.toString() + " with persistent delivery.");
222                jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
223            } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
224                LOG.info("Creating  producer to: " + dest.toString() + " with non-persistent delivery.");
225                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
226            } else {
227                LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
228                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
229            }
230            return jmsProducer;
231        }
232    
233        public MessageProducer getJmsProducer() {
234            return jmsProducer;
235        }
236    
237        public TextMessage createJmsTextMessage() throws JMSException {
238            return createJmsTextMessage(client.getMessageSize());
239        }
240    
241        public TextMessage createJmsTextMessage(int size) throws JMSException {
242            jmsTextMessage = getSession().createTextMessage(buildText("", size));
243            return jmsTextMessage;
244        }
245    
246        public TextMessage createJmsTextMessage(String text) throws JMSException {
247            jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize()));
248            return jmsTextMessage;
249        }
250    
251        public TextMessage getJmsTextMessage() {
252            return jmsTextMessage;
253        }
254    
255        public JmsClientProperties getClient() {
256            return client;
257        }
258    
259        public void setClient(JmsClientProperties clientProps) {
260            client = (JmsProducerProperties)clientProps;
261        }
262    
263        protected String buildText(String text, int size) {
264            byte[] data = new byte[size - text.length()];
265            Arrays.fill(data, (byte) 0);
266            return text + new String(data);
267        }
268    }