001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.Serializable;
022    import java.net.URL;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    
029    import javax.jms.BytesMessage;
030    import javax.jms.Destination;
031    import javax.jms.IllegalStateException;
032    import javax.jms.InvalidDestinationException;
033    import javax.jms.InvalidSelectorException;
034    import javax.jms.JMSException;
035    import javax.jms.MapMessage;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageListener;
039    import javax.jms.MessageProducer;
040    import javax.jms.ObjectMessage;
041    import javax.jms.Queue;
042    import javax.jms.QueueBrowser;
043    import javax.jms.QueueReceiver;
044    import javax.jms.QueueSender;
045    import javax.jms.QueueSession;
046    import javax.jms.Session;
047    import javax.jms.StreamMessage;
048    import javax.jms.TemporaryQueue;
049    import javax.jms.TemporaryTopic;
050    import javax.jms.TextMessage;
051    import javax.jms.Topic;
052    import javax.jms.TopicPublisher;
053    import javax.jms.TopicSession;
054    import javax.jms.TopicSubscriber;
055    import javax.jms.TransactionRolledBackException;
056    
057    import org.apache.activemq.blob.BlobDownloader;
058    import org.apache.activemq.blob.BlobTransferPolicy;
059    import org.apache.activemq.blob.BlobUploader;
060    import org.apache.activemq.command.ActiveMQBlobMessage;
061    import org.apache.activemq.command.ActiveMQBytesMessage;
062    import org.apache.activemq.command.ActiveMQDestination;
063    import org.apache.activemq.command.ActiveMQMapMessage;
064    import org.apache.activemq.command.ActiveMQMessage;
065    import org.apache.activemq.command.ActiveMQObjectMessage;
066    import org.apache.activemq.command.ActiveMQQueue;
067    import org.apache.activemq.command.ActiveMQStreamMessage;
068    import org.apache.activemq.command.ActiveMQTempDestination;
069    import org.apache.activemq.command.ActiveMQTempQueue;
070    import org.apache.activemq.command.ActiveMQTempTopic;
071    import org.apache.activemq.command.ActiveMQTextMessage;
072    import org.apache.activemq.command.ActiveMQTopic;
073    import org.apache.activemq.command.Command;
074    import org.apache.activemq.command.ConsumerId;
075    import org.apache.activemq.command.MessageAck;
076    import org.apache.activemq.command.MessageDispatch;
077    import org.apache.activemq.command.MessageId;
078    import org.apache.activemq.command.ProducerId;
079    import org.apache.activemq.command.RemoveInfo;
080    import org.apache.activemq.command.Response;
081    import org.apache.activemq.command.SessionId;
082    import org.apache.activemq.command.SessionInfo;
083    import org.apache.activemq.command.TransactionId;
084    import org.apache.activemq.management.JMSSessionStatsImpl;
085    import org.apache.activemq.management.StatsCapable;
086    import org.apache.activemq.management.StatsImpl;
087    import org.apache.activemq.thread.Scheduler;
088    import org.apache.activemq.transaction.Synchronization;
089    import org.apache.activemq.usage.MemoryUsage;
090    import org.apache.activemq.util.Callback;
091    import org.apache.activemq.util.LongSequenceGenerator;
092    import org.apache.commons.logging.Log;
093    import org.apache.commons.logging.LogFactory;
094    
095    /**
096     * <P>
097     * A <CODE>Session</CODE> object is a single-threaded context for producing
098     * and consuming messages. Although it may allocate provider resources outside
099     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
100     * <P>
101     * A session serves several purposes:
102     * <UL>
103     * <LI>It is a factory for its message producers and consumers.
104     * <LI>It supplies provider-optimized message factories.
105     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
106     * <CODE>TemporaryQueues</CODE>.
107     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
108     * objects for those clients that need to dynamically manipulate
109     * provider-specific destination names.
110     * <LI>It supports a single series of transactions that combine work spanning
111     * its producers and consumers into atomic units.
112     * <LI>It defines a serial order for the messages it consumes and the messages
113     * it produces.
114     * <LI>It retains messages it consumes until they have been acknowledged.
115     * <LI>It serializes execution of message listeners registered with its message
116     * consumers.
117     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
118     * </UL>
119     * <P>
120     * A session can create and service multiple message producers and consumers.
121     * <P>
122     * One typical use is to have a thread block on a synchronous
123     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
124     * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
125     * <P>
126     * If a client desires to have one thread produce messages while others consume
127     * them, the client should use a separate session for its producing thread.
128     * <P>
129     * Once a connection has been started, any session with one or more registered
130     * message listeners is dedicated to the thread of control that delivers
131     * messages to it. It is erroneous for client code to use this session or any of
132     * its constituent objects from another thread of control. The only exception to
133     * this rule is the use of the session or connection <CODE>close</CODE>
134     * method.
135     * <P>
136     * It should be easy for most clients to partition their work naturally into
137     * sessions. This model allows clients to start simply and incrementally add
138     * message processing complexity as their need for concurrency grows.
139     * <P>
140     * The <CODE>close</CODE> method is the only session method that can be called
141     * while some other session method is being executed in another thread.
142     * <P>
143     * A session may be specified as transacted. Each transacted session supports a
144     * single series of transactions. Each transaction groups a set of message sends
145     * and a set of message receives into an atomic unit of work. In effect,
146     * transactions organize a session's input message stream and output message
147     * stream into series of atomic units. When a transaction commits, its atomic
148     * unit of input is acknowledged and its associated atomic unit of output is
149     * sent. If a transaction rollback is done, the transaction's sent messages are
150     * destroyed and the session's input is automatically recovered.
151     * <P>
152     * The content of a transaction's input and output units is simply those
153     * messages that have been produced and consumed within the session's current
154     * transaction.
155     * <P>
156     * A transaction is completed using either its session's <CODE>commit</CODE>
157     * method or its session's <CODE>rollback </CODE> method. The completion of a
158     * session's current transaction automatically begins the next. The result is
159     * that a transacted session always has a current transaction within which its
160     * work is done.
161     * <P>
162     * The Java Transaction Service (JTS) or some other transaction monitor may be
163     * used to combine a session's transaction with transactions on other resources
164     * (databases, other JMS sessions, etc.). Since Java distributed transactions
165     * are controlled via the Java Transaction API (JTA), use of the session's
166     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
167     * prohibited.
168     * <P>
169     * The JMS API does not require support for JTA; however, it does define how a
170     * provider supplies this support.
171     * <P>
172     * Although it is also possible for a JMS client to handle distributed
173     * transactions directly, it is unlikely that many JMS clients will do this.
174     * Support for JTA in the JMS API is targeted at systems vendors who will be
175     * integrating the JMS API into their application server products.
176     * 
177     * @version $Revision: 1.34 $
178     * @see javax.jms.Session
179     * @see javax.jms.QueueSession
180     * @see javax.jms.TopicSession
181     * @see javax.jms.XASession
182     */
183    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
184            
185            /**
186             * Acknowledgement mode in which only an individual message is
187             * acknowledged - using message.acknowledge() - as opposed to
188             * CLIENT_ACKNOWLEDGE, which acknowledges all messages consumed by a
189             * session at the time acknowledge() is called.
190             */
191        public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
192        public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; // alias of INDIVIDUAL_ACKNOWLEDGE; presumably the upper bound of valid acknowledgement modes - confirm at usage sites
193    
194        public static interface DeliveryListener {
195            void beforeDelivery(ActiveMQSession session, Message msg);
196    
197            void afterDelivery(ActiveMQSession session, Message msg);
198        }
199    
200        private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
201        protected static final Scheduler scheduler = Scheduler.getInstance(); // shared by all sessions; used to run per-consumer clean-up tasks
202    
203        protected int acknowledgementMode; // taken from the constructor's acknowledgeMode argument
204        protected final ActiveMQConnection connection; // the connection this session was created on
205        protected final SessionInfo info; // sent over the connection at construction; source of the RemoveInfo sent on close
206        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
207        protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
208        protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
209        protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this); // stopped first thing in dispose()
210        protected final AtomicBoolean started = new AtomicBoolean(false);
211    
212        protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); // disposed and cleared in dispose()
213        protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); // disposed and cleared in dispose()
214    
215        protected boolean closed; // checked by close() and dispose() so their work is skipped once set
216        private volatile boolean synchronizationRegistered; // true while close() has a pending Synchronization registered with an XA transaction
217        protected boolean asyncDispatch; // taken from the constructor argument of the same name
218        protected boolean sessionAsyncDispatch; // taken from the constructor argument; the 4-argument constructor passes true
219        protected final boolean debug; // snapshot of LOG.isDebugEnabled() taken at construction
220        protected Object sendMutex = new Object(); // NOTE(review): presumably serializes sends; usage is outside this excerpt
221    
222        private MessageListener messageListener;
223        private JMSSessionStatsImpl stats; // built in the constructor over the producers and consumers lists
224        private TransactionContext transactionContext; // created in the constructor; replaceable via setTransactionContext()
225        private DeliveryListener deliveryListener;
226        private MessageTransformer transformer; // initialised from connection.getTransformer()
227        private BlobTransferPolicy blobTransferPolicy; // initialised from connection.getBlobTransferPolicy()
228        private long lastDeliveredSequenceId; // max over consumers, computed in dispose(); reported in the RemoveInfo sent by doClose()
229    
230        /**
231         * Constructs the session, registers it with the connection, sends its
232         * SessionInfo, and starts it if the connection is already started.
233         * @param connection the connection this session belongs to
234         * @param sessionId its value is used to build this session's SessionInfo
235         * @param acknowledgeMode n.b. if transacted - the acknowledgeMode ==
236         *                Session.SESSION_TRANSACTED
237         * @param asyncDispatch value stored in the asyncDispatch field
238         * @param sessionAsyncDispatch value stored in the sessionAsyncDispatch field
239         * @throws JMSException on internal error
240         */
241        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
242            this.debug = LOG.isDebugEnabled(); // captured once; later log-level changes are not reflected in this field
243            this.connection = connection;
244            this.acknowledgementMode = acknowledgeMode;
245            this.asyncDispatch = asyncDispatch;
246            this.sessionAsyncDispatch = sessionAsyncDispatch;
247            this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
248            setTransactionContext(new TransactionContext(connection)); // every session starts with its own transaction context
249            connection.addSession(this); // NOTE(review): 'this' is published before stats is assigned on the next line
250            stats = new JMSSessionStatsImpl(producers, consumers);
251            this.connection.asyncSendPacket(info); // announce the new session over the connection
252            setTransformer(connection.getTransformer()); // inherit the connection-level transformer
253            setBlobTransferPolicy(connection.getBlobTransferPolicy()); // inherit the connection-level blob policy
254    
255            if (connection.isStarted()) { // a session created on a running connection starts immediately
256                start();
257            }
258    
259        }
260    
261        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { // convenience overload of the 5-argument constructor
262            this(connection, sessionId, acknowledgeMode, asyncDispatch, true); // sessionAsyncDispatch defaults to true
263        }
264    
265        /**
266         * Sets the transaction context of the session.
267         * 
268         * @param transactionContext - provides the means to control a JMS
269         *                transaction.
270         */
271        public void setTransactionContext(TransactionContext transactionContext) {
272            this.transactionContext = transactionContext;
273        }
274    
275        /**
276         * Returns the transaction context of the session.
277         * 
278         * @return transactionContext - session's transaction context.
279         */
280        public TransactionContext getTransactionContext() {
281            return transactionContext;
282        }
283    
284        /*
285         * (non-Javadoc)
286         * 
287         * @see org.apache.activemq.management.StatsCapable#getStats()
288         */
289        public StatsImpl getStats() {
290            return stats;
291        }
292    
293        /**
294         * Returns the session's statistics.
295         * 
296         * @return stats - session's statistics.
297         */
298        public JMSSessionStatsImpl getSessionStats() {
299            return stats;
300        }
301    
302        /**
303         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
304         * object is used to send a message containing a stream of uninterpreted
305         * bytes.
306         * 
307         * @return the an ActiveMQBytesMessage
308         * @throws JMSException if the JMS provider fails to create this message due
309         *                 to some internal error.
310         */
311        public BytesMessage createBytesMessage() throws JMSException {
312            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
313            configureMessage(message);
314            return message;
315        }
316    
317        /**
318         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
319         * object is used to send a self-defining set of name-value pairs, where
320         * names are <CODE>String</CODE> objects and values are primitive values
321         * in the Java programming language.
322         * 
323         * @return an ActiveMQMapMessage
324         * @throws JMSException if the JMS provider fails to create this message due
325         *                 to some internal error.
326         */
327        public MapMessage createMapMessage() throws JMSException {
328            ActiveMQMapMessage message = new ActiveMQMapMessage();
329            configureMessage(message);
330            return message;
331        }
332    
333        /**
334         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
335         * interface is the root interface of all JMS messages. A
336         * <CODE>Message</CODE> object holds all the standard message header
337         * information. It can be sent when a message containing only header
338         * information is sufficient.
339         * 
340         * @return an ActiveMQMessage
341         * @throws JMSException if the JMS provider fails to create this message due
342         *                 to some internal error.
343         */
344        public Message createMessage() throws JMSException {
345            ActiveMQMessage message = new ActiveMQMessage();
346            configureMessage(message);
347            return message;
348        }
349    
350        /**
351         * Creates an <CODE>ObjectMessage</CODE> object. An
352         * <CODE>ObjectMessage</CODE> object is used to send a message that
353         * contains a serializable Java object.
354         * 
355         * @return an ActiveMQObjectMessage
356         * @throws JMSException if the JMS provider fails to create this message due
357         *                 to some internal error.
358         */
359        public ObjectMessage createObjectMessage() throws JMSException {
360            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
361            configureMessage(message);
362            return message;
363        }
364    
365        /**
366         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
367         * <CODE>ObjectMessage</CODE> object is used to send a message that
368         * contains a serializable Java object.
369         * 
370         * @param object the object to use to initialize this message
371         * @return an ActiveMQObjectMessage
372         * @throws JMSException if the JMS provider fails to create this message due
373         *                 to some internal error.
374         */
375        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
376            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
377            configureMessage(message);
378            message.setObject(object);
379            return message;
380        }
381    
382        /**
383         * Creates a <CODE>StreamMessage</CODE> object. A
384         * <CODE>StreamMessage</CODE> object is used to send a self-defining
385         * stream of primitive values in the Java programming language.
386         * 
387         * @return an ActiveMQStreamMessage
388         * @throws JMSException if the JMS provider fails to create this message due
389         *                 to some internal error.
390         */
391        public StreamMessage createStreamMessage() throws JMSException {
392            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
393            configureMessage(message);
394            return message;
395        }
396    
397        /**
398         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
399         * object is used to send a message containing a <CODE>String</CODE>
400         * object.
401         * 
402         * @return an ActiveMQTextMessage
403         * @throws JMSException if the JMS provider fails to create this message due
404         *                 to some internal error.
405         */
406        public TextMessage createTextMessage() throws JMSException {
407            ActiveMQTextMessage message = new ActiveMQTextMessage();
408            configureMessage(message);
409            return message;
410        }
411    
412        /**
413         * Creates an initialized <CODE>TextMessage</CODE> object. A
414         * <CODE>TextMessage</CODE> object is used to send a message containing a
415         * <CODE>String</CODE>.
416         * 
417         * @param text the string used to initialize this message
418         * @return an ActiveMQTextMessage
419         * @throws JMSException if the JMS provider fails to create this message due
420         *                 to some internal error.
421         */
422        public TextMessage createTextMessage(String text) throws JMSException {
423            ActiveMQTextMessage message = new ActiveMQTextMessage();
424            message.setText(text);
425            configureMessage(message);
426            return message;
427        }
428    
429        /**
430         * Creates an initialized <CODE>BlobMessage</CODE> object. A
431         * <CODE>BlobMessage</CODE> object is used to send a message containing a
432         * <CODE>URL</CODE> which points to some network addressible BLOB.
433         * 
434         * @param url the network addressable URL used to pass directly to the
435         *                consumer
436         * @return a BlobMessage
437         * @throws JMSException if the JMS provider fails to create this message due
438         *                 to some internal error.
439         */
440        public BlobMessage createBlobMessage(URL url) throws JMSException {
441            return createBlobMessage(url, false);
442        }
443    
444        /**
445         * Creates an initialized <CODE>BlobMessage</CODE> object. A
446         * <CODE>BlobMessage</CODE> object is used to send a message containing a
447         * <CODE>URL</CODE> which points to some network addressible BLOB.
448         * 
449         * @param url the network addressable URL used to pass directly to the
450         *                consumer
451         * @param deletedByBroker indicates whether or not the resource is deleted
452         *                by the broker when the message is acknowledged
453         * @return a BlobMessage
454         * @throws JMSException if the JMS provider fails to create this message due
455         *                 to some internal error.
456         */
457        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
458            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
459            configureMessage(message);
460            message.setURL(url);
461            message.setDeletedByBroker(deletedByBroker);
462            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
463            return message;
464        }
465    
466        /**
467         * Creates an initialized <CODE>BlobMessage</CODE> object. A
468         * <CODE>BlobMessage</CODE> object is used to send a message containing
469         * the <CODE>File</CODE> content. Before the message is sent the file
470         * conent will be uploaded to the broker or some other remote repository
471         * depending on the {@link #getBlobTransferPolicy()}.
472         * 
473         * @param file the file to be uploaded to some remote repo (or the broker)
474         *                depending on the strategy
475         * @return a BlobMessage
476         * @throws JMSException if the JMS provider fails to create this message due
477         *                 to some internal error.
478         */
479        public BlobMessage createBlobMessage(File file) throws JMSException {
480            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
481            configureMessage(message);
482            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
483            message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
484            message.setDeletedByBroker(true);
485            message.setName(file.getName());
486            return message;
487        }
488    
489        /**
490         * Creates an initialized <CODE>BlobMessage</CODE> object. A
491         * <CODE>BlobMessage</CODE> object is used to send a message containing
492         * the <CODE>File</CODE> content. Before the message is sent the file
493         * conent will be uploaded to the broker or some other remote repository
494         * depending on the {@link #getBlobTransferPolicy()}.
495         * 
496         * @param in the stream to be uploaded to some remote repo (or the broker)
497         *                depending on the strategy
498         * @return a BlobMessage
499         * @throws JMSException if the JMS provider fails to create this message due
500         *                 to some internal error.
501         */
502        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
503            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
504            configureMessage(message);
505            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
506            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
507            message.setDeletedByBroker(true);
508            return message;
509        }
510    
511        /**
512         * Indicates whether the session is in transacted mode.
513         * 
514         * @return true if the session is in transacted mode
515         * @throws JMSException if there is some internal error.
516         */
517        public boolean getTransacted() throws JMSException {
518            checkClosed();
519            return isTransacted();
520        }
521    
522        /**
523         * Returns the acknowledgement mode of the session. The acknowledgement mode
524         * is set at the time that the session is created. If the session is
525         * transacted, the acknowledgement mode is ignored.
526         * 
527         * @return If the session is not transacted, returns the current
528         *         acknowledgement mode for the session. If the session is
529         *         transacted, returns SESSION_TRANSACTED.
530         * @throws JMSException
531         * @see javax.jms.Connection#createSession(boolean,int)
532         * @since 1.1 exception JMSException if there is some internal error.
533         */
534        public int getAcknowledgeMode() throws JMSException {
535            checkClosed();
536            return this.acknowledgementMode;
537        }
538    
539        /**
540         * Commits all messages done in this transaction and releases any locks
541         * currently held.
542         * 
543         * @throws JMSException if the JMS provider fails to commit the transaction
544         *                 due to some internal error.
545         * @throws TransactionRolledBackException if the transaction is rolled back
546         *                 due to some internal error during commit.
547         * @throws javax.jms.IllegalStateException if the method is not called by a
548         *                 transacted session.
549         */
550        public void commit() throws JMSException {
551            checkClosed();
552            if (!getTransacted()) {
553                throw new javax.jms.IllegalStateException("Not a transacted session");
554            }
555            if (LOG.isDebugEnabled()) {
556                LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
557            }
558            transactionContext.commit();
559        }
560    
561        /**
562         * Rolls back any messages done in this transaction and releases any locks
563         * currently held.
564         * 
565         * @throws JMSException if the JMS provider fails to roll back the
566         *                 transaction due to some internal error.
567         * @throws javax.jms.IllegalStateException if the method is not called by a
568         *                 transacted session.
569         */
570        public void rollback() throws JMSException {
571            checkClosed();
572            if (!getTransacted()) {
573                throw new javax.jms.IllegalStateException("Not a transacted session");
574            }
575            if (LOG.isDebugEnabled()) {
576                LOG.debug(getSessionId() + " Transaction Rollback");
577            }
578            transactionContext.rollback();
579        }
580    
581        /**
582         * Closes the session.
583         * <P>
584         * Since a provider may allocate some resources on behalf of a session
585         * outside the JVM, clients should close the resources when they are not
586         * needed. Relying on garbage collection to eventually reclaim these
587         * resources may not be timely enough.
588         * <P>
589         * There is no need to close the producers and consumers of a closed
590         * session.
591         * <P>
592         * This call will block until a <CODE>receive</CODE> call or message
593         * listener in progress has completed. A blocked message consumer
594         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
595         * is closed.
596         * <P>
597         * Closing a transacted session must roll back the transaction in progress.
598         * <P>
599         * This method is the only <CODE>Session</CODE> method that can be called
600         * concurrently.
601         * <P>
602         * Invoking any other <CODE>Session</CODE> method on a closed session must
603         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
604         * closed session must <I>not </I> throw an exception.
605         * 
606         * @throws JMSException if the JMS provider fails to close the session due
607         *                 to some internal error.
608         */
609        public void close() throws JMSException {
610            if (!closed) {
611                if (getTransactionContext().isInXATransaction()) {
612                    if (!synchronizationRegistered) {
613                        synchronizationRegistered = true;
614                        getTransactionContext().addSynchronization(new Synchronization() {
615    
616                                            public void afterCommit() throws Exception {
617                                                doClose();
618                                                synchronizationRegistered = false;
619                                            }
620    
621                                            public void afterRollback() throws Exception {
622                                                doClose();
623                                                synchronizationRegistered = false;
624                                            }
625                                        });
626                    }
627    
628                } else {
629                    doClose();
630                }
631            }
632        }
633    
634        private void doClose() throws JMSException {
635            dispose();
636            RemoveInfo removeCommand = info.createRemoveCommand();
637            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
638            connection.asyncSendPacket(removeCommand);
639        }
640    
641        void clearMessagesInProgress() {
642            executor.clearMessagesInProgress();        
643            // we are called from inside the transport reconnection logic
644            // which involves us clearing all the connections' consumers
645            // dispatch and delivered lists. So rather than trying to 
646            // grab a mutex (which could be already owned by the message 
647            // listener calling the send or an ack) we allow it to complete in 
648            // a separate thread via the scheduler and notify us via 
649            // connection.transportInterruptionProcessingComplete()
650            //
651            for (final ActiveMQMessageConsumer consumer : consumers) {
652                scheduler.executeAfterDelay(new Runnable() {
653                    public void run() {
654                        consumer.clearMessagesInProgress();
655                    }}, 0l);
656            }
657        }
658    
659        void deliverAcks() {
660            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
661                ActiveMQMessageConsumer consumer = iter.next();
662                consumer.deliverAcks();
663            }
664        }
665    
666        public synchronized void dispose() throws JMSException {
667            if (!closed) {
668    
669                try {
670                    executor.stop();
671    
672                    for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
673                        ActiveMQMessageConsumer consumer = iter.next();
674                        consumer.setFailureError(connection.getFirstFailureError());
675                        consumer.dispose();
676                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
677                    }
678                    consumers.clear();
679    
680                    for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
681                        ActiveMQMessageProducer producer = iter.next();
682                        producer.dispose();
683                    }
684                    producers.clear();
685    
686                    try {
687                        if (getTransactionContext().isInLocalTransaction()) {
688                            rollback();
689                        }
690                    } catch (JMSException e) {
691                    }
692    
693                } finally {
694                    connection.removeSession(this);
695                    this.transactionContext = null;
696                    closed = true;
697                }
698            }
699        }
700    
701        /**
702         * Checks that the session is not closed then configures the message
703         */
704        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
705            checkClosed();
706            message.setConnection(connection);
707        }
708    
709        /**
710         * Check if the session is closed. It is used for ensuring that the session
711         * is open before performing various operations.
712         * 
713         * @throws IllegalStateException if the Session is closed
714         */
715        protected void checkClosed() throws IllegalStateException {
716            if (closed) {
717                throw new IllegalStateException("The Session is closed");
718            }
719        }
720    
721        /**
722         * Stops message delivery in this session, and restarts message delivery
723         * with the oldest unacknowledged message.
724         * <P>
725         * All consumers deliver messages in a serial order. Acknowledging a
726         * received message automatically acknowledges all messages that have been
727         * delivered to the client.
728         * <P>
729         * Restarting a session causes it to take the following actions:
730         * <UL>
731         * <LI>Stop message delivery
732         * <LI>Mark all messages that might have been delivered but not
733         * acknowledged as "redelivered"
734         * <LI>Restart the delivery sequence including all unacknowledged messages
735         * that had been previously delivered. Redelivered messages do not have to
736         * be delivered in exactly their original delivery order.
737         * </UL>
738         * 
739         * @throws JMSException if the JMS provider fails to stop and restart
740         *                 message delivery due to some internal error.
741         * @throws IllegalStateException if the method is called by a transacted
742         *                 session.
743         */
744        public void recover() throws JMSException {
745    
746            checkClosed();
747            if (getTransacted()) {
748                throw new IllegalStateException("This session is transacted");
749            }
750    
751            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
752                ActiveMQMessageConsumer c = iter.next();
753                c.rollback();
754            }
755    
756        }
757    
758        /**
759         * Returns the session's distinguished message listener (optional).
760         * 
761         * @return the message listener associated with this session
762         * @throws JMSException if the JMS provider fails to get the message
763         *                 listener due to an internal error.
764         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
765         * @see javax.jms.ServerSessionPool
766         * @see javax.jms.ServerSession
767         */
768        public MessageListener getMessageListener() throws JMSException {
769            checkClosed();
770            return this.messageListener;
771        }
772    
773        /**
774         * Sets the session's distinguished message listener (optional).
775         * <P>
776         * When the distinguished message listener is set, no other form of message
777         * receipt in the session can be used; however, all forms of sending
778         * messages are still supported.
779         * <P>
780         * This is an expert facility not used by regular JMS clients.
781         * 
782         * @param listener the message listener to associate with this session
783         * @throws JMSException if the JMS provider fails to set the message
784         *                 listener due to an internal error.
785         * @see javax.jms.Session#getMessageListener()
786         * @see javax.jms.ServerSessionPool
787         * @see javax.jms.ServerSession
788         */
789        public void setMessageListener(MessageListener listener) throws JMSException {
790            checkClosed();
791            this.messageListener = listener;
792    
793            if (listener != null) {
794                executor.setDispatchedBySessionPool(true);
795            }
796        }
797    
798        /**
799         * Optional operation, intended to be used only by Application Servers, not
800         * by ordinary JMS clients.
801         * 
802         * @see javax.jms.ServerSession
803         */
804        public void run() {
805            MessageDispatch messageDispatch;
806            while ((messageDispatch = executor.dequeueNoWait()) != null) {
807                final MessageDispatch md = messageDispatch;
808                ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
809                if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
810                    // TODO: Ack it without delivery to client
811                    continue;
812                }
813    
814                if (isClientAcknowledge()||isIndividualAcknowledge()) {
815                    message.setAcknowledgeCallback(new Callback() {
816                        public void execute() throws Exception {
817                        }
818                    });
819                }
820    
821                if (deliveryListener != null) {
822                    deliveryListener.beforeDelivery(this, message);
823                }
824    
825                md.setDeliverySequenceId(getNextDeliveryId());
826    
827                try {
828                    messageListener.onMessage(message);
829                } catch (RuntimeException e) {
830                    LOG.error("error dispatching message: ", e);
831                    // A problem while invoking the MessageListener does not
832                    // in general indicate a problem with the connection to the broker, i.e.
833                    // it will usually be sufficient to let the afterDelivery() method either
834                    // commit or roll back in order to deal with the exception.
835                    // However, we notify any registered client internal exception listener
836                    // of the problem.
837                    connection.onClientInternalException(e);
838                }
839    
840                try {
841                    MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
842                    ack.setFirstMessageId(md.getMessage().getMessageId());
843                    doStartTransaction();
844                    ack.setTransactionId(getTransactionContext().getTransactionId());
845                    if (ack.getTransactionId() != null) {
846                        getTransactionContext().addSynchronization(new Synchronization() {
847    
848                            public void afterRollback() throws Exception {
849                                md.getMessage().onMessageRolledBack();
850                                // ensure we don't filter this as a duplicate
851                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
852                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
853                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
854                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
855                                    && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
856                                    // We need to NACK the messages so that they get
857                                    // sent to the
858                                    // DLQ.
859                                    // Acknowledge the last message.
860                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
861                                    ack.setFirstMessageId(md.getMessage().getMessageId());
862                                    asyncSendPacket(ack);
863                                } else {
864                                    
865                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
866                                    ack.setFirstMessageId(md.getMessage().getMessageId());
867                                    asyncSendPacket(ack);
868    
869                                    // Figure out how long we should wait to resend
870                                    // this message.
871                                    long redeliveryDelay = 0;
872                                    for (int i = 0; i < redeliveryCounter; i++) {
873                                        redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
874                                    }
875                                    scheduler.executeAfterDelay(new Runnable() {
876    
877                                        public void run() {
878                                            ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
879                                        }
880                                    }, redeliveryDelay);
881                                }
882                            }
883                        });
884                    }
885                    asyncSendPacket(ack);
886                } catch (Throwable e) {
887                    connection.onClientInternalException(e);
888                }
889    
890                if (deliveryListener != null) {
891                    deliveryListener.afterDelivery(this, message);
892                }
893            }
894        }
895    
896        /**
897         * Creates a <CODE>MessageProducer</CODE> to send messages to the
898         * specified destination.
899         * <P>
900         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
901         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
902         * inherit from <CODE>Destination</CODE>, they can be used in the
903         * destination parameter to create a <CODE>MessageProducer</CODE> object.
904         * 
905         * @param destination the <CODE>Destination</CODE> to send to, or null if
906         *                this is a producer which does not have a specified
907         *                destination.
908         * @return the MessageProducer
909         * @throws JMSException if the session fails to create a MessageProducer due
910         *                 to some internal error.
911         * @throws InvalidDestinationException if an invalid destination is
912         *                 specified.
913         * @since 1.1
914         */
915        public MessageProducer createProducer(Destination destination) throws JMSException {
916            checkClosed();
917            if (destination instanceof CustomDestination) {
918                CustomDestination customDestination = (CustomDestination)destination;
919                return customDestination.createProducer(this);
920            }
921            int timeSendOut = connection.getSendTimeout();
922            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
923        }
924    
925        /**
926         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
927         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
928         * <CODE>Destination</CODE>, they can be used in the destination
929         * parameter to create a <CODE>MessageConsumer</CODE>.
930         * 
931         * @param destination the <CODE>Destination</CODE> to access.
932         * @return the MessageConsumer
933         * @throws JMSException if the session fails to create a consumer due to
934         *                 some internal error.
935         * @throws InvalidDestinationException if an invalid destination is
936         *                 specified.
937         * @since 1.1
938         */
939        public MessageConsumer createConsumer(Destination destination) throws JMSException {
940            return createConsumer(destination, (String) null);
941        }
942    
943        /**
944         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
945         * using a message selector. Since <CODE> Queue</CODE> and
946         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
947         * can be used in the destination parameter to create a
948         * <CODE>MessageConsumer</CODE>.
949         * <P>
950         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
951         * that have been sent to a destination.
952         * 
953         * @param destination the <CODE>Destination</CODE> to access
954         * @param messageSelector only messages with properties matching the message
955         *                selector expression are delivered. A value of null or an
956         *                empty string indicates that there is no message selector
957         *                for the message consumer.
958         * @return the MessageConsumer
959         * @throws JMSException if the session fails to create a MessageConsumer due
960         *                 to some internal error.
961         * @throws InvalidDestinationException if an invalid destination is
962         *                 specified.
963         * @throws InvalidSelectorException if the message selector is invalid.
964         * @since 1.1
965         */
966        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
967            return createConsumer(destination, messageSelector, false);
968        }
969    
970        /**
971         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
972         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
973         * <CODE>Destination</CODE>, they can be used in the destination
974         * parameter to create a <CODE>MessageConsumer</CODE>.
975         *
976         * @param destination the <CODE>Destination</CODE> to access.
977         * @param messageListener the listener to use for async consumption of messages
978         * @return the MessageConsumer
979         * @throws JMSException if the session fails to create a consumer due to
980         *                 some internal error.
981         * @throws InvalidDestinationException if an invalid destination is
982         *                 specified.
983         * @since 1.1
984         */
985        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
986            return createConsumer(destination, null, messageListener);
987        }
988    
989        /**
990         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
991         * using a message selector. Since <CODE> Queue</CODE> and
992         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
993         * can be used in the destination parameter to create a
994         * <CODE>MessageConsumer</CODE>.
995         * <P>
996         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
997         * that have been sent to a destination.
998         *
999         * @param destination the <CODE>Destination</CODE> to access
1000         * @param messageSelector only messages with properties matching the message
1001         *                selector expression are delivered. A value of null or an
1002         *                empty string indicates that there is no message selector
1003         *                for the message consumer.
1004         * @param messageListener the listener to use for async consumption of messages
1005         * @return the MessageConsumer
1006         * @throws JMSException if the session fails to create a MessageConsumer due
1007         *                 to some internal error.
1008         * @throws InvalidDestinationException if an invalid destination is
1009         *                 specified.
1010         * @throws InvalidSelectorException if the message selector is invalid.
1011         * @since 1.1
1012         */
1013        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1014            return createConsumer(destination, messageSelector, false, messageListener);
1015        }
1016    
1017        /**
1018         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1019         * using a message selector. This method can specify whether messages
1020         * published by its own connection should be delivered to it, if the
1021         * destination is a topic.
1022         * <P>
1023         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1024         * <CODE>Destination</CODE>, they can be used in the destination
1025         * parameter to create a <CODE>MessageConsumer</CODE>.
1026         * <P>
1027         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1028         * that have been published to a destination.
1029         * <P>
1030         * In some cases, a connection may both publish and subscribe to a topic.
1031         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1032         * inhibit the delivery of messages published by its own connection. The
1033         * default value for this attribute is False. The <CODE>noLocal</CODE>
1034         * value must be supported by destinations that are topics.
1035         * 
1036         * @param destination the <CODE>Destination</CODE> to access
1037         * @param messageSelector only messages with properties matching the message
1038         *                selector expression are delivered. A value of null or an
1039         *                empty string indicates that there is no message selector
1040         *                for the message consumer.
1041         * @param noLocal - if true, and the destination is a topic, inhibits the
1042         *                delivery of messages published by its own connection. The
1043         *                behavior for <CODE>NoLocal</CODE> is not specified if
1044         *                the destination is a queue.
1045         * @return the MessageConsumer
1046         * @throws JMSException if the session fails to create a MessageConsumer due
1047         *                 to some internal error.
1048         * @throws InvalidDestinationException if an invalid destination is
1049         *                 specified.
1050         * @throws InvalidSelectorException if the message selector is invalid.
1051         * @since 1.1
1052         */
1053        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1054            return createConsumer(destination, messageSelector, noLocal, null);
1055        }
1056    
1057        /**
1058         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1059         * using a message selector. This method can specify whether messages
1060         * published by its own connection should be delivered to it, if the
1061         * destination is a topic.
1062         * <P>
1063         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1064         * <CODE>Destination</CODE>, they can be used in the destination
1065         * parameter to create a <CODE>MessageConsumer</CODE>.
1066         * <P>
1067         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1068         * that have been published to a destination.
1069         * <P>
1070         * In some cases, a connection may both publish and subscribe to a topic.
1071         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1072         * inhibit the delivery of messages published by its own connection. The
1073         * default value for this attribute is False. The <CODE>noLocal</CODE>
1074         * value must be supported by destinations that are topics.
1075         *
1076         * @param destination the <CODE>Destination</CODE> to access
1077         * @param messageSelector only messages with properties matching the message
1078         *                selector expression are delivered. A value of null or an
1079         *                empty string indicates that there is no message selector
1080         *                for the message consumer.
1081         * @param noLocal - if true, and the destination is a topic, inhibits the
1082         *                delivery of messages published by its own connection. The
1083         *                behavior for <CODE>NoLocal</CODE> is not specified if
1084         *                the destination is a queue.
1085         * @param messageListener the listener to use for async consumption of messages
1086         * @return the MessageConsumer
1087         * @throws JMSException if the session fails to create a MessageConsumer due
1088         *                 to some internal error.
1089         * @throws InvalidDestinationException if an invalid destination is
1090         *                 specified.
1091         * @throws InvalidSelectorException if the message selector is invalid.
1092         * @since 1.1
1093         */
1094        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1095            checkClosed();
1096    
1097            if (destination instanceof CustomDestination) {
1098                CustomDestination customDestination = (CustomDestination)destination;
1099                return customDestination.createConsumer(this, messageSelector, noLocal);
1100            }
1101    
1102            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1103            int prefetch = 0;
1104            if (destination instanceof Topic) {
1105                prefetch = prefetchPolicy.getTopicPrefetch();
1106            } else {
1107                prefetch = prefetchPolicy.getQueuePrefetch();
1108            }
1109            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1110            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1111                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1112        }
1113    
1114        /**
1115         * Creates a queue identity given a <CODE>Queue</CODE> name.
1116         * <P>
1117         * This facility is provided for the rare cases where clients need to
1118         * dynamically manipulate queue identity. It allows the creation of a queue
1119         * identity with a provider-specific name. Clients that depend on this
1120         * ability are not portable.
1121         * <P>
1122         * Note that this method is not for creating the physical queue. The
1123         * physical creation of queues is an administrative task and is not to be
1124         * initiated by the JMS API. The one exception is the creation of temporary
1125         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1126         * method.
1127         * 
1128         * @param queueName the name of this <CODE>Queue</CODE>
1129         * @return a <CODE>Queue</CODE> with the given name
1130         * @throws JMSException if the session fails to create a queue due to some
1131         *                 internal error.
1132         * @since 1.1
1133         */
1134        public Queue createQueue(String queueName) throws JMSException {
1135            checkClosed();
1136            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1137                return new ActiveMQTempQueue(queueName);
1138            }
1139            return new ActiveMQQueue(queueName);
1140        }
1141    
1142        /**
1143         * Creates a topic identity given a <CODE>Topic</CODE> name.
1144         * <P>
1145         * This facility is provided for the rare cases where clients need to
1146         * dynamically manipulate topic identity. This allows the creation of a
1147         * topic identity with a provider-specific name. Clients that depend on this
1148         * ability are not portable.
1149         * <P>
1150         * Note that this method is not for creating the physical topic. The
1151         * physical creation of topics is an administrative task and is not to be
1152         * initiated by the JMS API. The one exception is the creation of temporary
1153         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1154         * method.
1155         * 
1156         * @param topicName the name of this <CODE>Topic</CODE>
1157         * @return a <CODE>Topic</CODE> with the given name
1158         * @throws JMSException if the session fails to create a topic due to some
1159         *                 internal error.
1160         * @since 1.1
1161         */
1162        public Topic createTopic(String topicName) throws JMSException {
1163            checkClosed();
1164            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1165                return new ActiveMQTempTopic(topicName);
1166            }
1167            return new ActiveMQTopic(topicName);
1168        }
1169    
    /*
     * NOTE: orphaned comment. It describes a queue-browser factory method,
     * not the durable-subscriber method that follows, so it is kept as a
     * plain block comment rather than as Javadoc.
     *
     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
     * the specified queue.
     *
     * @param queue the <CODE>queue</CODE> to access
     * @exception InvalidDestinationException if an invalid destination is
     *                    specified
     * @since 1.1
     */
