001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicReference;
032    
033    import javax.jms.IllegalStateException;
034    import javax.jms.InvalidDestinationException;
035    import javax.jms.JMSException;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageListener;
039    import javax.jms.TransactionRolledBackException;
040    
041    import org.apache.activemq.blob.BlobDownloader;
042    import org.apache.activemq.command.ActiveMQBlobMessage;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQMessage;
045    import org.apache.activemq.command.ActiveMQTempDestination;
046    import org.apache.activemq.command.CommandTypes;
047    import org.apache.activemq.command.ConsumerId;
048    import org.apache.activemq.command.ConsumerInfo;
049    import org.apache.activemq.command.MessageAck;
050    import org.apache.activemq.command.MessageDispatch;
051    import org.apache.activemq.command.MessageId;
052    import org.apache.activemq.command.MessagePull;
053    import org.apache.activemq.command.RemoveInfo;
054    import org.apache.activemq.command.TransactionId;
055    import org.apache.activemq.management.JMSConsumerStatsImpl;
056    import org.apache.activemq.management.StatsCapable;
057    import org.apache.activemq.management.StatsImpl;
058    import org.apache.activemq.selector.SelectorParser;
059    import org.apache.activemq.thread.Scheduler;
060    import org.apache.activemq.transaction.Synchronization;
061    import org.apache.activemq.util.Callback;
062    import org.apache.activemq.util.IntrospectionSupport;
063    import org.apache.activemq.util.JMSExceptionSupport;
064    import org.apache.commons.logging.Log;
065    import org.apache.commons.logging.LogFactory;
066    
067    /**
068     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
069     * from a destination. A <CODE> MessageConsumer</CODE> object is created by
070     * passing a <CODE>Destination</CODE> object to a message-consumer creation
071     * method supplied by a session.
072     * <P>
073     * <CODE>MessageConsumer</CODE> is the parent interface for all message
074     * consumers.
075     * <P>
076     * A message consumer can be created with a message selector. A message selector
077     * allows the client to restrict the messages delivered to the message consumer
078     * to those that match the selector.
079     * <P>
080     * A client may either synchronously receive a message consumer's messages or
081     * have the consumer asynchronously deliver them as they arrive.
082     * <P>
083     * For synchronous receipt, a client can request the next message from a message
084     * consumer using one of its <CODE> receive</CODE> methods. There are several
085     * variations of <CODE>receive</CODE> that allow a client to poll or wait for
086     * the next message.
087     * <P>
088     * For asynchronous delivery, a client can register a
089     * <CODE>MessageListener</CODE> object with a message consumer. As messages
090     * arrive at the message consumer, it delivers them by calling the
091     * <CODE>MessageListener</CODE>'s<CODE>
092     * onMessage</CODE> method.
093     * <P>
094     * It is a client programming error for a <CODE>MessageListener</CODE> to
095     * throw an exception.
096     * 
097     * @version $Revision: 1.22 $
098     * @see javax.jms.MessageConsumer
099     * @see javax.jms.QueueReceiver
100     * @see javax.jms.TopicSubscriber
101     * @see javax.jms.Session
102     */
103    public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
104    
105        @SuppressWarnings("serial")
106        class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
107            final TransactionId transactionId;
108            public PreviouslyDeliveredMap(TransactionId transactionId) {
109                this.transactionId = transactionId;
110            }
111        }
112    
113        private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
114        protected static final Scheduler scheduler = Scheduler.getInstance();
115        protected final ActiveMQSession session;
116        protected final ConsumerInfo info;
117    
118        // These are the messages waiting to be delivered to the client
119        protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
120    
121        // These are the messages that were delivered to the consumer but that have
122        // not been acknowledged. The list is kept in reverse order since we
123        // always walk it in reverse order.
124        private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
125        // track duplicate deliveries in a transaction such that the tx integrity can be validated
126        private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
127        private int deliveredCounter;
128        private int additionalWindowSize;
129        private long redeliveryDelay;
130        private int ackCounter;
131        private int dispatchedCount;
132        private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
133        private JMSConsumerStatsImpl stats;
134    
135        private final String selector;
136        private boolean synchronizationRegistered;
137        private AtomicBoolean started = new AtomicBoolean(false);
138    
139        private MessageAvailableListener availableListener;
140    
141        private RedeliveryPolicy redeliveryPolicy;
142        private boolean optimizeAcknowledge;
143        private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
144        private ExecutorService executorService;
145        private MessageTransformer transformer;
146        private boolean clearDispatchList;
147    
148        private MessageAck pendingAck;
149        private long lastDeliveredSequenceId;
150    
151        private IOException failureError;
152        
153        private long optimizeAckTimestamp = System.currentTimeMillis();
154        private long optimizeAckTimeout = 300;
155        private long failoverRedeliveryWaitPeriod = 0;
156    
157        /**
158         * Create a MessageConsumer
159         * 
160         * @param session
161         * @param dest
162         * @param name
163         * @param selector
164         * @param prefetch
165         * @param maximumPendingMessageCount
166         * @param noLocal
167         * @param browser
168         * @param dispatchAsync
169         * @param messageListener
170         * @throws JMSException
171         */
172        public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
173                String name, String selector, int prefetch,
174                int maximumPendingMessageCount, boolean noLocal, boolean browser,
175                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
176            if (dest == null) {
177                throw new InvalidDestinationException("Don't understand null destinations");
178            } else if (dest.getPhysicalName() == null) {
179                throw new InvalidDestinationException("The destination object was not given a physical name.");
180            } else if (dest.isTemporary()) {
181                String physicalName = dest.getPhysicalName();
182    
183                if (physicalName == null) {
184                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
185                }
186    
187                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
188    
189                if (physicalName.indexOf(connectionID) < 0) {
190                    throw new InvalidDestinationException(
191                                                          "Cannot use a Temporary destination from another Connection");
192                }
193    
194                if (session.connection.isDeleted(dest)) {
195                    throw new InvalidDestinationException(
196                                                          "Cannot use a Temporary destination that has been deleted");
197                }
198                if (prefetch < 0) {
199                    throw new JMSException("Cannot have a prefetch size less than zero");
200                }
201            }
202    
203            this.session = session;
204            this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
205            setTransformer(session.getTransformer());
206    
207            this.info = new ConsumerInfo(consumerId);
208            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
209            this.info.setSubscriptionName(name);
210            this.info.setPrefetchSize(prefetch);
211            this.info.setCurrentPrefetchSize(prefetch);
212            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
213            this.info.setNoLocal(noLocal);
214            this.info.setDispatchAsync(dispatchAsync);
215            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
216            this.info.setSelector(null);
217    
218            // Allows the options on the destination to configure the consumerInfo
219            if (dest.getOptions() != null) {
220                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
221                IntrospectionSupport.setProperties(this.info, options, "consumer.");
222            }
223    
224            this.info.setDestination(dest);
225            this.info.setBrowser(browser);
226            if (selector != null && selector.trim().length() != 0) {
227                // Validate the selector
228                SelectorParser.parse(selector);
229                this.info.setSelector(selector);
230                this.selector = selector;
231            } else if (info.getSelector() != null) {
232                // Validate the selector
233                SelectorParser.parse(this.info.getSelector());
234                this.selector = this.info.getSelector();
235            } else {
236                this.selector = null;
237            }
238    
239            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
240            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
241                                       && !info.isBrowser();
242            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
243            this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
244            if (messageListener != null) {
245                setMessageListener(messageListener);
246            }
247            try {
248                this.session.addConsumer(this);
249                this.session.syncSendPacket(info);
250            } catch (JMSException e) {
251                this.session.removeConsumer(this);
252                throw e;
253            }
254    
255            if (session.connection.isStarted()) {
256                start();
257            }
258        }
259    
260        private boolean isAutoAcknowledgeEach() {
261            return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
262        }
263    
264        private boolean isAutoAcknowledgeBatch() {
265            return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
266        }
267    
268        public StatsImpl getStats() {
269            return stats;
270        }
271    
272        public JMSConsumerStatsImpl getConsumerStats() {
273            return stats;
274        }
275    
276        public RedeliveryPolicy getRedeliveryPolicy() {
277            return redeliveryPolicy;
278        }
279    
280        /**
281         * Sets the redelivery policy used when messages are redelivered
282         */
283        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
284            this.redeliveryPolicy = redeliveryPolicy;
285        }
286    
287        public MessageTransformer getTransformer() {
288            return transformer;
289        }
290    
291        /**
292         * Sets the transformer used to transform messages before they are sent on
293         * to the JMS bus
294         */
295        public void setTransformer(MessageTransformer transformer) {
296            this.transformer = transformer;
297        }
298    
299        /**
300         * @return Returns the value.
301         */
302        public ConsumerId getConsumerId() {
303            return info.getConsumerId();
304        }
305    
306        /**
307         * @return the consumer name - used for durable consumers
308         */
309        public String getConsumerName() {
310            return this.info.getSubscriptionName();
311        }
312    
313        /**
314         * @return true if this consumer does not accept locally produced messages
315         */
316        protected boolean isNoLocal() {
317            return info.isNoLocal();
318        }
319    
320        /**
321         * Retrieve is a browser
322         * 
323         * @return true if a browser
324         */
325        protected boolean isBrowser() {
326            return info.isBrowser();
327        }
328    
329        /**
330         * @return ActiveMQDestination
331         */
332        protected ActiveMQDestination getDestination() {
333            return info.getDestination();
334        }
335    
336        /**
337         * @return Returns the prefetchNumber.
338         */
339        public int getPrefetchNumber() {
340            return info.getPrefetchSize();
341        }
342    
343        /**
344         * @return true if this is a durable topic subscriber
345         */
346        public boolean isDurableSubscriber() {
347            return info.getSubscriptionName() != null && info.getDestination().isTopic();
348        }
349    
350        /**
351         * Gets this message consumer's message selector expression.
352         * 
353         * @return this message consumer's message selector, or null if no message
354         *         selector exists for the message consumer (that is, if the message
355         *         selector was not set or was set to null or the empty string)
356         * @throws JMSException if the JMS provider fails to receive the next
357         *                 message due to some internal error.
358         */
359        public String getMessageSelector() throws JMSException {
360            checkClosed();
361            return selector;
362        }
363    
364        /**
365         * Gets the message consumer's <CODE>MessageListener</CODE>.
366         * 
367         * @return the listener for the message consumer, or null if no listener is
368         *         set
369         * @throws JMSException if the JMS provider fails to get the message
370         *                 listener due to some internal error.
371         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
372         */
373        public MessageListener getMessageListener() throws JMSException {
374            checkClosed();
375            return this.messageListener.get();
376        }
377    
378        /**
379         * Sets the message consumer's <CODE>MessageListener</CODE>.
380         * <P>
381         * Setting the message listener to null is the equivalent of unsetting the
382         * message listener for the message consumer.
383         * <P>
384         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
385         * while messages are being consumed by an existing listener or the consumer
386         * is being used to consume messages synchronously is undefined.
387         * 
388         * @param listener the listener to which the messages are to be delivered
389         * @throws JMSException if the JMS provider fails to receive the next
390         *                 message due to some internal error.
391         * @see javax.jms.MessageConsumer#getMessageListener
392         */
393        public void setMessageListener(MessageListener listener) throws JMSException {
394            checkClosed();
395            if (info.getPrefetchSize() == 0) {
396                throw new JMSException(
397                                       "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
398            }
399            if (listener != null) {
400                boolean wasRunning = session.isRunning();
401                if (wasRunning) {
402                    session.stop();
403                }
404    
405                this.messageListener.set(listener);
406                session.redispatch(this, unconsumedMessages);
407    
408                if (wasRunning) {
409                    session.start();
410                }
411            } else {
412                this.messageListener.set(null);
413            }
414        }
415    
416        public MessageAvailableListener getAvailableListener() {
417            return availableListener;
418        }
419    
420        /**
421         * Sets the listener used to notify synchronous consumers that there is a
422         * message available so that the {@link MessageConsumer#receiveNoWait()} can
423         * be called.
424         */
425        public void setAvailableListener(MessageAvailableListener availableListener) {
426            this.availableListener = availableListener;
427        }
428    
429        /**
430         * Used to get an enqueued message from the unconsumedMessages list. The
431         * amount of time this method blocks is based on the timeout value. - if
432         * timeout==-1 then it blocks until a message is received. - if timeout==0
433         * then it it tries to not block at all, it returns a message if it is
434         * available - if timeout>0 then it blocks up to timeout amount of time.
435         * Expired messages will consumed by this method.
436         * 
437         * @throws JMSException
438         * @return null if we timeout or if the consumer is closed.
439         */
440        private MessageDispatch dequeue(long timeout) throws JMSException {
441            try {
442                long deadline = 0;
443                if (timeout > 0) {
444                    deadline = System.currentTimeMillis() + timeout;
445                }
446                while (true) {
447                    MessageDispatch md = unconsumedMessages.dequeue(timeout);
448                    if (md == null) {
449                        if (timeout > 0 && !unconsumedMessages.isClosed()) {
450                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
451                        } else {
452                            if (failureError != null) {
453                                    throw JMSExceptionSupport.create(failureError);
454                            } else {
455                                    return null;
456                            }
457                        }
458                    } else if (md.getMessage() == null) {
459                        return null;
460                    } else if (md.getMessage().isExpired()) {
461                        if (LOG.isDebugEnabled()) {
462                            LOG.debug(getConsumerId() + " received expired message: " + md);
463                        }
464                        beforeMessageIsConsumed(md);
465                        afterMessageIsConsumed(md, true);
466                        if (timeout > 0) {
467                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
468                        }
469                    } else {
470                        if (LOG.isTraceEnabled()) {
471                            LOG.trace(getConsumerId() + " received message: " + md);
472                        }
473                        return md;
474                    }
475                }
476            } catch (InterruptedException e) {
477                Thread.currentThread().interrupt();
478                throw JMSExceptionSupport.create(e);
479            }
480        }
481    
482        /**
483         * Receives the next message produced for this message consumer.
484         * <P>
485         * This call blocks indefinitely until a message is produced or until this
486         * message consumer is closed.
487         * <P>
488         * If this <CODE>receive</CODE> is done within a transaction, the consumer
489         * retains the message until the transaction commits.
490         * 
491         * @return the next message produced for this message consumer, or null if
492         *         this message consumer is concurrently closed
493         */
494        public Message receive() throws JMSException {
495            checkClosed();
496            checkMessageListener();
497    
498            sendPullCommand(0);
499            MessageDispatch md = dequeue(-1);
500            if (md == null) {
501                return null;
502            }
503    
504            beforeMessageIsConsumed(md);
505            afterMessageIsConsumed(md, false);
506    
507            return createActiveMQMessage(md);
508        }
509    
510        /**
511         * @param md
512         * @return
513         */
514        private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
515            ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
516            if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
517                    ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
518            }
519            if (transformer != null) {
520                Message transformedMessage = transformer.consumerTransform(session, this, m);
521                if (transformedMessage != null) {
522                    m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
523                }
524            }
525            if (session.isClientAcknowledge()) {
526                m.setAcknowledgeCallback(new Callback() {
527                    public void execute() throws Exception {
528                        session.checkClosed();
529                        session.acknowledge();
530                    }
531                });
532            }else if (session.isIndividualAcknowledge()) {
533                m.setAcknowledgeCallback(new Callback() {
534                    public void execute() throws Exception {
535                        session.checkClosed();
536                        acknowledge(md);
537                    }
538                });
539            }
540            return m;
541        }
542    
543        /**
544         * Receives the next message that arrives within the specified timeout
545         * interval.
546         * <P>
547         * This call blocks until a message arrives, the timeout expires, or this
548         * message consumer is closed. A <CODE>timeout</CODE> of zero never
549         * expires, and the call blocks indefinitely.
550         * 
551         * @param timeout the timeout value (in milliseconds), a time out of zero
552         *                never expires.
553         * @return the next message produced for this message consumer, or null if
554         *         the timeout expires or this message consumer is concurrently
555         *         closed
556         */
557        public Message receive(long timeout) throws JMSException {
558            checkClosed();
559            checkMessageListener();
560            if (timeout == 0) {
561                return this.receive();
562    
563            }
564    
565            sendPullCommand(timeout);
566            while (timeout > 0) {
567    
568                MessageDispatch md;
569                if (info.getPrefetchSize() == 0) {
570                    md = dequeue(-1); // We let the broker let us know when we timeout.
571                } else {
572                    md = dequeue(timeout);
573                }
574    
575                if (md == null) {
576                    return null;
577                }
578    
579                beforeMessageIsConsumed(md);
580                afterMessageIsConsumed(md, false);
581                return createActiveMQMessage(md);
582            }
583            return null;
584        }
585    
586        /**
587         * Receives the next message if one is immediately available.
588         * 
589         * @return the next message produced for this message consumer, or null if
590         *         one is not available
591         * @throws JMSException if the JMS provider fails to receive the next
592         *                 message due to some internal error.
593         */
594        public Message receiveNoWait() throws JMSException {
595            checkClosed();
596            checkMessageListener();
597            sendPullCommand(-1);
598    
599            MessageDispatch md;
600            if (info.getPrefetchSize() == 0) {
601                md = dequeue(-1); // We let the broker let us know when we
602                // timeout.
603            } else {
604                md = dequeue(0);
605            }
606    
607            if (md == null) {
608                return null;
609            }
610    
611            beforeMessageIsConsumed(md);
612            afterMessageIsConsumed(md, false);
613            return createActiveMQMessage(md);
614        }
615    
616        /**
617         * Closes the message consumer.
618         * <P>
619         * Since a provider may allocate some resources on behalf of a <CODE>
620         * MessageConsumer</CODE>
621         * outside the Java virtual machine, clients should close them when they are
622         * not needed. Relying on garbage collection to eventually reclaim these
623         * resources may not be timely enough.
624         * <P>
625         * This call blocks until a <CODE>receive</CODE> or message listener in
626         * progress has completed. A blocked message consumer <CODE>receive </CODE>
627         * call returns null when this message consumer is closed.
628         * 
629         * @throws JMSException if the JMS provider fails to close the consumer due
630         *                 to some internal error.
631         */
632        public void close() throws JMSException {
633            if (!unconsumedMessages.isClosed()) {
634                if (session.getTransactionContext().isInTransaction()) {
635                    session.getTransactionContext().addSynchronization(new Synchronization() {
636                        public void afterCommit() throws Exception {
637                            doClose();
638                        }
639    
640                        public void afterRollback() throws Exception {
641                            doClose();
642                        }
643                    });
644                } else {
645                    doClose();
646                } 
647            }
648        }
649    
650        void doClose() throws JMSException {
651            dispose();
652            RemoveInfo removeCommand = info.createRemoveCommand();
653            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
654            this.session.asyncSendPacket(removeCommand);
655        }
656        
657        void clearMessagesInProgress() {
658            // deal with delivered messages async to avoid lock contention with in progress acks
659            clearDispatchList = true;
660            synchronized (unconsumedMessages.getMutex()) {            
661                if (LOG.isDebugEnabled()) {
662                    LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
663                }
664                // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
665                List<MessageDispatch> list = unconsumedMessages.removeAll();
666                if (!this.info.isBrowser()) {
667                    for (MessageDispatch old : list) {
668                        session.connection.rollbackDuplicate(this, old.getMessage());
669                    }
670                }
671            }
672            // allow dispatch on this connection to resume
673            session.connection.transportInterruptionProcessingComplete();
674        }
675    
676        void deliverAcks() {
677            MessageAck ack = null;
678            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
679                if (isAutoAcknowledgeEach()) {
680                    synchronized(deliveredMessages) {
681                        ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
682                        if (ack != null) {
683                            deliveredMessages.clear();
684                            ackCounter = 0;
685                            } else {
686                                ack = pendingAck;
687                                pendingAck = null;
688                            }
689                    }
690                } else if (pendingAck != null && pendingAck.isStandardAck()) {
691                    ack = pendingAck;
692                    pendingAck = null;
693                }
694                if (ack != null) {
695                    final MessageAck ackToSend = ack;
696                    
697                    if (executorService == null) {
698                        executorService = Executors.newSingleThreadExecutor();
699                    }
700                    executorService.submit(new Runnable() {
701                        public void run() {
702                            try {
703                                session.sendAck(ackToSend,true);
704                            } catch (JMSException e) {
705                                LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
706                            } finally {
707                                deliveryingAcknowledgements.set(false);
708                            }
709                        }
710                    });
711                } else {
712                    deliveryingAcknowledgements.set(false);
713                }
714            }
715        }
716    
717        public void dispose() throws JMSException {
718            if (!unconsumedMessages.isClosed()) {
719                
720                // Do we have any acks we need to send out before closing?
721                // Ack any delivered messages now.
722                if (!session.getTransacted()) { 
723                    deliverAcks();
724                    if (isAutoAcknowledgeBatch()) {
725                        acknowledge();
726                    }
727                }
728                if (executorService != null) {
729                    executorService.shutdown();
730                    try {
731                        executorService.awaitTermination(60, TimeUnit.SECONDS);
732                    } catch (InterruptedException e) {
733                        Thread.currentThread().interrupt();
734                    }
735                }
736                
737                if (session.isClientAcknowledge()) {
738                    if (!this.info.isBrowser()) {
739                        // rollback duplicates that aren't acknowledged
740                        List<MessageDispatch> tmp = null;
741                        synchronized (this.deliveredMessages) {
742                            tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
743                        }
744                        for (MessageDispatch old : tmp) {
745                            this.session.connection.rollbackDuplicate(this, old.getMessage());
746                        }
747                        tmp.clear();
748                    }
749                }
750                if (!session.isTransacted()) {
751                    synchronized(deliveredMessages) {
752                        deliveredMessages.clear();
753                    }
754                }
755                unconsumedMessages.close();
756                this.session.removeConsumer(this);
757                List<MessageDispatch> list = unconsumedMessages.removeAll();
758                if (!this.info.isBrowser()) {
759                    for (MessageDispatch old : list) {
760                        // ensure we don't filter this as a duplicate
761                        session.connection.rollbackDuplicate(this, old.getMessage());
762                    }
763                }
764            }
765        }
766    
767        /**
768         * @throws IllegalStateException
769         */
770        protected void checkClosed() throws IllegalStateException {
771            if (unconsumedMessages.isClosed()) {
772                throw new IllegalStateException("The Consumer is closed");
773            }
774        }
775    
776        /**
777         * If we have a zero prefetch specified then send a pull command to the
778         * broker to pull a message we are about to receive
779         */
780        protected void sendPullCommand(long timeout) throws JMSException {
781            clearDispatchList();
782            if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
783                MessagePull messagePull = new MessagePull();
784                messagePull.configure(info);
785                messagePull.setTimeout(timeout);
786                session.asyncSendPacket(messagePull);
787            }
788        }
789    
790        protected void checkMessageListener() throws JMSException {
791            session.checkMessageListener();
792        }
793    
794        protected void setOptimizeAcknowledge(boolean value) {
795            if (optimizeAcknowledge && !value) {
796                deliverAcks();
797            }
798            optimizeAcknowledge = value;
799        }
800    
801        protected void setPrefetchSize(int prefetch) {
802            deliverAcks();
803            this.info.setCurrentPrefetchSize(prefetch);
804        }
805    
806        private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
807            md.setDeliverySequenceId(session.getNextDeliveryId());
808            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
809            if (!isAutoAcknowledgeBatch()) {
810                synchronized(deliveredMessages) {
811                    deliveredMessages.addFirst(md);
812                }
813                if (session.getTransacted()) {
814                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
815                }
816            }
817        }
818        
819        private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
820            if (unconsumedMessages.isClosed()) {
821                return;
822            }
823            if (messageExpired) {
824                synchronized (deliveredMessages) {
825                    deliveredMessages.remove(md);
826                }
827                stats.getExpiredMessageCount().increment();
828                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
829            } else {
830                stats.onMessage();
831                if (session.getTransacted()) {
832                    // Do nothing.
833                } else if (isAutoAcknowledgeEach()) {
834                    if (deliveryingAcknowledgements.compareAndSet(false, true)) {
835                        synchronized (deliveredMessages) {
836                            if (!deliveredMessages.isEmpty()) {
837                                if (optimizeAcknowledge) {
838                                    ackCounter++;
839                                    if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
840                                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
841                                            if (ack != null) {
842                                                deliveredMessages.clear();
843                                                ackCounter = 0;
844                                                session.sendAck(ack);
845                                                optimizeAckTimestamp = System.currentTimeMillis();
846                                            }
847                                    }
848                                } else {
849                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
850                                    if (ack!=null) {
851                                        deliveredMessages.clear();
852                                        session.sendAck(ack);
853                                    }
854                                }
855                            }
856                        }
857                        deliveryingAcknowledgements.set(false);
858                    }
859                } else if (isAutoAcknowledgeBatch()) {
860                    ackLater(md, MessageAck.STANDARD_ACK_TYPE);
861                } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
862                    boolean messageUnackedByConsumer = false;
863                    synchronized (deliveredMessages) {
864                        messageUnackedByConsumer = deliveredMessages.contains(md);
865                    }
866                    if (messageUnackedByConsumer) {
867                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
868                    }
869                } 
870                else {
871                    throw new IllegalStateException("Invalid session state.");
872                }
873            }
874        }
875    
876        /**
877         * Creates a MessageAck for all messages contained in deliveredMessages.
878         * Caller should hold the lock for deliveredMessages.
879         * 
880         * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
881         * @return <code>null</code> if nothing to ack.
882         */
883            private MessageAck makeAckForAllDeliveredMessages(byte type) {
884                    synchronized (deliveredMessages) {
885                            if (deliveredMessages.isEmpty())
886                                    return null;
887                                
888                            MessageDispatch md = deliveredMessages.getFirst();
889                        MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
890                        ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
891                        return ack;
892                    }
893            }
894    
895        private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
896    
897            // Don't acknowledge now, but we may need to let the broker know the
898            // consumer got the message to expand the pre-fetch window
899            if (session.getTransacted()) {
900                session.doStartTransaction();
901                if (!synchronizationRegistered) {
902                    synchronizationRegistered = true;
903                    session.getTransactionContext().addSynchronization(new Synchronization() {
904                        public void beforeEnd() throws Exception {
905                            acknowledge();
906                            synchronizationRegistered = false;
907                        }
908    
909                        public void afterCommit() throws Exception {
910                            commit();
911                            synchronizationRegistered = false;
912                        }
913    
914                        public void afterRollback() throws Exception {
915                            rollback();
916                            synchronizationRegistered = false;
917                        }
918                    });
919                }
920            }
921    
922            deliveredCounter++;
923            
924            MessageAck oldPendingAck = pendingAck;
925            pendingAck = new MessageAck(md, ackType, deliveredCounter);
926            pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
927            if( oldPendingAck==null ) {
928                pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
929            } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
930                pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
931            } else {
932                // old pending ack being superseded by ack of another type, if is is not a delivered
933                // ack and hence important, send it now so it is not lost.
934                if ( !oldPendingAck.isDeliveredAck()) {
935                    if (LOG.isDebugEnabled()) {
936                        LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
937                    }
938                    session.sendAck(oldPendingAck);
939                } else {
940                    if (LOG.isDebugEnabled()) {
941                        LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
942                    }
943                }
944            }
945            
946            if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
947                session.sendAck(pendingAck);
948                pendingAck=null;
949                deliveredCounter = 0;
950                additionalWindowSize = 0;
951            }
952        }
953    
954        /**
955         * Acknowledge all the messages that have been delivered to the client up to
956         * this point.
957         * 
958         * @throws JMSException
959         */
960        public void acknowledge() throws JMSException {
961            clearDispatchList();
962            waitForRedeliveries();
963            synchronized(deliveredMessages) {
964                // Acknowledge all messages so far.
965                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
966                if (ack == null)
967                    return; // no msgs
968                
969                if (session.getTransacted()) {
970                    rollbackOnFailedRecoveryRedelivery();
971                    session.doStartTransaction();
972                    ack.setTransactionId(session.getTransactionContext().getTransactionId());
973                }
974                session.sendAck(ack);
975                pendingAck = null;
976                
977                // Adjust the counters
978                deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
979                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
980                
981                if (!session.getTransacted()) {  
982                    deliveredMessages.clear();
983                } 
984            }
985        }
986        
987        private void waitForRedeliveries() {
988            if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
989                long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
990                int numberNotReplayed;
991                do {
992                    numberNotReplayed = 0;
993                    synchronized(deliveredMessages) {
994                        if (previouslyDeliveredMessages != null) { 
995                            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
996                                if (!entry.getValue()) {
997                                    numberNotReplayed++;
998                                }
999                            }
1000                        }
1001                    }
1002                    if (numberNotReplayed > 0) {
1003                        LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1004                                + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1005                        try {
1006                            Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1007                        } catch (InterruptedException outOfhere) {
1008                            break;
1009                        }
1010                    }
1011                } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1012            }
1013        }
1014    
1015        /*
1016         * called with deliveredMessages locked
1017         */
1018        private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1019            if (previouslyDeliveredMessages != null) {
1020                // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1021                // as messages have been dispatched else where.
1022                int numberNotReplayed = 0;
1023                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1024                    if (!entry.getValue()) {
1025                        numberNotReplayed++;
1026                        if (LOG.isDebugEnabled()) {
1027                            LOG.debug("previously delivered message has not been replayed in transaction: "
1028                                    + previouslyDeliveredMessages.transactionId 
1029                                    + " , messageId: " + entry.getKey());
1030                        }
1031                    }
1032                }
1033                if (numberNotReplayed > 0) {
1034                    String message = "rolling back transaction (" 
1035                        + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1036                        + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1037                    LOG.warn(message);
1038                    throw new TransactionRolledBackException(message);   
1039                }
1040            }
1041        }
1042    
1043        void acknowledge(MessageDispatch md) throws JMSException {
1044            MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1045            session.sendAck(ack);
1046            synchronized(deliveredMessages){
1047                deliveredMessages.remove(md);
1048            }
1049        }
1050    
1051        public void commit() throws JMSException {
1052            synchronized (deliveredMessages) {
1053                deliveredMessages.clear();
1054                clearPreviouslyDelivered();
1055            }
1056            redeliveryDelay = 0;
1057        }
1058    
1059        public void rollback() throws JMSException {
1060            synchronized (unconsumedMessages.getMutex()) {
1061                if (optimizeAcknowledge) {
1062                    // remove messages read but not acked at the broker yet through
1063                    // optimizeAcknowledge
1064                    if (!this.info.isBrowser()) {
1065                        synchronized(deliveredMessages) {
1066                            for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1067                                // ensure we don't filter this as a duplicate
1068                                MessageDispatch md = deliveredMessages.removeLast();
1069                                session.connection.rollbackDuplicate(this, md.getMessage());
1070                            }
1071                        }
1072                    }
1073                }
1074                synchronized(deliveredMessages) {
1075                    rollbackPreviouslyDeliveredAndNotRedelivered();
1076                    if (deliveredMessages.isEmpty()) {
1077                        return;
1078                    }
1079        
1080                    // Only increase the redelivery delay after the first redelivery..
1081                    MessageDispatch lastMd = deliveredMessages.getFirst();
1082                    final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1083                    if (currentRedeliveryCount > 0) {
1084                        redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
1085                    }
1086                    MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1087        
1088                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1089                        MessageDispatch md = iter.next();
1090                        md.getMessage().onMessageRolledBack();
1091                        // ensure we don't filter this as a duplicate
1092                        session.connection.rollbackDuplicate(this, md.getMessage());
1093                    }
1094        
1095                    if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1096                        && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1097                        // We need to NACK the messages so that they get sent to the
1098                        // DLQ.
1099                        // Acknowledge the last message.
1100                        
1101                        MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1102                                            ack.setFirstMessageId(firstMsgId);
1103                        session.sendAck(ack,true);
1104                        // Adjust the window size.
1105                        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1106                        redeliveryDelay = 0;
1107                    } else {
1108                        
1109                        // only redelivery_ack after first delivery
1110                        if (currentRedeliveryCount > 0) {
1111                            MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1112                            ack.setFirstMessageId(firstMsgId);
1113                            session.sendAck(ack,true);
1114                        }
1115        
1116                        // stop the delivery of messages.
1117                        unconsumedMessages.stop();
1118        
1119                        for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1120                            MessageDispatch md = iter.next();
1121                            unconsumedMessages.enqueueFirst(md);
1122                        }
1123        
1124                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1125                            // Start up the delivery again a little later.
1126                            scheduler.executeAfterDelay(new Runnable() {
1127                                public void run() {
1128                                    try {
1129                                        if (started.get()) {
1130                                            start();
1131                                        }
1132                                    } catch (JMSException e) {
1133                                        session.connection.onAsyncException(e);
1134                                    }
1135                                }
1136                            }, redeliveryDelay);
1137                        } else {
1138                            start();
1139                        }
1140        
1141                    }
1142                    deliveredCounter -= deliveredMessages.size();
1143                    deliveredMessages.clear();
1144                }
1145            }
1146            if (messageListener.get() != null) {
1147                session.redispatch(this, unconsumedMessages);
1148            }
1149        }
1150    
1151        /*
1152         * called with unconsumedMessages && deliveredMessages locked
1153         * remove any message not re-delivered as they can't be replayed to this 
1154         * consumer on rollback
1155         */
1156        private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1157            if (previouslyDeliveredMessages != null) {
1158                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1159                    if (!entry.getValue()) {              
1160                        removeFromDeliveredMessages(entry.getKey());
1161                    }
1162                }
1163                clearPreviouslyDelivered();
1164            }
1165        }
1166    
1167        /*
1168         * called with deliveredMessages locked
1169         */
1170        private void removeFromDeliveredMessages(MessageId key) {
1171            Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1172            while (iterator.hasNext()) {
1173                MessageDispatch candidate = iterator.next();
1174                if (key.equals(candidate.getMessage().getMessageId())) {
1175                    session.connection.rollbackDuplicate(this, candidate.getMessage());
1176                    iterator.remove();
1177                    break;
1178                }
1179            }
1180        }
1181        /*
1182         * called with deliveredMessages locked
1183         */
1184        private void clearPreviouslyDelivered() {
1185            if (previouslyDeliveredMessages != null) {
1186                previouslyDeliveredMessages.clear();
1187                previouslyDeliveredMessages = null;
1188            }
1189        }
1190    
1191        public void dispatch(MessageDispatch md) {
1192            MessageListener listener = this.messageListener.get();
1193            try {
1194                clearDispatchList();
1195                synchronized (unconsumedMessages.getMutex()) {
1196                    if (!unconsumedMessages.isClosed()) {
1197                        if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1198                            if (listener != null && unconsumedMessages.isRunning()) {
1199                                ActiveMQMessage message = createActiveMQMessage(md);
1200                                beforeMessageIsConsumed(md);
1201                                try {
1202                                    boolean expired = message.isExpired();
1203                                    if (!expired) {
1204                                        listener.onMessage(message);
1205                                    }
1206                                    afterMessageIsConsumed(md, expired);
1207                                } catch (RuntimeException e) {
1208                                    if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1209                                        // Redeliver the message
1210                                    } else {
1211                                        // Transacted or Client ack: Deliver the
1212                                        // next message.
1213                                        afterMessageIsConsumed(md, false);
1214                                    }
1215                                    LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
1216                                }
1217                            } else {
1218                                unconsumedMessages.enqueue(md);
1219                                if (availableListener != null) {
1220                                    availableListener.onMessageAvailable(this);
1221                                }
1222                            }
1223                        } else {
1224                            if (!session.isTransacted()) {
1225                                if (LOG.isDebugEnabled()) {
1226                                    LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage());
1227                                }
1228                                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
1229                                session.sendAck(ack);
1230                            } else {
1231                                if (LOG.isDebugEnabled()) {
1232                                    LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage());
1233                                }
1234                                boolean needsPoisonAck = false;
1235                                synchronized (deliveredMessages) {
1236                                    if (previouslyDeliveredMessages != null) {
1237                                        previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1238                                    } else {
1239                                        // delivery while pending redelivery to another consumer on the same connection
1240                                        // not waiting for redelivery will help here
1241                                        needsPoisonAck = true;
1242                                    }
1243                                }
1244                                if (needsPoisonAck) {
1245                                    LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1246                                            + " consumer on this connection, failoverRedeliveryWaitPeriod=" 
1247                                            + failoverRedeliveryWaitPeriod + ". Message: " + md);
1248                                    MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1249                                    poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1250                                    session.sendAck(poisonAck);
1251                                } else {
1252                                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1253                                }
1254                            }
1255                        }
1256                    }
1257                }
1258                if (++dispatchedCount % 1000 == 0) {
1259                    dispatchedCount = 0;
1260                    Thread.yield();
1261                }
1262            } catch (Exception e) {
1263                session.connection.onClientInternalException(e);
1264            }
1265        }
1266    
1267        // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1268        private void clearDispatchList() {
1269            if (clearDispatchList) {
1270                synchronized (deliveredMessages) {  
1271                    if (clearDispatchList) {
1272                        if (!deliveredMessages.isEmpty()) {
1273                            if (session.isTransacted()) {    
1274                                if (LOG.isDebugEnabled()) {
1275                                    LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1276                                }
1277                                if (previouslyDeliveredMessages == null) {
1278                                    previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1279                                }
1280                                for (MessageDispatch delivered : deliveredMessages) {
1281                                    previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1282                                }
1283                            } else {
1284                                if (LOG.isDebugEnabled()) {
1285                                    LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1286                                }
1287                                deliveredMessages.clear();
1288                                pendingAck = null;
1289                            }
1290                        }
1291                        clearDispatchList = false;
1292                    }
1293                }
1294            }
1295        }
1296    
1297        public int getMessageSize() {
1298            return unconsumedMessages.size();
1299        }
1300    
1301        public void start() throws JMSException {
1302            if (unconsumedMessages.isClosed()) {
1303                return;
1304            }
1305            started.set(true);
1306            unconsumedMessages.start();
1307            session.executor.wakeup();
1308        }
1309    
1310        public void stop() {
1311            started.set(false);
1312            unconsumedMessages.stop();
1313        }
1314    
1315        public String toString() {
1316            return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1317                   + " }";
1318        }
1319    
1320        /**
1321         * Delivers a message to the message listener.
1322         * 
1323         * @return
1324         * @throws JMSException
1325         */
1326        public boolean iterate() {
1327            MessageListener listener = this.messageListener.get();
1328            if (listener != null) {
1329                MessageDispatch md = unconsumedMessages.dequeueNoWait();
1330                if (md != null) {
1331                    try {
1332                        ActiveMQMessage message = createActiveMQMessage(md);
1333                        beforeMessageIsConsumed(md);
1334                        listener.onMessage(message);
1335                        afterMessageIsConsumed(md, false);
1336                    } catch (JMSException e) {
1337                        session.connection.onClientInternalException(e);
1338                    }
1339                    return true;
1340                }
1341            }
1342            return false;
1343        }
1344    
1345        public boolean isInUse(ActiveMQTempDestination destination) {
1346            return info.getDestination().equals(destination);
1347        }
1348    
1349        public long getLastDeliveredSequenceId() {
1350            return lastDeliveredSequenceId;
1351        }
1352    
1353            public IOException getFailureError() {
1354                    return failureError;
1355            }
1356    
1357            public void setFailureError(IOException failureError) {
1358                    this.failureError = failureError;
1359            }
1360    }