001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq;
019    
020    import javax.jms.DeliveryMode;
021    import javax.jms.Destination;
022    import javax.jms.IllegalStateException;
023    import javax.jms.InvalidDestinationException;
024    import javax.jms.JMSException;
025    import javax.jms.Message;
026    import javax.jms.MessageFormatException;
027    import javax.jms.MessageProducer;
028    
029    import org.activemq.management.JMSProducerStatsImpl;
030    import org.activemq.management.StatsCapable;
031    import org.activemq.management.StatsImpl;
032    import org.activemq.message.ActiveMQDestination;
033    import org.activemq.util.IdGenerator;
034    
035    /**
036     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
037     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
038     * <CODE>Destination</CODE> object to a message-producer creation method
039     * supplied by a session.
040     * <P>
041     * <CODE>MessageProducer</CODE> is the parent interface for all message
042     * producers.
043     * <P>
044     * A client also has the option of creating a message producer without
045     * supplying a destination. In this case, a destination must be provided with
046     * every send operation. A typical use for this kind of message producer is to
047     * send replies to requests using the request's <CODE>JMSReplyTo</CODE>
048     * destination.
049     * <P>
050     * A client can specify a default delivery mode, priority, and time to live for
051     * messages sent by a message producer. It can also specify the delivery mode,
052     * priority, and time to live for an individual message.
053     * <P>
054     * A client can specify a time-to-live value in milliseconds for each message
055     * it sends. This value defines a message expiration time that is the sum of
056     * the message's time-to-live and the GMT when it is sent (for transacted
057     * sends, this is the time the client sends the message, not the time the
058     * transaction is committed).
059     * <P>
060     * A JMS provider should do its best to expire messages accurately; however,
061     * the JMS API does not define the accuracy provided.
062     *
063     * @version $Revision: 1.1.1.1 $
064     * @see javax.jms.TopicPublisher
065     * @see javax.jms.QueueSender
066     * @see javax.jms.Session#createProducer
067     */
068    public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, Closeable {
069        protected ActiveMQSession session;
070        private IdGenerator idGenerator;
071        protected boolean closed;
072        private boolean disableMessageID;
073        private boolean disableMessageTimestamp;
074        private int defaultDeliveryMode;
075        private int defaultPriority;
076        private long defaultTimeToLive;
077        protected ActiveMQDestination defaultDestination;
078        private long startTime;
079        private JMSProducerStatsImpl stats;
080        private short producerId;
081       
082        private boolean reuseMessageId; //hint to reuse the same messageId - if already set
083    
084        protected ActiveMQMessageProducer(ActiveMQSession theSession, ActiveMQDestination destination) throws JMSException {
085            this.session = theSession;
086            this.defaultDestination = destination;
087            this.idGenerator = new IdGenerator();
088            this.disableMessageID = false;
089            this.disableMessageTimestamp = theSession.connection.isDisableTimeStampsByDefault();
090            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
091            this.defaultPriority = Message.DEFAULT_PRIORITY;
092            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
093            this.startTime = System.currentTimeMillis();
094            this.session.addProducer(this);
095            this.stats = new JMSProducerStatsImpl(theSession.getSessionStats(), destination);
096        }
097    
098        public StatsImpl getStats() {
099            return stats;
100        }
101    
102        public JMSProducerStatsImpl getProducerStats() {
103            return stats;
104        }
105    
106        /**
107         * Sets whether message IDs are disabled.
108         * <P>
109         * Since message IDs take some effort to create and increase a message's
110         * size, some JMS providers may be able to optimize message overhead if
111         * they are given a hint that the message ID is not used by an application.
112         * By calling the <CODE>setDisableMessageID</CODE> method on this message
113         * producer, a JMS client enables this potential optimization for all
114         * messages sent by this message producer. If the JMS provider accepts this
115         * hint, these messages must have the message ID set to null; if the
116         * provider ignores the hint, the message ID must be set to its normal
117         * unique value.
118         * <P>
119         * Message IDs are enabled by default.
120         *
121         * @param value indicates if message IDs are disabled
122         * @throws JMSException if the JMS provider fails to close the producer due to
123         *                      some internal error.
124         */
125        public void setDisableMessageID(boolean value) throws JMSException {
126            checkClosed();
127            this.disableMessageID = value;
128        }
129    
130        /**
131         * Gets an indication of whether message IDs are disabled.
132         *
133         * @return an indication of whether message IDs are disabled
134         * @throws JMSException if the JMS provider fails to determine if message IDs are
135         *                      disabled due to some internal error.
136         */
137        public boolean getDisableMessageID() throws JMSException {
138            checkClosed();
139            return this.disableMessageID;
140        }
141    
142        /**
143         * Sets whether message timestamps are disabled.
144         * <P>
145         * Since timestamps take some effort to create and increase a message's
146         * size, some JMS providers may be able to optimize message overhead if
147         * they are given a hint that the timestamp is not used by an application.
148         * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
149         * message producer, a JMS client enables this potential optimization for
150         * all messages sent by this message producer. If the JMS provider accepts
151         * this hint, these messages must have the timestamp set to zero; if the
152         * provider ignores the hint, the timestamp must be set to its normal
153         * value.
154         * <P>
155         * Message timestamps are enabled by default.
156         *
157         * @param value indicates if message timestamps are disabled
158         * @throws JMSException if the JMS provider fails to close the producer due to
159         *                      some internal error.
160         */
161        public void setDisableMessageTimestamp(boolean value) throws JMSException {
162            checkClosed();
163            this.disableMessageTimestamp = value;
164        }
165    
166        /**
167         * Gets an indication of whether message timestamps are disabled.
168         *
169         * @return an indication of whether message timestamps are disabled
170         * @throws JMSException if the JMS provider fails to close the producer due to
171         *                      some internal error.
172         */
173        public boolean getDisableMessageTimestamp() throws JMSException {
174            checkClosed();
175            return this.disableMessageTimestamp;
176        }
177    
178        /**
179         * Sets the producer's default delivery mode.
180         * <P>
181         * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
182         *
183         * @param newDeliveryMode the message delivery mode for this message producer; legal
184         *                        values are <code>DeliveryMode.NON_PERSISTENT</code> and
185         *                        <code>DeliveryMode.PERSISTENT</code>
186         * @throws JMSException if the JMS provider fails to set the delivery mode due to
187         *                      some internal error.
188         * @see javax.jms.MessageProducer#getDeliveryMode
189         * @see javax.jms.DeliveryMode#NON_PERSISTENT
190         * @see javax.jms.DeliveryMode#PERSISTENT
191         * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
192         */
193        public void setDeliveryMode(int newDeliveryMode) throws JMSException {
194            if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
195                throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode);
196            }
197            checkClosed();
198            this.defaultDeliveryMode = newDeliveryMode;
199        }
200    
201        /**
202         * Gets the producer's default delivery mode.
203         *
204         * @return the message delivery mode for this message producer
205         * @throws JMSException if the JMS provider fails to close the producer due to
206         *                      some internal error.
207         */
208        public int getDeliveryMode() throws JMSException {
209            checkClosed();
210            return this.defaultDeliveryMode;
211        }
212    
213        /**
214         * Sets the producer's default priority.
215         * <P>
216         * The JMS API defines ten levels of priority value, with 0 as the lowest
217         * priority and 9 as the highest. Clients should consider priorities 0-4 as
218         * gradations of normal priority and priorities 5-9 as gradations of
219         * expedited priority. Priority is set to 4 by default.
220         *
221         * @param newDefaultPriority the message priority for this message producer; must be a
222         *                           value between 0 and 9
223         * @throws JMSException if the JMS provider fails to set the delivery mode due to
224         *                      some internal error.
225         * @see javax.jms.MessageProducer#getPriority
226         * @see javax.jms.Message#DEFAULT_PRIORITY
227         */
228        public void setPriority(int newDefaultPriority) throws JMSException {
229            if (newDefaultPriority < 0 || newDefaultPriority > 9) {
230                throw new IllegalStateException("default priority must be a value between 0 and 9");
231            }
232            checkClosed();
233            this.defaultPriority = newDefaultPriority;
234        }
235    
236        /**
237         * Gets the producer's default priority.
238         *
239         * @return the message priority for this message producer
240         * @throws JMSException if the JMS provider fails to close the producer due to
241         *                      some internal error.
242         * @see javax.jms.MessageProducer#setPriority
243         */
244        public int getPriority() throws JMSException {
245            checkClosed();
246            return this.defaultPriority;
247        }
248    
249        /**
250         * Sets the default length of time in milliseconds from its dispatch time
251         * that a produced message should be retained by the message system.
252         * <P>
253         * Time to live is set to zero by default.
254         *
255         * @param timeToLive the message time to live in milliseconds; zero is unlimited
256         * @throws JMSException if the JMS provider fails to set the time to live due to
257         *                      some internal error.
258         * @see javax.jms.MessageProducer#getTimeToLive
259         * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
260         */
261        public void setTimeToLive(long timeToLive) throws JMSException {
262            if (timeToLive < 0l) {
263                throw new IllegalStateException("cannot set a negative timeToLive");
264            }
265            checkClosed();
266            this.defaultTimeToLive = timeToLive;
267        }
268    
269        /**
270         * Gets the default length of time in milliseconds from its dispatch time
271         * that a produced message should be retained by the message system.
272         *
273         * @return the message time to live in milliseconds; zero is unlimited
274         * @throws JMSException if the JMS provider fails to get the time to live due to
275         *                      some internal error.
276         * @see javax.jms.MessageProducer#setTimeToLive
277         */
278        public long getTimeToLive() throws JMSException {
279            checkClosed();
280            return this.defaultTimeToLive;
281        }
282    
283        /**
284         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
285         *
286         * @return this producer's <CODE>Destination/ <CODE>
287         * @throws JMSException if the JMS provider fails to close the producer due to
288         *                      some internal error.
289         * @since 1.1
290         */
291        public Destination getDestination() throws JMSException {
292            checkClosed();
293            return this.defaultDestination;
294        }
295    
296        /**
297         * Closes the message producer.
298         * <P>
299         * Since a provider may allocate some resources on behalf of a <CODE>
300         * MessageProducer</CODE> outside the Java virtual machine, clients should
301         * close them when they are not needed. Relying on garbage collection to
302         * eventually reclaim these resources may not be timely enough.
303         *
304         * @throws JMSException if the JMS provider fails to close the producer due to
305         *                      some internal error.
306         */
307        public void close() throws JMSException {
308            this.session.removeProducer(this);
309            closed = true;
310        }
311    
312        protected void checkClosed() throws IllegalStateException {
313            if (closed) {
314                throw new IllegalStateException("The producer is closed");
315            }
316        }
317    
318        /**
319         * Sends a message using the <CODE>MessageProducer</CODE>'s default
320         * delivery mode, priority, and time to live.
321         *
322         * @param message the message to send
323         * @throws JMSException                if the JMS provider fails to send the message due to some
324         *                                     internal error.
325         * @throws MessageFormatException      if an invalid message is specified.
326         * @throws InvalidDestinationException if a client uses this method with a <CODE>
327         *                                     MessageProducer</CODE> with an invalid destination.
328         * @throws java.lang.UnsupportedOperationException
329         *                                     if a client uses this method with a <CODE>
330         *                                     MessageProducer</CODE> that did not specify a
331         *                                     destination at creation time.
332         * @see javax.jms.Session#createProducer
333         * @see javax.jms.MessageProducer
334         * @since 1.1
335         */
336        public void send(Message message) throws JMSException {
337            this.send(this.defaultDestination, message, this.defaultDeliveryMode, this.defaultPriority,
338                    this.defaultTimeToLive);
339        }
340    
341        /**
342         * Sends a message to the destination, specifying delivery mode, priority,
343         * and time to live.
344         *
345         * @param message      the message to send
346         * @param deliveryMode the delivery mode to use
347         * @param priority     the priority for this message
348         * @param timeToLive   the message's lifetime (in milliseconds)
349         * @throws JMSException                if the JMS provider fails to send the message due to some
350         *                                     internal error.
351         * @throws MessageFormatException      if an invalid message is specified.
352         * @throws InvalidDestinationException if a client uses this method with a <CODE>
353         *                                     MessageProducer</CODE> with an invalid destination.
354         * @throws java.lang.UnsupportedOperationException
355         *                                     if a client uses this method with a <CODE>
356         *                                     MessageProducer</CODE> that did not specify a
357         *                                     destination at creation time.
358         * @see javax.jms.Session#createProducer
359         * @since 1.1
360         */
361        public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
362            this.send(this.defaultDestination, message, deliveryMode, priority, timeToLive);
363        }
364    
365        /**
366         * Sends a message to a destination for an unidentified message producer.
367         * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
368         * priority, and time to live.
369         * <P>
370         * Typically, a message producer is assigned a destination at creation
371         * time; however, the JMS API also supports unidentified message producers,
372         * which require that the destination be supplied every time a message is
373         * sent.
374         *
375         * @param destination the destination to send this message to
376         * @param message     the message to send
377         * @throws JMSException                if the JMS provider fails to send the message due to some
378         *                                     internal error.
379         * @throws MessageFormatException      if an invalid message is specified.
380         * @throws InvalidDestinationException if a client uses this method with an invalid destination.
381         * @throws java.lang.UnsupportedOperationException
382         *                                     if a client uses this method with a <CODE>
383         *                                     MessageProducer</CODE> that specified a destination at
384         *                                     creation time.
385         * @see javax.jms.Session#createProducer
386         * @see javax.jms.MessageProducer
387         * @since 1.1
388         */
389        public void send(Destination destination, Message message) throws JMSException {
390            this.send(destination, message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
391        }
392    
393        /**
394         * Sends a message to a destination for an unidentified message producer,
395         * specifying delivery mode, priority and time to live.
396         * <P>
397         * Typically, a message producer is assigned a destination at creation
398         * time; however, the JMS API also supports unidentified message producers,
399         * which require that the destination be supplied every time a message is
400         * sent.
401         *
402         * @param destination  the destination to send this message to
403         * @param message      the message to send
404         * @param deliveryMode the delivery mode to use
405         * @param priority     the priority for this message
406         * @param timeToLive   the message's lifetime (in milliseconds)
407         * @throws JMSException                if the JMS provider fails to send the message due to some
408         *                                     internal error.
409         * @throws MessageFormatException      if an invalid message is specified.
410         * @throws InvalidDestinationException if a client uses this method with an invalid destination.
411         * @see javax.jms.Session#createProducer
412         * @since 1.1
413         */
414        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
415                throws JMSException {
416            checkClosed();
417            if (destination == null) {
418                throw new UnsupportedOperationException("Dont understand null destinations");
419            }        
420            if( this.defaultDestination!=null && destination!=this.defaultDestination && session.connection.isJ2EEcompliant()) {
421                throw new UnsupportedOperationException("This producer can only send messages to: "+defaultDestination.getPhysicalName());
422            }
423            if (this.defaultDestination==null){
424                this.session.connection.startAdvisoryForTempDestination(destination);
425            }
426            this.session.send(this, destination, message, deliveryMode, priority, timeToLive,reuseMessageId);
427            stats.onMessage(message);
428        }
429        
430        /**
431         * @return Returns the reuseMessageId.
432         */
433        public boolean isResuseMessageId() {
434            return reuseMessageId;
435        }
436        /**
437         * @param reuseMessageId The resuseMessageId to set.
438         */
439        public void setReuseMessageId(boolean reuseMessageId) {
440            this.reuseMessageId = reuseMessageId;
441        }
442    
443    
444        /**
445         * @return Returns the producerId.
446         */
447        protected short getProducerId() {
448            return producerId;
449        }
450        
451        /**
452         * @param producerId The producerId to set.
453         */
454        public void setProducerId(short producerId) {
455            this.producerId = producerId;
456        }
457    
458    
459        protected long getStartTime() {
460            return this.startTime;
461        }
462    
463        protected IdGenerator getIdGenerator() {
464            return this.idGenerator;
465        }
466        
467        protected long getNextSequenceNumber(){
468            return idGenerator.getNextSequence();
469        }
470        
471        protected String getProducerMessageKey(){
472            return idGenerator.getSeed();
473        }
474    }