001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import javax.jms.Destination;
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ProducerAck;
031    import org.apache.activemq.command.ProducerId;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.management.JMSProducerStatsImpl;
034    import org.apache.activemq.management.StatsCapable;
035    import org.apache.activemq.management.StatsImpl;
036    import org.apache.activemq.usage.MemoryUsage;
037    import org.apache.activemq.util.IntrospectionSupport;
038    
039    /**
040     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
041     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
042     * <CODE>Destination</CODE> object to a message-producer creation method
043     * supplied by a session.
044     * <P>
045     * <CODE>MessageProducer</CODE> is the parent interface for all message
046     * producers.
047     * <P>
048     * A client also has the option of creating a message producer without supplying
049     * a destination. In this case, a destination must be provided with every send
050     * operation. A typical use for this kind of message producer is to send replies
051     * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
052     * <P>
053     * A client can specify a default delivery mode, priority, and time to live for
054     * messages sent by a message producer. It can also specify the delivery mode,
055     * priority, and time to live for an individual message.
056     * <P>
057     * A client can specify a time-to-live value in milliseconds for each message it
058     * sends. This value defines a message expiration time that is the sum of the
059     * message's time-to-live and the GMT when it is sent (for transacted sends,
060     * this is the time the client sends the message, not the time the transaction
061     * is committed).
062     * <P>
063     * A JMS provider should do its best to expire messages accurately; however, the
064     * JMS API does not define the accuracy provided.
065     * 
066     * @version $Revision: 1.14 $
067     * @see javax.jms.TopicPublisher
068     * @see javax.jms.QueueSender
069     * @see javax.jms.Session#createProducer
070     */
071    public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
072    
073        protected ProducerInfo info;
074        protected boolean closed;
075    
076        private JMSProducerStatsImpl stats;
077        private AtomicLong messageSequence;
078        private long startTime;
079        private MessageTransformer transformer;
080        private MemoryUsage producerWindow;
081    
082        protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
083            super(session);
084            this.info = new ProducerInfo(producerId);
085            this.info.setWindowSize(session.connection.getProducerWindowSize());
086            if (destination != null && destination.getOptions() != null) {
087                Map<String, String> options = new HashMap<String, String>(destination.getOptions());
088                IntrospectionSupport.setProperties(this.info, options, "producer.");
089            }
090            this.info.setDestination(destination);
091    
092            // Enable producer window flow control if protocol > 3 and the window
093            // size > 0
094            if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
095                producerWindow = new MemoryUsage("Producer Window: " + producerId);
096                producerWindow.setLimit(this.info.getWindowSize());
097                producerWindow.start();
098            }
099    
100            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
101            this.defaultPriority = Message.DEFAULT_PRIORITY;
102            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
103            this.startTime = System.currentTimeMillis();
104            this.messageSequence = new AtomicLong(0);
105            this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
106            this.session.addProducer(this);
107            this.session.asyncSendPacket(info);
108            this.setSendTimeout(sendTimeout);
109            setTransformer(session.getTransformer());
110        }
111    
112        public StatsImpl getStats() {
113            return stats;
114        }
115    
116        public JMSProducerStatsImpl getProducerStats() {
117            return stats;
118        }
119    
120        /**
121         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
122         * 
123         * @return this producer's <CODE>Destination/ <CODE>
124         * @throws JMSException if the JMS provider fails to close the producer due to
125         *                      some internal error.
126         * @since 1.1
127         */
128        public Destination getDestination() throws JMSException {
129            checkClosed();
130            return this.info.getDestination();
131        }
132    
133        /**
134         * Closes the message producer.
135         * <P>
136         * Since a provider may allocate some resources on behalf of a <CODE>
137         * MessageProducer</CODE>
138         * outside the Java virtual machine, clients should close them when they are
139         * not needed. Relying on garbage collection to eventually reclaim these
140         * resources may not be timely enough.
141         * 
142         * @throws JMSException if the JMS provider fails to close the producer due
143         *                 to some internal error.
144         */
145        public void close() throws JMSException {
146            if (!closed) {
147                dispose();
148                this.session.asyncSendPacket(info.createRemoveCommand());
149            }
150        }
151    
152        public void dispose() {
153            if (!closed) {
154                this.session.removeProducer(this);
155                if (producerWindow != null) {
156                    producerWindow.stop();
157                }
158                closed = true;
159            }
160        }
161    
162        /**
163         * Check if the instance of this producer has been closed.
164         * 
165         * @throws IllegalStateException
166         */
167        protected void checkClosed() throws IllegalStateException {
168            if (closed) {
169                throw new IllegalStateException("The producer is closed");
170            }
171        }
172    
173        /**
174         * Sends a message to a destination for an unidentified message producer,
175         * specifying delivery mode, priority and time to live.
176         * <P>
177         * Typically, a message producer is assigned a destination at creation time;
178         * however, the JMS API also supports unidentified message producers, which
179         * require that the destination be supplied every time a message is sent.
180         * 
181         * @param destination the destination to send this message to
182         * @param message the message to send
183         * @param deliveryMode the delivery mode to use
184         * @param priority the priority for this message
185         * @param timeToLive the message's lifetime (in milliseconds)
186         * @throws JMSException if the JMS provider fails to send the message due to
187         *                 some internal error.
188         * @throws UnsupportedOperationException if an invalid destination is
189         *                 specified.
190         * @throws InvalidDestinationException if a client uses this method with an
191         *                 invalid destination.
192         * @see javax.jms.Session#createProducer
193         * @since 1.1
194         */
195        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
196            checkClosed();
197            if (destination == null) {
198                if (info.getDestination() == null) {
199                    throw new UnsupportedOperationException("A destination must be specified.");
200                }
201                throw new InvalidDestinationException("Don't understand null destinations");
202            }
203    
204            ActiveMQDestination dest;
205            if (destination == info.getDestination()) {
206                dest = (ActiveMQDestination)destination;
207            } else if (info.getDestination() == null) {
208                dest = ActiveMQDestination.transform(destination);
209            } else {
210                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
211            }
212            if (dest == null) {
213                throw new JMSException("No destination specified");
214            }
215    
216            if (transformer != null) {
217                Message transformedMessage = transformer.producerTransform(session, this, message);
218                if (transformedMessage != null) {
219                    message = transformedMessage;
220                }
221            }
222    
223            if (producerWindow != null) {
224                try {
225                    producerWindow.waitForSpace();
226                } catch (InterruptedException e) {
227                    throw new JMSException("Send aborted due to thread interrupt.");
228                }
229            }
230    
231            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
232    
233            stats.onMessage();
234        }
235    
236        public MessageTransformer getTransformer() {
237            return transformer;
238        }
239    
240        /**
241         * Sets the transformer used to transform messages before they are sent on
242         * to the JMS bus
243         */
244        public void setTransformer(MessageTransformer transformer) {
245            this.transformer = transformer;
246        }
247    
248        /**
249         * @return the time in milli second when this object was created.
250         */
251        protected long getStartTime() {
252            return this.startTime;
253        }
254    
255        /**
256         * @return Returns the messageSequence.
257         */
258        protected long getMessageSequence() {
259            return messageSequence.incrementAndGet();
260        }
261    
262        /**
263         * @param messageSequence The messageSequence to set.
264         */
265        protected void setMessageSequence(AtomicLong messageSequence) {
266            this.messageSequence = messageSequence;
267        }
268    
269        /**
270         * @return Returns the info.
271         */
272        protected ProducerInfo getProducerInfo() {
273            return this.info != null ? this.info : null;
274        }
275    
276        /**
277         * @param info The info to set
278         */
279        protected void setProducerInfo(ProducerInfo info) {
280            this.info = info;
281        }
282    
283        public String toString() {
284            return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
285        }
286    
287        public void onProducerAck(ProducerAck pa) {
288            if (this.producerWindow != null) {
289                this.producerWindow.decreaseUsage(pa.getSize());
290            }
291        }
292    
293    }