001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    
027    import javax.jms.InvalidSelectorException;
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.ActiveMQMessageAudit;
031    import org.apache.activemq.broker.Broker;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
034    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
035    import org.apache.activemq.command.ConsumerControl;
036    import org.apache.activemq.command.ConsumerInfo;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageDispatch;
040    import org.apache.activemq.command.MessageDispatchNotification;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.MessagePull;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.thread.Scheduler;
045    import org.apache.activemq.transaction.Synchronization;
046    import org.apache.activemq.usage.SystemUsage;
047    import org.apache.commons.logging.Log;
048    import org.apache.commons.logging.LogFactory;
049    
050    /**
051     * A subscription that honors the pre-fetch option of the ConsumerInfo.
052     * 
053     * @version $Revision: 1.15 $
054     */
055    public abstract class PrefetchSubscription extends AbstractSubscription {
056    
057        private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
058        protected static final Scheduler scheduler = Scheduler.getInstance();
059        
060        protected PendingMessageCursor pending;
061        protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
062        protected int prefetchExtension;
063        protected long enqueueCounter;
064        protected long dispatchCounter;
065        protected long dequeueCounter;
066        private int maxProducersToAudit=32;
067        private int maxAuditDepth=2048;
068        protected final SystemUsage usageManager;
069        private final Object pendingLock = new Object();
070        private final Object dispatchLock = new Object();
071        protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
072        private boolean slowConsumer;
073    
074        private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
075        
076        public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
077            super(broker,context, info);
078            this.usageManager=usageManager;
079            pending = cursor;
080        }
081    
082        public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
083            this(broker,usageManager,context, info, new VMPendingMessageCursor());
084        }
085    
086        /**
087         * Allows a message to be pulled on demand by a client
088         */
089        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
090            // The slave should not deliver pull messages. TODO: when the slave
091            // becomes a master,
092            // He should send a NULL message to all the consumers to 'wake them up'
093            // in case
094            // they were waiting for a message.
095            if (getPrefetchSize() == 0 && !isSlave()) {
096                final long dispatchCounterBeforePull;
097                    synchronized(this) {
098                            prefetchExtension++;
099                            dispatchCounterBeforePull = dispatchCounter;
100                    }
101                
102                    // Have the destination push us some messages.
103                    for (Destination dest : destinations) {
104                                    dest.iterate();
105                            }
106                    dispatchPending();
107                
108                synchronized(this) {
109                        // If there was nothing dispatched.. we may need to setup a timeout.
110                        if (dispatchCounterBeforePull == dispatchCounter) {
111                            // immediate timeout used by receiveNoWait()
112                            if (pull.getTimeout() == -1) {
113                                // Send a NULL message.
114                                add(QueueMessageReference.NULL_MESSAGE);
115                                dispatchPending();
116                            }
117                            if (pull.getTimeout() > 0) {
118                                scheduler.executeAfterDelay(new Runnable() {
119            
120                                    public void run() {
121                                        pullTimeout(dispatchCounterBeforePull);
122                                    }
123                                }, pull.getTimeout());
124                            }
125                        }
126                }
127            }
128            return null;
129        }
130    
131        /**
132         * Occurs when a pull times out. If nothing has been dispatched since the
133         * timeout was setup, then send the NULL message.
134         */
135        final void pullTimeout(long dispatchCounterBeforePull) {
136            synchronized (pendingLock) {
137                    if (dispatchCounterBeforePull == dispatchCounter) {
138                    try {
139                        add(QueueMessageReference.NULL_MESSAGE);
140                        dispatchPending();
141                    } catch (Exception e) {
142                        context.getConnection().serviceException(e);
143                    }
144                }
145            }
146        }
147    
148        public void add(MessageReference node) throws Exception {
149            synchronized (pendingLock) {
150                // The destination may have just been removed...  
151                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
152                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
153                    return;
154                }
155                enqueueCounter++;
156                pending.addMessageLast(node);    
157            }
158            dispatchPending();
159        }
160    
161        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
162            synchronized(pendingLock) {
163                try {
164                    pending.reset();
165                    while (pending.hasNext()) {
166                        MessageReference node = pending.next();
167                        node.decrementReferenceCount();
168                        if (node.getMessageId().equals(mdn.getMessageId())) {
169                            // Synchronize between dispatched list and removal of messages from pending list
170                            // related to remove subscription action
171                            synchronized(dispatchLock) {
172                                pending.remove();
173                                createMessageDispatch(node, node.getMessage());
174                                dispatched.add(node);
175                                onDispatch(node, node.getMessage());
176                            }
177                            return;
178                        }
179                    }
180                } finally {
181                    pending.release();
182                }
183            }
184            throw new JMSException(
185                    "Slave broker out of sync with master: Dispatched message ("
186                            + mdn.getMessageId() + ") was not in the pending list for "
187                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
188        }
189    
190        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
191            // Handle the standard acknowledgment case.
192            boolean callDispatchMatched = false;
193            Destination destination = null;
194            
195            if (!isSlave()) {
196                if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
197                    // suppress unexpected ack exception in this expected case
198                    LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
199                    return;
200                }
201            }
202            if (LOG.isTraceEnabled()) {
203                LOG.trace("ack:" + ack);
204            }
205            synchronized(dispatchLock) {
206                if (ack.isStandardAck()) {
207                    // First check if the ack matches the dispatched. When using failover this might
208                    // not be the case. We don't ever want to ack the wrong messages.
209                    assertAckMatchesDispatched(ack);
210                    
211                    // Acknowledge all dispatched messages up till the message id of
212                    // the acknowledgment.
213                    int index = 0;
214                    boolean inAckRange = false;
215                    List<MessageReference> removeList = new ArrayList<MessageReference>();
216                    for (final MessageReference node : dispatched) {
217                        MessageId messageId = node.getMessageId();
218                        if (ack.getFirstMessageId() == null
219                                || ack.getFirstMessageId().equals(messageId)) {
220                            inAckRange = true;
221                        }
222                        if (inAckRange) {
223                            // Don't remove the nodes until we are committed.  
224                            if (!context.isInTransaction()) {
225                                dequeueCounter++;
226                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
227                                removeList.add(node);
228                            } else {
229                                // setup a Synchronization to remove nodes from the
230                                // dispatched list.
231                                context.getTransaction().addSynchronization(
232                                        new Synchronization() {
233    
234                                            public void afterCommit()
235                                                    throws Exception {
236                                                synchronized(dispatchLock) {
237                                                    dequeueCounter++;
238                                                    dispatched.remove(node);
239                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
240                                                }
241                                            }
242    
243                                            public void afterRollback() throws Exception {
244                                                synchronized(dispatchLock) {
245                                                    if (isSlave()) {
246                                                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
247                                                    } else {
248                                                        // poisionAck will decrement - otherwise still inflight on client
249                                                    }
250                                                }
251                                            }
252                                        });
253                            }
254                            index++;
255                            acknowledge(context, ack, node);
256                            if (ack.getLastMessageId().equals(messageId)) {                  
257                                // contract prefetch if dispatch required a pull
258                                if (getPrefetchSize() == 0) {
259                                    prefetchExtension = Math.max(0, prefetchExtension - index);
260                                } else if (context.isInTransaction()) {
261                                    // extend prefetch window only if not a pulling consumer
262                                    prefetchExtension = Math.max(prefetchExtension, index);
263                                }
264                                destination = node.getRegionDestination();
265                                callDispatchMatched = true;
266                                break;
267                            }
268                        }
269                    }
270                    for (final MessageReference node : removeList) {
271                        dispatched.remove(node);
272                    }
273                    // this only happens after a reconnect - get an ack which is not
274                    // valid
275                    if (!callDispatchMatched) {
276                        LOG.error("Could not correlate acknowledgment with dispatched message: "
277                                      + ack);
278                    }
279                } else if (ack.isIndividualAck()) {
280                    // Message was delivered and acknowledge - but only delete the
281                    // individual message
282                    for (final MessageReference node : dispatched) {
283                        MessageId messageId = node.getMessageId();
284                        if (ack.getLastMessageId().equals(messageId)) {
285                            // this should never be within a transaction
286                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
287                            destination = node.getRegionDestination();
288                            acknowledge(context, ack, node);
289                            dispatched.remove(node);
290                            prefetchExtension = Math.max(0, prefetchExtension - 1);
291                            callDispatchMatched = true;
292                            break;
293                        }
294                    }
295                }else if (ack.isDeliveredAck()) {
296                    // Message was delivered but not acknowledged: update pre-fetch
297                    // counters.
298                    int index = 0;
299                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300                        final MessageReference node = iter.next();
301                        if (node.isExpired()) {
302                            if (broker.isExpired(node)) {
303                                node.getRegionDestination().messageExpired(context, this, node);
304                            }
305                            dispatched.remove(node);
306                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307                        }
308                        if (ack.getLastMessageId().equals(node.getMessageId())) {
309                            prefetchExtension = Math.max(prefetchExtension, index + 1);
310                            destination = node.getRegionDestination();
311                            callDispatchMatched = true;
312                            break;
313                        }
314                    }
315                    if (!callDispatchMatched) {
316                        throw new JMSException(
317                                "Could not correlate acknowledgment with dispatched message: "
318                                        + ack);
319                    }
320                } else if (ack.isRedeliveredAck()) {
321                    // Message was re-delivered but it was not yet considered to be
322                    // a DLQ message.
323                    boolean inAckRange = false;
324                    for (final MessageReference node : dispatched) {
325                        MessageId messageId = node.getMessageId();
326                        if (ack.getFirstMessageId() == null
327                                || ack.getFirstMessageId().equals(messageId)) {
328                            inAckRange = true;
329                        }
330                        if (inAckRange) {
331                            if (ack.getLastMessageId().equals(messageId)) {
332                                destination = node.getRegionDestination();
333                                callDispatchMatched = true;
334                                break;
335                            }
336                        }
337                    }
338                    if (!callDispatchMatched) {
339                        throw new JMSException(
340                                "Could not correlate acknowledgment with dispatched message: "
341                                        + ack);
342                    }
343                } else if (ack.isPoisonAck()) {
344                    // TODO: what if the message is already in a DLQ???
345                    // Handle the poison ACK case: we need to send the message to a
346                    // DLQ
347                    if (ack.isInTransaction()) {
348                        throw new JMSException("Poison ack cannot be transacted: "
349                                + ack);
350                    }
351                    int index = 0;
352                    boolean inAckRange = false;
353                    List<MessageReference> removeList = new ArrayList<MessageReference>();
354                    for (final MessageReference node : dispatched) {
355                        MessageId messageId = node.getMessageId();
356                        if (ack.getFirstMessageId() == null
357                                || ack.getFirstMessageId().equals(messageId)) {
358                            inAckRange = true;
359                        }
360                        if (inAckRange) {
361                            sendToDLQ(context, node);
362                            node.getRegionDestination().getDestinationStatistics()
363                                    .getInflight().decrement();
364                            removeList.add(node);
365                            dequeueCounter++;
366                            index++;
367                            acknowledge(context, ack, node);
368                            if (ack.getLastMessageId().equals(messageId)) {
369                                prefetchExtension = Math.max(0, prefetchExtension
370                                        - (index + 1));
371                                destination = node.getRegionDestination();
372                                callDispatchMatched = true;
373                                break;
374                            }
375                        }
376                    }
377                    for (final MessageReference node : removeList) {
378                        dispatched.remove(node);
379                    }
380                    if (!callDispatchMatched) {
381                        throw new JMSException(
382                                "Could not correlate acknowledgment with dispatched message: "
383                                        + ack);
384                    }
385                }
386            }
387            if (callDispatchMatched && destination != null) {    
388                destination.wakeup();
389                dispatchPending();
390            } else {
391                if (isSlave()) {
392                    throw new JMSException(
393                            "Slave broker out of sync with master: Acknowledgment ("
394                                    + ack + ") was not in the dispatch list: "
395                                    + dispatched);
396                } else {
397                    LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
398                            + ack);
399                }
400            }
401        }
402    
403        /**
404         * Checks an ack versus the contents of the dispatched list.
405         * 
406         * @param ack
407         * @param firstAckedMsg
408         * @param lastAckedMsg
409         * @throws JMSException if it does not match
410         */
411            protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
412            MessageId firstAckedMsg = ack.getFirstMessageId();
413            MessageId lastAckedMsg = ack.getLastMessageId();
414            int checkCount = 0;
415            boolean checkFoundStart = false;
416            boolean checkFoundEnd = false;
417            for (MessageReference node : dispatched) {
418    
419                if (firstAckedMsg == null) {
420                    checkFoundStart = true;
421                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
422                    checkFoundStart = true;
423                }
424    
425                if (checkFoundStart) {
426                    checkCount++;
427                }
428    
429                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
430                    checkFoundEnd = true;
431                    break;
432                }
433            }
434            if (!checkFoundStart && firstAckedMsg != null)
435                throw new JMSException("Unmatched acknowledge: " + ack
436                        + "; Could not find Message-ID " + firstAckedMsg
437                        + " in dispatched-list (start of ack)");
438            if (!checkFoundEnd && lastAckedMsg != null)
439                throw new JMSException("Unmatched acknowledge: " + ack
440                        + "; Could not find Message-ID " + lastAckedMsg
441                        + " in dispatched-list (end of ack)");
442            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
443                throw new JMSException("Unmatched acknowledge: " + ack
444                        + "; Expected message count (" + ack.getMessageCount()
445                        + ") differs from count in dispatched-list (" + checkCount
446                        + ")");
447            }
448        }
449    
450        /**
451         * @param context
452         * @param node
453         * @throws IOException
454         * @throws Exception
455         */
456        protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
457            broker.sendToDeadLetterQueue(context, node);
458        }
459        
460        public int getInFlightSize() {
461            return dispatched.size();
462        }
463        
464        /**
465         * Used to determine if the broker can dispatch to the consumer.
466         * 
467         * @return
468         */
469        public boolean isFull() {
470            return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
471        }
472    
473        /**
474         * @return true when 60% or more room is left for dispatching messages
475         */
476        public boolean isLowWaterMark() {
477            return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
478        }
479    
480        /**
481         * @return true when 10% or less room is left for dispatching messages
482         */
483        public boolean isHighWaterMark() {
484            return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
485        }
486    
487        public int countBeforeFull() {
488            return info.getPrefetchSize() + prefetchExtension - dispatched.size();
489        }
490    
491        public int getPendingQueueSize() {
492            return pending.size();
493        }
494    
495        public int getDispatchedQueueSize() {
496            return dispatched.size();
497        }
498    
499        public long getDequeueCounter() {
500            return dequeueCounter;
501        }
502    
503        public long getDispatchedCounter() {
504            return dispatchCounter;
505        }
506    
507        public long getEnqueueCounter() {
508            return enqueueCounter;
509        }
510    
511        public boolean isRecoveryRequired() {
512            return pending.isRecoveryRequired();
513        }
514    
515        public PendingMessageCursor getPending() {
516            return this.pending;
517        }
518    
519        public void setPending(PendingMessageCursor pending) {
520            this.pending = pending;
521            if (this.pending!=null) {
522                this.pending.setSystemUsage(usageManager);
523                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
524            }
525        }
526    
527       public void add(ConnectionContext context, Destination destination) throws Exception {
528            synchronized(pendingLock) {
529                super.add(context, destination);
530                pending.add(context, destination);
531            }
532        }
533    
534        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
535            List<MessageReference> rc = new ArrayList<MessageReference>();
536            synchronized(pendingLock) {
537                super.remove(context, destination);
538                // Here is a potential problem concerning Inflight stat:
539                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
540                // Except if each commit or rollback callback action comes before remove of subscriber.
541                rc.addAll(pending.remove(context, destination));
542    
543                // Synchronized to DispatchLock
544                synchronized(dispatchLock) {
545                        for (MessageReference r : dispatched) {
546                            if( r.getRegionDestination() == destination) {
547                                    rc.add((QueueMessageReference)r);
548                            }
549                        }
550                    destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
551                    destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
552                    dispatched.clear();
553                }            
554            }
555            return rc;
556        }
557    
558        protected void dispatchPending() throws IOException {
559            if (!isSlave()) {
560               synchronized(pendingLock) {
561                    try {
562                        int numberToDispatch = countBeforeFull();
563                        if (numberToDispatch > 0) {
564                            slowConsumer=false;
565                            pending.setMaxBatchSize(numberToDispatch);
566                            int count = 0;
567                            pending.reset();
568                            while (pending.hasNext() && !isFull()
569                                    && count < numberToDispatch) {
570                                MessageReference node = pending.next();
571                                if (node == null) {
572                                    break;
573                                }
574                                
575                                // Synchronize between dispatched list and remove of message from pending list
576                                // related to remove subscription action
577                                synchronized(dispatchLock) {
578                                    pending.remove();
579                                    node.decrementReferenceCount();
580                                    if( !isDropped(node) && canDispatch(node)) {
581    
582                                        // Message may have been sitting in the pending
583                                        // list a while waiting for the consumer to ak the message.
584                                        if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
585                                            //increment number to dispatch
586                                            numberToDispatch++;
587                                            if (broker.isExpired(node)) {
588                                                node.getRegionDestination().messageExpired(context, this, node);
589                                            }
590                                            continue;
591                                        }
592                                        dispatch(node);
593                                        count++;
594                                    }
595                                }
596                            }
597                        }else {
598                            if (!slowConsumer) {
599                                slowConsumer=true;
600                                ConnectionContext c = new ConnectionContext();
601                                c.setBroker(context.getBroker());
602                                for (Destination dest :destinations) {
603                                    dest.slowConsumer(c,this);
604                                }
605                                
606                            }
607                        }
608                    } finally {
609                        pending.release();
610                    }
611                }
612            }
613        }
614    
615        protected boolean dispatch(final MessageReference node) throws IOException {
616            final Message message = node.getMessage();
617            if (message == null) {
618                return false;
619            }
620            
621            okForAckAsDispatchDone.countDown();
622            
623            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
624            if (!isSlave()) {
625    
626                MessageDispatch md = createMessageDispatch(node, message);
627                // NULL messages don't count... they don't get Acked.
628                if (node != QueueMessageReference.NULL_MESSAGE) {
629                    dispatchCounter++;
630                    dispatched.add(node);
631                } else {
632                    prefetchExtension = Math.max(0, prefetchExtension - 1);
633                }
634                if (info.isDispatchAsync()) {
635                    md.setTransmitCallback(new Runnable() {
636    
637                        public void run() {
638                            // Since the message gets queued up in async dispatch,
639                            // we don't want to
640                            // decrease the reference count until it gets put on the
641                            // wire.
642                            onDispatch(node, message);
643                        }
644                    });
645                    context.getConnection().dispatchAsync(md);
646                } else {
647                    context.getConnection().dispatchSync(md);
648                    onDispatch(node, message);
649                }
650                return true;
651            } else {
652                return false;
653            }
654        }
655    
656        protected void onDispatch(final MessageReference node, final Message message) {
657            if (node.getRegionDestination() != null) {
658                if (node != QueueMessageReference.NULL_MESSAGE) {
659                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
660                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();   
661                    if (LOG.isTraceEnabled()) {
662                        LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() 
663                                + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
664                    }
665                }
666            }
667            
668            if (info.isDispatchAsync()) {
669                try {
670                    dispatchPending();
671                } catch (IOException e) {
672                    context.getConnection().serviceExceptionAsync(e);
673                }
674            }
675        }
676    
677        /**
678         * inform the MessageConsumer on the client to change it's prefetch
679         * 
680         * @param newPrefetch
681         */
682        public void updateConsumerPrefetch(int newPrefetch) {
683            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
684                ConsumerControl cc = new ConsumerControl();
685                cc.setConsumerId(info.getConsumerId());
686                cc.setPrefetch(newPrefetch);
687                context.getConnection().dispatchAsync(cc);
688            }
689        }
690    
691        /**
692         * @param node
693         * @param message
694         * @return MessageDispatch
695         */
696        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
697            if (node == QueueMessageReference.NULL_MESSAGE) {
698                MessageDispatch md = new MessageDispatch();
699                md.setMessage(null);
700                md.setConsumerId(info.getConsumerId());
701                md.setDestination(null);
702                return md;
703            } else {
704                MessageDispatch md = new MessageDispatch();
705                md.setConsumerId(info.getConsumerId());
706                md.setDestination(node.getRegionDestination().getActiveMQDestination());
707                md.setMessage(message);
708                md.setRedeliveryCounter(node.getRedeliveryCounter());
709                return md;
710            }
711        }
712    
713        /**
714         * Use when a matched message is about to be dispatched to the client.
715         * 
716         * @param node
717         * @return false if the message should not be dispatched to the client
718         *         (another sub may have already dispatched it for example).
719         * @throws IOException
720         */
721        protected abstract boolean canDispatch(MessageReference node) throws IOException;
722        
723        protected abstract boolean isDropped(MessageReference node);
724    
725        /**
726         * Used during acknowledgment to remove the message.
727         * 
728         * @throws IOException
729         */
730        protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
731    
732        
733        public int getMaxProducersToAudit() {
734            return maxProducersToAudit;
735        }
736    
737        public void setMaxProducersToAudit(int maxProducersToAudit) {
738            this.maxProducersToAudit = maxProducersToAudit;
739        }
740    
741        public int getMaxAuditDepth() {
742            return maxAuditDepth;
743        }
744    
745        public void setMaxAuditDepth(int maxAuditDepth) {
746            this.maxAuditDepth = maxAuditDepth;
747        }
748    }