1179        /**
1180         * Creates a durable subscriber to the specified topic.
1181         * <P>
1182         * If a client needs to receive all the messages published on a topic,
1183         * including the ones published while the subscriber is inactive, it uses a
1184         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1185         * record of this durable subscription and insures that all messages from
1186         * the topic's publishers are retained until they are acknowledged by this
1187         * durable subscriber or they have expired.
1188         * <P>
1189         * Sessions with durable subscribers must always provide the same client
1190         * identifier. In addition, each client must specify a name that uniquely
1191         * identifies (within client identifier) each durable subscription it
1192         * creates. Only one session at a time can have a
1193         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1194         * <P>
1195         * A client can change an existing durable subscription by creating a
1196         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1197         * and/or message selector. Changing a durable subscriber is equivalent to
1198         * unsubscribing (deleting) the old one and creating a new one.
1199         * <P>
1200         * In some cases, a connection may both publish and subscribe to a topic.
1201         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1202         * inhibit the delivery of messages published by its own connection. The
1203         * default value for this attribute is false.
1204         * 
1205         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1206         * @param name the name used to identify this subscription
1207         * @return the TopicSubscriber
1208         * @throws JMSException if the session fails to create a subscriber due to
1209         *                 some internal error.
1210         * @throws InvalidDestinationException if an invalid topic is specified.
1211         * @since 1.1
1212         */
1213        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1214            checkClosed();
1215            return createDurableSubscriber(topic, name, null, false);
1216        }
1217    
1218        /**
1219         * Creates a durable subscriber to the specified topic, using a message
1220         * selector and specifying whether messages published by its own connection
1221         * should be delivered to it.
1222         * <P>
1223         * If a client needs to receive all the messages published on a topic,
1224         * including the ones published while the subscriber is inactive, it uses a
1225         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1226         * record of this durable subscription and insures that all messages from
1227         * the topic's publishers are retained until they are acknowledged by this
1228         * durable subscriber or they have expired.
1229         * <P>
1230         * Sessions with durable subscribers must always provide the same client
1231         * identifier. In addition, each client must specify a name which uniquely
1232         * identifies (within client identifier) each durable subscription it
1233         * creates. Only one session at a time can have a
1234         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1235         * inactive durable subscriber is one that exists but does not currently
1236         * have a message consumer associated with it.
1237         * <P>
1238         * A client can change an existing durable subscription by creating a
1239         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1240         * and/or message selector. Changing a durable subscriber is equivalent to
1241         * unsubscribing (deleting) the old one and creating a new one.
1242         * 
1243         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1244         * @param name the name used to identify this subscription
1245         * @param messageSelector only messages with properties matching the message
1246         *                selector expression are delivered. A value of null or an
1247         *                empty string indicates that there is no message selector
1248         *                for the message consumer.
1249         * @param noLocal if set, inhibits the delivery of messages published by its
1250         *                own connection
1251         * @return the Queue Browser
1252         * @throws JMSException if the session fails to create a subscriber due to
1253         *                 some internal error.
1254         * @throws InvalidDestinationException if an invalid topic is specified.
1255         * @throws InvalidSelectorException if the message selector is invalid.
1256         * @since 1.1
1257         */
1258        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1259            checkClosed();
1260    
1261            if (topic instanceof CustomDestination) {
1262                CustomDestination customDestination = (CustomDestination)topic;
1263                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1264            }
1265    
1266            connection.checkClientIDWasManuallySpecified();
1267            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1268            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1269            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1270            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1271                                               noLocal, false, asyncDispatch);
1272        }
1273    
1274        /**
1275         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1276         * the specified queue.
1277         * 
1278         * @param queue the <CODE>queue</CODE> to access
1279         * @return the Queue Browser
1280         * @throws JMSException if the session fails to create a browser due to some
1281         *                 internal error.
1282         * @throws InvalidDestinationException if an invalid destination is
1283         *                 specified
1284         * @since 1.1
1285         */
1286        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1287            checkClosed();
1288            return createBrowser(queue, null);
1289        }
1290    
1291        /**
1292         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1293         * the specified queue using a message selector.
1294         * 
1295         * @param queue the <CODE>queue</CODE> to access
1296         * @param messageSelector only messages with properties matching the message
1297         *                selector expression are delivered. A value of null or an
1298         *                empty string indicates that there is no message selector
1299         *                for the message consumer.
1300         * @return the Queue Browser
1301         * @throws JMSException if the session fails to create a browser due to some
1302         *                 internal error.
1303         * @throws InvalidDestinationException if an invalid destination is
1304         *                 specified
1305         * @throws InvalidSelectorException if the message selector is invalid.
1306         * @since 1.1
1307         */
1308        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1309            checkClosed();
1310            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1311        }
1312    
1313        /**
1314         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1315         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1316         * 
1317         * @return a temporary queue identity
1318         * @throws JMSException if the session fails to create a temporary queue due
1319         *                 to some internal error.
1320         * @since 1.1
1321         */
1322        public TemporaryQueue createTemporaryQueue() throws JMSException {
1323            checkClosed();
1324            return (TemporaryQueue)connection.createTempDestination(false);
1325        }
1326    
1327        /**
1328         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1329         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1330         * 
1331         * @return a temporary topic identity
1332         * @throws JMSException if the session fails to create a temporary topic due
1333         *                 to some internal error.
1334         * @since 1.1
1335         */
1336        public TemporaryTopic createTemporaryTopic() throws JMSException {
1337            checkClosed();
1338            return (TemporaryTopic)connection.createTempDestination(true);
1339        }
1340    
1341        /**
1342         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1343         * the specified queue.
1344         * 
1345         * @param queue the <CODE>Queue</CODE> to access
1346         * @return
1347         * @throws JMSException if the session fails to create a receiver due to
1348         *                 some internal error.
1349         * @throws JMSException
1350         * @throws InvalidDestinationException if an invalid queue is specified.
1351         */
1352        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1353            checkClosed();
1354            return createReceiver(queue, null);
1355        }
1356    
1357        /**
1358         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1359         * the specified queue using a message selector.
1360         * 
1361         * @param queue the <CODE>Queue</CODE> to access
1362         * @param messageSelector only messages with properties matching the message
1363         *                selector expression are delivered. A value of null or an
1364         *                empty string indicates that there is no message selector
1365         *                for the message consumer.
1366         * @return QueueReceiver
1367         * @throws JMSException if the session fails to create a receiver due to
1368         *                 some internal error.
1369         * @throws InvalidDestinationException if an invalid queue is specified.
1370         * @throws InvalidSelectorException if the message selector is invalid.
1371         */
1372        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1373            checkClosed();
1374    
1375            if (queue instanceof CustomDestination) {
1376                CustomDestination customDestination = (CustomDestination)queue;
1377                return customDestination.createReceiver(this, messageSelector);
1378            }
1379    
1380            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1381            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1382                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1383        }
1384    
1385        /**
1386         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1387         * specified queue.
1388         * 
1389         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1390         *                unidentified producer
1391         * @return QueueSender
1392         * @throws JMSException if the session fails to create a sender due to some
1393         *                 internal error.
1394         * @throws InvalidDestinationException if an invalid queue is specified.
1395         */
1396        public QueueSender createSender(Queue queue) throws JMSException {
1397            checkClosed();
1398            if (queue instanceof CustomDestination) {
1399                CustomDestination customDestination = (CustomDestination)queue;
1400                return customDestination.createSender(this);
1401            }
1402            int timeSendOut = connection.getSendTimeout();
1403            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1404        }
1405    
1406        /**
1407         * Creates a nondurable subscriber to the specified topic. <p/>
1408         * <P>
1409         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1410         * that have been published to a topic. <p/>
1411         * <P>
1412         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1413         * receive only messages that are published while they are active. <p/>
1414         * <P>
1415         * In some cases, a connection may both publish and subscribe to a topic.
1416         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1417         * inhibit the delivery of messages published by its own connection. The
1418         * default value for this attribute is false.
1419         * 
1420         * @param topic the <CODE>Topic</CODE> to subscribe to
1421         * @return TopicSubscriber
1422         * @throws JMSException if the session fails to create a subscriber due to
1423         *                 some internal error.
1424         * @throws InvalidDestinationException if an invalid topic is specified.
1425         */
1426        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1427            checkClosed();
1428            return createSubscriber(topic, null, false);
1429        }
1430    
1431        /**
1432         * Creates a nondurable subscriber to the specified topic, using a message
1433         * selector or specifying whether messages published by its own connection
1434         * should be delivered to it. <p/>
1435         * <P>
1436         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1437         * that have been published to a topic. <p/>
1438         * <P>
1439         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1440         * receive only messages that are published while they are active. <p/>
1441         * <P>
1442         * Messages filtered out by a subscriber's message selector will never be
1443         * delivered to the subscriber. From the subscriber's perspective, they do
1444         * not exist. <p/>
1445         * <P>
1446         * In some cases, a connection may both publish and subscribe to a topic.
1447         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1448         * inhibit the delivery of messages published by its own connection. The
1449         * default value for this attribute is false.
1450         * 
1451         * @param topic the <CODE>Topic</CODE> to subscribe to
1452         * @param messageSelector only messages with properties matching the message
1453         *                selector expression are delivered. A value of null or an
1454         *                empty string indicates that there is no message selector
1455         *                for the message consumer.
1456         * @param noLocal if set, inhibits the delivery of messages published by its
1457         *                own connection
1458         * @return TopicSubscriber
1459         * @throws JMSException if the session fails to create a subscriber due to
1460         *                 some internal error.
1461         * @throws InvalidDestinationException if an invalid topic is specified.
1462         * @throws InvalidSelectorException if the message selector is invalid.
1463         */
1464        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1465            checkClosed();
1466    
1467            if (topic instanceof CustomDestination) {
1468                CustomDestination customDestination = (CustomDestination)topic;
1469                return customDestination.createSubscriber(this, messageSelector, noLocal);
1470            }
1471    
1472            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1473            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1474                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1475        }
1476    
1477        /**
1478         * Creates a publisher for the specified topic. <p/>
1479         * <P>
1480         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1481         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1482         * a topic, it defines a new sequence of messages that have no ordering
1483         * relationship with the messages it has previously sent.
1484         * 
1485         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1486         *                an unidentified producer
1487         * @return TopicPublisher
1488         * @throws JMSException if the session fails to create a publisher due to
1489         *                 some internal error.
1490         * @throws InvalidDestinationException if an invalid topic is specified.
1491         */
1492        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1493            checkClosed();
1494    
1495            if (topic instanceof CustomDestination) {
1496                CustomDestination customDestination = (CustomDestination)topic;
1497                return customDestination.createPublisher(this);
1498            }
1499            int timeSendOut = connection.getSendTimeout();
1500            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1501        }
1502    
1503        /**
1504         * Unsubscribes a durable subscription that has been created by a client.
1505         * <P>
1506         * This method deletes the state being maintained on behalf of the
1507         * subscriber by its provider.
1508         * <P>
1509         * It is erroneous for a client to delete a durable subscription while there
1510         * is an active <CODE>MessageConsumer </CODE> or
1511         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1512         * message is part of a pending transaction or has not been acknowledged in
1513         * the session.
1514         * 
1515         * @param name the name used to identify this subscription
1516         * @throws JMSException if the session fails to unsubscribe to the durable
1517         *                 subscription due to some internal error.
1518         * @throws InvalidDestinationException if an invalid subscription name is
1519         *                 specified.
1520         * @since 1.1
1521         */
1522        public void unsubscribe(String name) throws JMSException {
1523            checkClosed();
1524            connection.unsubscribe(name);
1525        }
1526    
1527        public void dispatch(MessageDispatch messageDispatch) {
1528            try {
1529                executor.execute(messageDispatch);
1530            } catch (InterruptedException e) {
1531                Thread.currentThread().interrupt();
1532                connection.onClientInternalException(e);
1533            }
1534        }
1535    
1536        /**
1537         * Acknowledges all consumed messages of the session of this consumed
1538         * message.
1539         * <P>
1540         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1541         * for use when a client has specified that its JMS session's consumed
1542         * messages are to be explicitly acknowledged. By invoking
1543         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1544         * all messages consumed by the session that the message was delivered to.
1545         * <P>
1546         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1547         * sessions and sessions specified to use implicit acknowledgement modes.
1548         * <P>
1549         * A client may individually acknowledge each message as it is consumed, or
1550         * it may choose to acknowledge messages as an application-defined group
1551         * (which is done by calling acknowledge on the last received message of the
1552         * group, thereby acknowledging all messages consumed by the session.)
1553         * <P>
1554         * Messages that have been received but not acknowledged may be redelivered.
1555         * 
1556         * @throws JMSException if the JMS provider fails to acknowledge the
1557         *                 messages due to some internal error.
1558         * @throws javax.jms.IllegalStateException if this method is called on a
1559         *                 closed session.
1560         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1561         */
1562        public void acknowledge() throws JMSException {
1563            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1564                ActiveMQMessageConsumer c = iter.next();
1565                c.acknowledge();
1566            }
1567        }
1568    
1569        /**
1570         * Add a message consumer.
1571         * 
1572         * @param consumer - message consumer.
1573         * @throws JMSException
1574         */
1575        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1576            this.consumers.add(consumer);
1577            if (consumer.isDurableSubscriber()) {
1578                stats.onCreateDurableSubscriber();
1579            }
1580            this.connection.addDispatcher(consumer.getConsumerId(), this);
1581        }
1582    
1583        /**
1584         * Remove the message consumer.
1585         * 
1586         * @param consumer - consumer to be removed.
1587         * @throws JMSException
1588         */
1589        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1590            this.connection.removeDispatcher(consumer.getConsumerId());
1591            if (consumer.isDurableSubscriber()) {
1592                stats.onRemoveDurableSubscriber();
1593            }
1594            this.consumers.remove(consumer);
1595            this.connection.removeDispatcher(consumer);
1596        }
1597    
1598        /**
1599         * Adds a message producer.
1600         * 
1601         * @param producer - message producer to be added.
1602         * @throws JMSException
1603         */
1604        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1605            this.producers.add(producer);
1606            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1607        }
1608    
1609        /**
1610         * Removes a message producer.
1611         * 
1612         * @param producer - message producer to be removed.
1613         * @throws JMSException
1614         */
1615        protected void removeProducer(ActiveMQMessageProducer producer) {
1616            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1617            this.producers.remove(producer);
1618        }
1619    
1620        /**
1621         * Start this Session.
1622         * 
1623         * @throws JMSException
1624         */
1625        protected void start() throws JMSException {
1626            started.set(true);
1627            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1628                ActiveMQMessageConsumer c = iter.next();
1629                c.start();
1630            }
1631            executor.start();
1632        }
1633    
1634        /**
1635         * Stops this session.
1636         * 
1637         * @throws JMSException
1638         */
1639        protected void stop() throws JMSException {
1640    
1641            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1642                ActiveMQMessageConsumer c = iter.next();
1643                c.stop();
1644            }
1645    
1646            started.set(false);
1647            executor.stop();
1648        }
1649    
1650        /**
1651         * Returns the session id.
1652         * 
1653         * @return value - session id.
1654         */
1655        protected SessionId getSessionId() {
1656            return info.getSessionId();
1657        }
1658    
1659        /**
1660         * @return
1661         */
1662        protected ConsumerId getNextConsumerId() {
1663            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1664        }
1665    
1666        /**
1667         * @return
1668         */
1669        protected ProducerId getNextProducerId() {
1670            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1671        }
1672    
1673        /**
1674         * Sends the message for dispatch by the broker.
1675         * 
1676         * @param producer - message producer.
1677         * @param destination - message destination.
1678         * @param message - message to be sent.
1679         * @param deliveryMode - JMS messsage delivery mode.
1680         * @param priority - message priority.
1681         * @param timeToLive - message expiration.
1682         * @param producerWindow
1683         * @throws JMSException
1684         */
1685        protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1686                            MemoryUsage producerWindow, int sendTimeout) throws JMSException {
1687    
1688            checkClosed();
1689            if (destination.isTemporary() && connection.isDeleted(destination)) {
1690                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1691            }
1692            synchronized (sendMutex) {
1693                // tell the Broker we are about to start a new transaction
1694                doStartTransaction();
1695                TransactionId txid = transactionContext.getTransactionId();
1696                long sequenceNumber = producer.getMessageSequence();
1697    
1698                //Set the "JMS" header fields on the orriginal message, see 1.1 spec section 3.4.11
1699                message.setJMSDestination(destination);
1700                message.setJMSDeliveryMode(deliveryMode);
1701                long expiration = 0L;
1702                if (!producer.getDisableMessageTimestamp()) {
1703                    long timeStamp = System.currentTimeMillis();
1704                    message.setJMSTimestamp(timeStamp);
1705                    if (timeToLive > 0) {
1706                        expiration = timeToLive + timeStamp;
1707                    }
1708                }
1709                message.setJMSExpiration(expiration);
1710                message.setJMSPriority(priority);
1711                message.setJMSRedelivered(false);
1712    
1713                // transform to our own message format here
1714                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1715    
1716                // Set the message id.
1717                if (msg == message) {
1718                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1719                } else {
1720                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1721                    message.setJMSMessageID(msg.getMessageId().toString());
1722                }
1723                //clear the brokerPath in case we are re-sending this message
1724                msg.setBrokerPath(null);
1725    
1726    
1727                msg.setTransactionId(txid);
1728                if (connection.isCopyMessageOnSend()) {
1729                    msg = (ActiveMQMessage)msg.copy();
1730                }
1731                msg.setConnection(connection);
1732                msg.onSend();
1733                msg.setProducerId(msg.getMessageId().getProducerId());
1734                if (LOG.isTraceEnabled()) {
1735                    LOG.trace(getSessionId() + " sending message: " + msg);
1736                }
1737                if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1738                    this.connection.asyncSendPacket(msg);
1739                    if (producerWindow != null) {
1740                        // Since we defer lots of the marshaling till we hit the
1741                        // wire, this might not
1742                        // provide and accurate size. We may change over to doing
1743                        // more aggressive marshaling,
1744                        // to get more accurate sizes.. this is more important once
1745                        // users start using producer window
1746                        // flow control.
1747                        int size = msg.getSize();
1748                        producerWindow.increaseUsage(size);
1749                    }
1750                } else {
1751                    if (sendTimeout > 0) {
1752                        this.connection.syncSendPacket(msg,sendTimeout);
1753                    }else {
1754                        this.connection.syncSendPacket(msg);
1755                    }
1756                }
1757    
1758            }
1759        }
1760    
1761        /**
1762         * Send TransactionInfo to indicate transaction has started
1763         * 
1764         * @throws JMSException if some internal error occurs
1765         */
1766        protected void doStartTransaction() throws JMSException {
1767            if (getTransacted() && !transactionContext.isInXATransaction()) {
1768                transactionContext.begin();
1769            }
1770        }
1771    
1772        /**
1773         * Checks whether the session has unconsumed messages.
1774         * 
1775         * @return true - if there are unconsumed messages.
1776         */
1777        public boolean hasUncomsumedMessages() {
1778            return executor.hasUncomsumedMessages();
1779        }
1780    
1781        /**
1782         * Checks whether the session uses transactions.
1783         * 
1784         * @return true - if the session uses transactions.
1785         */
1786        public boolean isTransacted() {
1787            return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1788        }
1789    
1790        /**
1791         * Checks whether the session used client acknowledgment.
1792         * 
1793         * @return true - if the session uses client acknowledgment.
1794         */
1795        protected boolean isClientAcknowledge() {
1796            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1797        }
1798    
1799        /**
1800         * Checks whether the session used auto acknowledgment.
1801         * 
1802         * @return true - if the session uses client acknowledgment.
1803         */
1804        public boolean isAutoAcknowledge() {
1805            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1806        }
1807    
1808        /**
1809         * Checks whether the session used dup ok acknowledgment.
1810         * 
1811         * @return true - if the session uses client acknowledgment.
1812         */
1813        public boolean isDupsOkAcknowledge() {
1814            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1815        }
1816        
1817        public boolean isIndividualAcknowledge(){
1818            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1819        }
1820    
1821        /**
1822         * Returns the message delivery listener.
1823         * 
1824         * @return deliveryListener - message delivery listener.
1825         */
1826        public DeliveryListener getDeliveryListener() {
1827            return deliveryListener;
1828        }
1829    
1830        /**
1831         * Sets the message delivery listener.
1832         * 
1833         * @param deliveryListener - message delivery listener.
1834         */
1835        public void setDeliveryListener(DeliveryListener deliveryListener) {
1836            this.deliveryListener = deliveryListener;
1837        }
1838    
1839        /**
1840         * Returns the SessionInfo bean.
1841         * 
1842         * @return info - SessionInfo bean.
1843         * @throws JMSException
1844         */
1845        protected SessionInfo getSessionInfo() throws JMSException {
1846            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1847            return info;
1848        }
1849    
1850        /**
1851         * Send the asynchronus command.
1852         * 
1853         * @param command - command to be executed.
1854         * @throws JMSException
1855         */
1856        public void asyncSendPacket(Command command) throws JMSException {
1857            connection.asyncSendPacket(command);
1858        }
1859    
1860        /**
1861         * Send the synchronus command.
1862         * 
1863         * @param command - command to be executed.
1864         * @return Response
1865         * @throws JMSException
1866         */
1867        public Response syncSendPacket(Command command) throws JMSException {
1868            return connection.syncSendPacket(command);
1869        }
1870    
1871        public long getNextDeliveryId() {
1872            return deliveryIdGenerator.getNextSequenceId();
1873        }
1874    
1875        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1876    
1877            List<MessageDispatch> c = unconsumedMessages.removeAll();
1878            for (MessageDispatch md : c) {
1879                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1880            }
1881            Collections.reverse(c);
1882    
1883            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1884                MessageDispatch md = iter.next();
1885                executor.executeFirst(md);
1886            }
1887    
1888        }
1889    
1890        public boolean isRunning() {
1891            return started.get();
1892        }
1893    
1894        public boolean isAsyncDispatch() {
1895            return asyncDispatch;
1896        }
1897    
    /**
     * @param asyncDispatch The asyncDispatch to set.
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
1901    
1902        /**
1903         * @return Returns the sessionAsyncDispatch.
1904         */
1905        public boolean isSessionAsyncDispatch() {
1906            return sessionAsyncDispatch;
1907        }
1908    
    /**
     * Stores the sessionAsyncDispatch flag on this session.
     *
     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
     */
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }
1915    
1916        public MessageTransformer getTransformer() {
1917            return transformer;
1918        }
1919    
1920        public ActiveMQConnection getConnection() {
1921            return connection;
1922        }
1923    
    /**
     * Sets the transformer used to transform messages before they are sent on
     * to the JMS bus or when they are received from the bus but before they are
     * delivered to the JMS client.
     *
     * @param transformer the transformer to store on this session.
     */
    public void setTransformer(MessageTransformer transformer) {
        this.transformer = transformer;
    }
