/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A subscription that honors the pre-fetch option of the ConsumerInfo.
 * <p>
 * Messages handed to the subscription are first queued on the
 * {@link #pending} cursor and are moved to the {@link #dispatched} list when
 * they are sent to the consumer. The number of messages that may be
 * outstanding at once is bounded by the consumer's prefetch size plus
 * {@link #prefetchExtension}.
 *
 * @version $Revision: 1.15 $
 */
public abstract class PrefetchSubscription extends AbstractSubscription {

    private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
    protected static final Scheduler scheduler = Scheduler.getInstance();

    /** Messages waiting to be dispatched to the consumer. */
    protected PendingMessageCursor pending;
    /** Messages that have been dispatched and not yet removed by an ack. */
    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
    /** Extra room added on top of the consumer's prefetch size. */
    protected int prefetchExtension;
    protected long enqueueCounter;
    protected long dispatchCounter;
    protected long dequeueCounter;
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    protected final SystemUsage usageManager;
    /** Held while this class reads or mutates the {@link #pending} cursor. */
    private final Object pendingLock = new Object();
    /** Held while this class moves messages into or out of {@link #dispatched}. */
    private final Object dispatchLock = new Object();
    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
    private boolean slowConsumer;

    /** Counted down by the first call to {@link #dispatch(MessageReference)} that has a message. */
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);

    /**
     * Creates a subscription that queues undelivered messages on the given cursor.
     *
     * @param broker the broker passed on to the superclass
     * @param usageManager the system usage applied to cursors set via {@link #setPending}
     * @param context the connection context of the consumer
     * @param info the consumer description (prefetch size, dispatch mode, ...)
     * @param cursor the cursor used to hold pending messages
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
        super(broker,context, info);
        this.usageManager=usageManager;
        pending = cursor;
    }

    /**
     * Creates a subscription backed by a new {@link VMPendingMessageCursor}.
     *
     * @param broker the broker passed on to the superclass
     * @param usageManager the system usage applied to cursors set via {@link #setPending}
     * @param context the connection context of the consumer
     * @param info the consumer description
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
        this(broker,usageManager,context, info, new VMPendingMessageCursor());
    }

    /**
     * Allows a message to be pulled on demand by a client.
     * <p>
     * Only acts when the prefetch size is zero and this broker is not a slave.
     * If nothing gets dispatched by the pull, a NULL message is sent
     * immediately (timeout of -1) or after the requested timeout (timeout
     * greater than zero).
     *
     * @param context the connection context of the caller
     * @param pull the pull request carrying the timeout
     * @return always {@code null}
     * @throws Exception if iterating the destinations or dispatching fails
     */
    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
        // The slave should not deliver pull messages. TODO: when the slave
        // becomes a master, it should send a NULL message to all the
        // consumers to 'wake them up' in case they were waiting for a message.
        if (getPrefetchSize() == 0 && !isSlave()) {
            final long dispatchCounterBeforePull;
            synchronized(this) {
                // Open the window by one so that a single message can be dispatched.
                prefetchExtension++;
                dispatchCounterBeforePull = dispatchCounter;
            }

            // Have the destination push us some messages.
            for (Destination dest : destinations) {
                dest.iterate();
            }
            dispatchPending();

            synchronized(this) {
                // If there was nothing dispatched.. we may need to setup a timeout.
                if (dispatchCounterBeforePull == dispatchCounter) {
                    // immediate timeout used by receiveNoWait()
                    if (pull.getTimeout() == -1) {
                        // Send a NULL message.
                        add(QueueMessageReference.NULL_MESSAGE);
                        dispatchPending();
                    }
                    if (pull.getTimeout() > 0) {
                        scheduler.executeAfterDelay(new Runnable() {

                            public void run() {
                                pullTimeout(dispatchCounterBeforePull);
                            }
                        }, pull.getTimeout());
                    }
                }
            }
        }
        return null;
    }

    /**
     * Occurs when a pull times out. If nothing has been dispatched since the
     * timeout was setup, then send the NULL message.
     *
     * @param dispatchCounterBeforePull the value of {@link #dispatchCounter}
     *        captured when the pull was issued
     */
    final void pullTimeout(long dispatchCounterBeforePull) {
        synchronized (pendingLock) {
            if (dispatchCounterBeforePull == dispatchCounter) {
                try {
                    add(QueueMessageReference.NULL_MESSAGE);
                    dispatchPending();
                } catch (Exception e) {
                    context.getConnection().serviceException(e);
                }
            }
        }
    }

    /**
     * Queues a message on the pending cursor and then attempts to dispatch.
     * <p>
     * The message is silently ignored when its destination is no longer one
     * of this subscription's destinations, unless it is the NULL message.
     *
     * @param node the message to queue
     * @throws Exception if the cursor or the dispatch fails
     */
    public void add(MessageReference node) throws Exception {
        synchronized (pendingLock) {
            // The destination may have just been removed...
            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
                // perhaps we should inform the caller that we are no longer valid to dispatch to?
                return;
            }
            enqueueCounter++;
            pending.addMessageLast(node);
        }
        dispatchPending();
    }

    /**
     * Moves the message named by the notification from the pending cursor to
     * the dispatched list.
     *
     * @param mdn the dispatch notification identifying the message
     * @throws JMSException if the message is not found in the pending cursor
     * @throws Exception if the cursor fails
     */
    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
        synchronized(pendingLock) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                    if (node.getMessageId().equals(mdn.getMessageId())) {
                        // Synchronize between dispatched list and removal of messages from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            pending.remove();
                            createMessageDispatch(node, node.getMessage());
                            dispatched.add(node);
                            onDispatch(node, node.getMessage());
                        }
                        return;
                    }
                }
            } finally {
                pending.release();
            }
        }
        throw new JMSException(
                "Slave broker out of sync with master: Dispatched message ("
                        + mdn.getMessageId() + ") was not in the pending list for "
                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
    }

    /**
     * Processes an acknowledgment against the dispatched list.
     * <p>
     * Standard, individual, delivered, redelivered and poison acks are each
     * handled by their own branch. When the ack was matched to a dispatched
     * message the owning destination is woken up and pending messages are
     * dispatched.
     *
     * @param context the connection context of the acknowledging consumer
     * @param ack the acknowledgment to process
     * @throws JMSException if the ack cannot be correlated with the dispatched
     *         list (delivered, redelivered and poison acks), if a poison ack
     *         is transacted, or if a slave broker cannot match the ack
     * @throws Exception if acknowledging an individual message fails
     */
    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        if (!isSlave()) {
            // A zero timeout makes this a non-blocking check of the latch.
            if (!okForAckAsDispatchDone.await(0L, TimeUnit.MILLISECONDS)) {
                // suppress unexpected ack exception in this expected case
                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
                return;
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("ack:" + ack);
        }
        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            dequeueCounter++;
                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                            removeList.add(node);
                        } else {
                            // setup a Synchronization to remove nodes from the
                            // dispatched list.
                            context.getTransaction().addSynchronization(
                                    new Synchronization() {

                                        public void afterCommit()
                                                throws Exception {
                                            synchronized(dispatchLock) {
                                                dequeueCounter++;
                                                dispatched.remove(node);
                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                            }
                                        }

                                        public void afterRollback() throws Exception {
                                            synchronized(dispatchLock) {
                                                // On a master nothing is done here: a poison ack
                                                // will decrement - otherwise the message is
                                                // still inflight on the client.
                                                if (isSlave()) {
                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                                }
                                            }
                                        }
                                    });
                        }
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            // contract prefetch if dispatch required a pull
                            if (getPrefetchSize() == 0) {
                                prefetchExtension = Math.max(0, prefetchExtension - index);
                            } else if (context.isInTransaction()) {
                                // extend prefetch window only if not a pulling consumer
                                prefetchExtension = Math.max(prefetchExtension, index);
                            }
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.error("Could not correlate acknowledgment with dispatched message: "
                            + ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // this should never be within a transaction
                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                        destination = node.getRegionDestination();
                        acknowledge(context, ack, node);
                        dispatched.remove(node);
                        prefetchExtension = Math.max(0, prefetchExtension - 1);
                        callDispatchMatched = true;
                        break;
                    }
                }
            } else if (ack.isDeliveredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                int index = 0;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    if (node.isExpired()) {
                        if (broker.isExpired(node)) {
                            node.getRegionDestination().messageExpired(context, this, node);
                        }
                        // Removing through the list (not the iterator) is what the
                        // copy-on-write list supports during iteration.
                        dispatched.remove(node);
                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                    }
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        prefetchExtension = Math.max(prefetchExtension, index + 1);
                        destination = node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        sendToDLQ(context, node);
                        node.getRegionDestination().getDestinationStatistics()
                                .getInflight().decrement();
                        removeList.add(node);
                        dequeueCounter++;
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            // NOTE(review): index has already been incremented above, so
                            // this contracts by one more than the number of acked
                            // messages - confirm that this is intended.
                            prefetchExtension = Math.max(0, prefetchExtension
                                    - (index + 1));
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();
        } else {
            if (isSlave()) {
                throw new JMSException(
                        "Slave broker out of sync with master: Acknowledgment ("
                                + ack + ") was not in the dispatch list: "
                                + dispatched);
            } else {
                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
                        + ack);
            }
        }
    }

    /**
     * Checks an ack versus the contents of the dispatched list.
     * <p>
     * The first and last message ids of the ack (when not {@code null}) must
     * both be present in the dispatched list and, for a non-transacted ack,
     * the ack's message count must equal the number of dispatched messages
     * counted from the first id up to and including the last id.
     *
     * @param ack the acknowledgment to verify
     * @throws JMSException if it does not match
     */
    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
        MessageId firstAckedMsg = ack.getFirstMessageId();
        MessageId lastAckedMsg = ack.getLastMessageId();
        int checkCount = 0;
        boolean checkFoundStart = false;
        boolean checkFoundEnd = false;
        for (MessageReference node : dispatched) {

            if (firstAckedMsg == null) {
                checkFoundStart = true;
            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
                checkFoundStart = true;
            }

            if (checkFoundStart) {
                checkCount++;
            }

            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
                checkFoundEnd = true;
                break;
            }
        }
        if (!checkFoundStart && firstAckedMsg != null) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Could not find Message-ID " + firstAckedMsg
                    + " in dispatched-list (start of ack)");
        }
        if (!checkFoundEnd && lastAckedMsg != null) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Could not find Message-ID " + lastAckedMsg
                    + " in dispatched-list (end of ack)");
        }
        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Expected message count (" + ack.getMessageCount()
                    + ") differs from count in dispatched-list (" + checkCount
                    + ")");
        }
    }

    /**
     * Hands a message to the broker's dead letter queue handling.
     *
     * @param context the connection context of the consumer
     * @param node the message to send to the dead letter queue
     * @throws IOException
     * @throws Exception
     */
    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
        broker.sendToDeadLetterQueue(context, node);
    }

    /**
     * @return the number of messages currently in the dispatched list
     */
    public int getInFlightSize() {
        return dispatched.size();
    }

    /**
     * Used to determine if the broker can dispatch to the consumer.
     *
     * @return true when the dispatched count, less the prefetch extension,
     *         has reached the consumer's prefetch size
     */
    public boolean isFull() {
        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
    }

    /**
     * @return true when 60% or more room is left for dispatching messages
     */
    public boolean isLowWaterMark() {
        return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
    }

    /**
     * @return true when 10% or less room is left for dispatching messages
     */
    public boolean isHighWaterMark() {
        return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
    }

    /**
     * @return the prefetch size plus the prefetch extension, minus the
     *         number of dispatched messages
     */
    public int countBeforeFull() {
        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
    }

    /**
     * @return the size reported by the pending cursor
     */
    public int getPendingQueueSize() {
        return pending.size();
    }

    /**
     * @return the number of messages currently in the dispatched list
     */
    public int getDispatchedQueueSize() {
        return dispatched.size();
    }

    public long getDequeueCounter() {
        return dequeueCounter;
    }

    public long getDispatchedCounter() {
        return dispatchCounter;
    }

    public long getEnqueueCounter() {
        return enqueueCounter;
    }

    public boolean isRecoveryRequired() {
        return pending.isRecoveryRequired();
    }

    public PendingMessageCursor getPending() {
        return this.pending;
    }

    /**
     * Replaces the pending cursor. A non-null cursor is configured with this
     * subscription's system usage and cursor memory high water mark.
     *
     * @param pending the new cursor, may be {@code null}
     */
    public void setPending(PendingMessageCursor pending) {
        this.pending = pending;
        if (this.pending!=null) {
            this.pending.setSystemUsage(usageManager);
            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        }
    }

    /**
     * Attaches a destination to this subscription and to the pending cursor.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being added
     * @throws Exception if the superclass or the cursor fails
     */
    public void add(ConnectionContext context, Destination destination) throws Exception {
        synchronized(pendingLock) {
            super.add(context, destination);
            pending.add(context, destination);
        }
    }

    /**
     * Detaches a destination from this subscription.
     * <p>
     * Only the dispatched messages that belong to {@code destination} are
     * removed from the dispatched list and subtracted from that
     * destination's dispatched and inflight statistics; messages dispatched
     * for other destinations of this subscription are left untouched.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being removed
     * @return the messages the pending cursor returned for the destination,
     *         followed by the dispatched messages of that destination
     * @throws Exception if the superclass or the cursor fails
     */
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        List<MessageReference> rc = new ArrayList<MessageReference>();
        synchronized(pendingLock) {
            super.remove(context, destination);
            // Here is a potential problem concerning Inflight stat:
            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
            // Except if each commit or rollback callback action comes before remove of subscriber.
            rc.addAll(pending.remove(context, destination));

            // Synchronized to DispatchLock
            synchronized(dispatchLock) {
                List<MessageReference> references = new ArrayList<MessageReference>();
                for (MessageReference r : dispatched) {
                    if( r.getRegionDestination() == destination) {
                        references.add((QueueMessageReference)r);
                    }
                }
                rc.addAll(references);
                // Adjust the statistics by the number of messages that actually
                // belong to this destination, not by the whole dispatched list.
                destination.getDestinationStatistics().getDispatched().subtract(references.size());
                destination.getDestinationStatistics().getInflight().subtract(references.size());
                dispatched.removeAll(references);
            }
        }
        return rc;
    }

    /**
     * Dispatches pending messages until the prefetch window is full.
     * <p>
     * Does nothing on a slave broker. When there is no room to dispatch, the
     * destinations are told once (until room frees up again) that this is a
     * slow consumer.
     *
     * @throws IOException if dispatching a message fails
     */
    protected void dispatchPending() throws IOException {
        if (!isSlave()) {
            synchronized(pendingLock) {
                try {
                    int numberToDispatch = countBeforeFull();
                    if (numberToDispatch > 0) {
                        slowConsumer=false;
                        pending.setMaxBatchSize(numberToDispatch);
                        int count = 0;
                        pending.reset();
                        while (pending.hasNext() && !isFull()
                                && count < numberToDispatch) {
                            MessageReference node = pending.next();
                            if (node == null) {
                                break;
                            }

                            // Synchronize between dispatched list and remove of message from pending list
                            // related to remove subscription action
                            synchronized(dispatchLock) {
                                pending.remove();
                                node.decrementReferenceCount();
                                if( !isDropped(node) && canDispatch(node)) {

                                    // Message may have been sitting in the pending
                                    // list a while waiting for the consumer to ack the message.
                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                        //increment number to dispatch
                                        numberToDispatch++;
                                        if (broker.isExpired(node)) {
                                            node.getRegionDestination().messageExpired(context, this, node);
                                        }
                                        continue;
                                    }
                                    dispatch(node);
                                    count++;
                                }
                            }
                        }
                    } else {
                        if (!slowConsumer) {
                            slowConsumer=true;
                            ConnectionContext c = new ConnectionContext();
                            c.setBroker(context.getBroker());
                            for (Destination dest :destinations) {
                                dest.slowConsumer(c,this);
                            }

                        }
                    }
                } finally {
                    pending.release();
                }
            }
        }
    }

    /**
     * Sends a single message to the consumer.
     *
     * @param node the message to dispatch
     * @return true if the message was handed to the connection; false if the
     *         node has no message or this broker is a slave
     * @throws IOException
     */
    protected boolean dispatch(final MessageReference node) throws IOException {
        final Message message = node.getMessage();
        if (message == null) {
            return false;
        }

        // From now on acks are no longer ignored by acknowledge().
        okForAckAsDispatchDone.countDown();

        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
        if (!isSlave()) {

            MessageDispatch md = createMessageDispatch(node, message);
            // NULL messages don't count... they don't get Acked.
            if (node != QueueMessageReference.NULL_MESSAGE) {
                dispatchCounter++;
                dispatched.add(node);
            } else {
                prefetchExtension = Math.max(0, prefetchExtension - 1);
            }
            if (info.isDispatchAsync()) {
                md.setTransmitCallback(new Runnable() {

                    public void run() {
                        // Since the message gets queued up in async dispatch,
                        // we don't want to
                        // decrease the reference count until it gets put on the
                        // wire.
                        onDispatch(node, message);
                    }
                });
                context.getConnection().dispatchAsync(md);
            } else {
                context.getConnection().dispatchSync(md);
                onDispatch(node, message);
            }
            return true;
        } else {
            return false;
        }
    }

    /**
     * Updates the destination statistics after a dispatch and, for async
     * dispatch, tries to dispatch further pending messages.
     *
     * @param node the dispatched message reference
     * @param message the dispatched message
     */
    protected void onDispatch(final MessageReference node, final Message message) {
        if (node.getRegionDestination() != null) {
            if (node != QueueMessageReference.NULL_MESSAGE) {
                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId()
                            + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
                }
            }
        }

        if (info.isDispatchAsync()) {
            try {
                dispatchPending();
            } catch (IOException e) {
                context.getConnection().serviceExceptionAsync(e);
            }
        }
    }

    /**
     * inform the MessageConsumer on the client to change its prefetch
     *
     * @param newPrefetch the prefetch value sent to the consumer
     */
    public void updateConsumerPrefetch(int newPrefetch) {
        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
            ConsumerControl cc = new ConsumerControl();
            cc.setConsumerId(info.getConsumerId());
            cc.setPrefetch(newPrefetch);
            context.getConnection().dispatchAsync(cc);
        }
    }

    /**
     * Builds the dispatch command for a message. The NULL message produces a
     * dispatch with neither a message nor a destination.
     *
     * @param node the message reference being dispatched
     * @param message the message to place in the dispatch
     * @return MessageDispatch
     */
    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        if (node == QueueMessageReference.NULL_MESSAGE) {
            MessageDispatch md = new MessageDispatch();
            md.setMessage(null);
            md.setConsumerId(info.getConsumerId());
            md.setDestination(null);
            return md;
        } else {
            MessageDispatch md = new MessageDispatch();
            md.setConsumerId(info.getConsumerId());
            md.setDestination(node.getRegionDestination().getActiveMQDestination());
            md.setMessage(message);
            md.setRedeliveryCounter(node.getRedeliveryCounter());
            return md;
        }
    }

    /**
     * Use when a matched message is about to be dispatched to the client.
     *
     * @param node
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;

    /**
     * Consulted before a pending message is dispatched.
     *
     * @param node the message about to be dispatched
     * @return true if the message must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);

    /**
     * Used during acknowledgment to remove the message.
     *
     * @throws IOException
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;


    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }

    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }

    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }

    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }
}