1932    
1933        public BlobTransferPolicy getBlobTransferPolicy() {
1934            return blobTransferPolicy;
1935        }
1936    
    /**
     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
     * OBjects) are transferred from producers to brokers to consumers.
     *
     * @param blobTransferPolicy the policy to store on this session.
     */
    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
        this.blobTransferPolicy = blobTransferPolicy;
    }
1944    
1945        public List getUnconsumedMessages() {
1946            return executor.getUnconsumedMessages();
1947        }
1948    
1949        public String toString() {
1950            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1951        }
1952    
1953        public void checkMessageListener() throws JMSException {
1954            if (messageListener != null) {
1955                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1956            }
1957            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1958                ActiveMQMessageConsumer consumer = i.next();
1959                if (consumer.getMessageListener() != null) {
1960                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1961                }
1962            }
1963        }
1964    
1965        protected void setOptimizeAcknowledge(boolean value) {
1966            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1967                ActiveMQMessageConsumer c = iter.next();
1968                c.setOptimizeAcknowledge(value);
1969            }
1970        }
1971    
1972        protected void setPrefetchSize(ConsumerId id, int prefetch) {
1973            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1974                ActiveMQMessageConsumer c = iter.next();
1975                if (c.getConsumerId().equals(id)) {
1976                    c.setPrefetchSize(prefetch);
1977                    break;
1978                }
1979            }
1980        }
1981    
1982        protected void close(ConsumerId id) {
1983            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1984                ActiveMQMessageConsumer c = iter.next();
1985                if (c.getConsumerId().equals(id)) {
1986                    try {
1987                        c.close();
1988                    } catch (JMSException e) {
1989                        LOG.warn("Exception closing consumer", e);
1990                    }
1991                    LOG.warn("Closed consumer on Command");
1992                    break;
1993                }
1994            }
1995        }
1996    
1997        public boolean isInUse(ActiveMQTempDestination destination) {
1998            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1999                ActiveMQMessageConsumer c = iter.next();
2000                if (c.isInUse(destination)) {
2001                    return true;
2002                }
2003            }
2004            return false;
2005        }
2006        
2007        /**
2008         * highest sequence id of the last message delivered by this session.
2009         * Passed to the broker in the close command, maintained by dispose()
2010         * @return lastDeliveredSequenceId
2011         */
2012        public long getLastDeliveredSequenceId() {
2013            return lastDeliveredSequenceId;
2014        }
2015        
2016        protected void sendAck(MessageAck ack) throws JMSException {
2017            sendAck(ack,false);
2018        }
2019        
2020        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2021            if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2022                asyncSendPacket(ack);
2023            } else {
2024                syncSendPacket(ack);
2025            }
2026        }
2027    }