001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * Copyright 2004 Protique Ltd 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq.store.journal; 020 021 import java.io.DataInputStream; 022 import java.io.DataOutputStream; 023 import java.io.File; 024 import java.io.IOException; 025 import java.util.ArrayList; 026 import java.util.Iterator; 027 import java.util.Map; 028 import java.sql.SQLException; 029 030 import javax.jms.JMSException; 031 import javax.transaction.xa.XAException; 032 033 import org.activeio.adapter.PacketByteArrayOutputStream; 034 import org.activeio.adapter.PacketInputStream; 035 import org.activeio.journal.InvalidRecordLocationException; 036 import org.activeio.journal.Journal; 037 import org.activeio.journal.JournalEventListener; 038 import org.activeio.journal.RecordLocation; 039 import org.activeio.journal.active.JournalImpl; 040 import org.activeio.journal.howl.HowlJournal; 041 import org.activemq.io.WireFormat; 042 import org.activemq.io.impl.StatelessDefaultWireFormat; 043 import org.activemq.message.ActiveMQMessage; 044 import org.activemq.message.ActiveMQXid; 045 import org.activemq.message.MessageAck; 046 import org.activemq.message.Packet; 047 import org.activemq.service.MessageIdentity; 048 import org.activemq.store.MessageStore; 049 import org.activemq.store.PersistenceAdapter; 050 import org.activemq.store.TopicMessageStore; 051 import org.activemq.store.TransactionStore; 052 import org.activemq.store.jdbc.JDBCPersistenceAdapter; 053 import org.activemq.store.journal.JournalTransactionStore.Tx; 054 import org.activemq.store.journal.JournalTransactionStore.TxOperation; 055 import org.activemq.util.JMSExceptionHelper; 056 import org.apache.commons.logging.Log; 057 import org.apache.commons.logging.LogFactory; 058 import org.objectweb.howl.log.Configuration; 059 060 import EDU.oswego.cs.dl.util.concurrent.Channel; 061 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; 062 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 063 import EDU.oswego.cs.dl.util.concurrent.Latch; 064 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 065 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor; 066 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 067 068 /** 069 * An implementation of {@link PersistenceAdapter} designed for 070 * use with a {@link Journal} and then checkpointing asynchronously 071 * on a timeout with some other long term persistent storage. 072 * 073 * @version $Revision: 1.1 $ 074 */ 075 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener { 076 077 private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class); 078 public static final String DEFAULT_JOURNAL_TYPE = "default"; 079 public static final String HOWL_JOURNAL_TYPE = "howl"; 080 081 private Journal journal; 082 private String journalType = DEFAULT_JOURNAL_TYPE; 083 private PersistenceAdapter longTermPersistence; 084 private File directory = new File("logs"); 085 private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat(); 086 private final ConcurrentHashMap messageStores = new ConcurrentHashMap(); 087 private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap(); 088 089 private static final int PACKET_RECORD_TYPE = 0; 090 private static final int COMMAND_RECORD_TYPE = 1; 091 private static final int TX_COMMAND_RECORD_TYPE = 2; 092 private static final int ACK_RECORD_TYPE = 3; 093 094 private Channel checkpointRequests = new LinkedQueue(); 095 private QueuedExecutor checkpointExecutor; 096 ClockDaemon clockDaemon; 097 private Object clockTicket; 098 private JournalTransactionStore transactionStore = new JournalTransactionStore(this); 099 private int logFileSize=1024*1024*20; 100 private int logFileCount=2; 101 private long checkpointInterval = 1000 * 60 * 5; 102 103 public JournalPersistenceAdapter() { 104 checkpointExecutor = new QueuedExecutor(new LinkedQueue()); 105 checkpointExecutor.setThreadFactory(new ThreadFactory() { 106 public Thread newThread(Runnable runnable) { 107 Thread answer = new Thread(runnable, "Checkpoint Worker"); 108 answer.setDaemon(true); 109 answer.setPriority(Thread.MAX_PRIORITY); 110 return answer; 111 } 112 }); 113 } 114 115 public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException { 116 this(); 117 this.directory = directory; 118 this.longTermPersistence = longTermPersistence; 119 } 120 121 public Map getInitialDestinations() { 122 return longTermPersistence.getInitialDestinations(); 123 } 124 125 private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException { 126 if(isQueue) { 127 return createQueueMessageStore(destination); 128 } else { 129 return createTopicMessageStore(destination); 130 } 131 } 132 133 public MessageStore createQueueMessageStore(String destinationName) throws JMSException { 134 JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName); 135 if( store == null ) { 136 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName); 137 store = new JournalMessageStore(this, checkpointStore, destinationName); 138 messageStores.put(destinationName, store); 139 } 140 return store; 141 } 142 143 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException { 144 JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName); 145 if( store == null ) { 146 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); 147 store = new JournalTopicMessageStore(this, checkpointStore, destinationName); 148 topicMessageStores.put(destinationName, store); 149 } 150 return store; 151 } 152 153 public TransactionStore createTransactionStore() throws JMSException { 154 return transactionStore; 155 } 156 157 public void beginTransaction() throws JMSException { 158 longTermPersistence.beginTransaction(); 159 } 160 161 public void commitTransaction() throws JMSException { 162 longTermPersistence.commitTransaction(); 163 } 164 165 public void rollbackTransaction() { 166 longTermPersistence.rollbackTransaction(); 167 } 168 169 public synchronized void start() throws JMSException { 170 171 if( longTermPersistence instanceof JDBCPersistenceAdapter ) { 172 // Disabled periodic clean up as it deadlocks with the checkpoint operations. 173 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0); 174 } 175 176 longTermPersistence.start(); 177 createTransactionStore(); 178 if (journal == null) { 179 try { 180 log.info("Opening journal."); 181 journal = createJournal(); 182 log.info("Opened journal: " + journal); 183 journal.setJournalEventListener(this); 184 } 185 catch (Exception e) { 186 throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e); 187 } 188 try { 189 recover(); 190 } 191 catch (Exception e) { 192 throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e); 193 } 194 } 195 196 // Do a checkpoint periodically. 197 clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() { 198 public void run() { 199 checkpoint(false); 200 } 201 }, false); 202 203 } 204 205 public synchronized void stop() throws JMSException { 206 207 if (clockTicket != null) { 208 // Stop the periodical checkpoint. 209 ClockDaemon.cancel(clockTicket); 210 clockTicket=null; 211 clockDaemon.shutDown(); 212 } 213 214 // Take one final checkpoint and stop checkpoint processing. 215 checkpoint(true); 216 checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks(); 217 218 JMSException firstException = null; 219 if (journal != null) { 220 try { 221 journal.close(); 222 journal = null; 223 } 224 catch (Exception e) { 225 firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e); 226 } 227 } 228 longTermPersistence.stop(); 229 230 if (firstException != null) { 231 throw firstException; 232 } 233 } 234 235 // Properties 236 //------------------------------------------------------------------------- 237 public PersistenceAdapter getLongTermPersistence() { 238 return longTermPersistence; 239 } 240 241 public void setLongTermPersistence(PersistenceAdapter longTermPersistence) { 242 this.longTermPersistence = longTermPersistence; 243 } 244 245 /** 246 * @return Returns the directory. 247 */ 248 public File getDirectory() { 249 return directory; 250 } 251 252 /** 253 * @param directory The directory to set. 254 */ 255 public void setDirectory(File directory) { 256 this.directory = directory; 257 } 258 259 /** 260 * @return Returns the wireFormat. 261 */ 262 public WireFormat getWireFormat() { 263 return wireFormat; 264 } 265 266 public String getJournalType() { 267 return journalType; 268 } 269 270 public void setJournalType(String journalType) { 271 this.journalType = journalType; 272 } 273 274 protected Journal createJournal() throws IOException { 275 if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) { 276 return new JournalImpl(directory,logFileCount,logFileSize); 277 } 278 279 if( HOWL_JOURNAL_TYPE.equals(journalType) ) { 280 try { 281 Configuration config = new Configuration(); 282 config.setLogFileDir(directory.getCanonicalPath()); 283 return new HowlJournal(config); 284 } catch (IOException e) { 285 throw e; 286 } catch (Exception e) { 287 throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e); 288 } 289 } 290 291 throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType); 292 } 293 294 // Implementation methods 295 //------------------------------------------------------------------------- 296 297 /** 298 * The Journal give us a call back so that we can move old data out of the journal. 299 * Taking a checkpoint does this for us. 300 * 301 * @see org.activemq.journal.JournalEventListener#overflowNotification(org.activemq.journal.RecordLocation) 302 */ 303 public void overflowNotification(RecordLocation safeLocation) { 304 checkpoint(false); 305 } 306 307 /** 308 * When we checkpoint we move all the journaled data to long term storage. 309 * @param b 310 */ 311 public void checkpoint(boolean sync) { 312 try { 313 314 if( journal == null ) 315 throw new IllegalStateException("Journal is closed."); 316 317 // Do the checkpoint asynchronously? 318 Latch latch=null; 319 if( sync ) { 320 latch = new Latch(); 321 checkpointRequests.put(latch); 322 } else { 323 checkpointRequests.put(Boolean.TRUE); 324 } 325 326 checkpointExecutor.execute(new Runnable() { 327 public void run() { 328 329 ArrayList listners = new ArrayList(); 330 331 try { 332 // Avoid running a checkpoint too many times in a row. 333 // Consume any queued up checkpoint requests. 334 try { 335 boolean requested = false; 336 Object t; 337 while ((t=checkpointRequests.poll(0)) != null) { 338 if( t.getClass()==Latch.class ) 339 listners.add(t); 340 requested = true; 341 } 342 if (!requested) { 343 return; 344 } 345 } 346 catch (InterruptedException e1) { 347 return; 348 } 349 350 log.debug("Checkpoint started."); 351 RecordLocation newMark = null; 352 353 Iterator iterator = messageStores.values().iterator(); 354 while (iterator.hasNext()) { 355 try { 356 JournalMessageStore ms = (JournalMessageStore) iterator.next(); 357 RecordLocation mark = ms.checkpoint(); 358 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { 359 newMark = mark; 360 } 361 } 362 catch (Exception e) { 363 log.error("Failed to checkpoint a message store: " + e, e); 364 } 365 } 366 367 iterator = topicMessageStores.values().iterator(); 368 while (iterator.hasNext()) { 369 try { 370 JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next(); 371 RecordLocation mark = ms.checkpoint(); 372 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { 373 newMark = mark; 374 } 375 } 376 catch (Exception e) { 377 log.error("Failed to checkpoint a message store: " + e, e); 378 } 379 } 380 381 try { 382 if (newMark != null) { 383 if( log.isDebugEnabled() ) 384 log.debug("Marking journal: "+newMark); 385 journal.setMark(newMark, true); 386 } 387 } 388 catch (Exception e) { 389 log.error("Failed to mark the Journal: " + e, e); 390 } 391 392 // Clean up the DB if it's a JDBC store. 393 if( longTermPersistence instanceof JDBCPersistenceAdapter ) { 394 // Disabled periodic clean up as it deadlocks with the checkpoint operations. 395 try { 396 ((JDBCPersistenceAdapter)longTermPersistence).cleanup(); 397 } catch (SQLException sqle) { 398 log.error("Cleanup failed due to: " + sqle, sqle); 399 } 400 } 401 402 log.debug("Checkpoint done."); 403 } finally { 404 for (Iterator iter = listners.iterator(); iter.hasNext();) { 405 Latch latch = (Latch) iter.next(); 406 latch.release(); 407 } 408 } 409 } 410 }); 411 412 if( sync ) { 413 latch.acquire(); 414 } 415 } 416 catch (InterruptedException e) { 417 log.warn("Request to start checkpoint failed: " + e, e); 418 } 419 } 420 421 /** 422 * @param destinationName 423 * @param message 424 * @param sync 425 * @throws JMSException 426 */ 427 public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException { 428 try { 429 430 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream(); 431 DataOutputStream os = new DataOutputStream(pos); 432 os.writeByte(PACKET_RECORD_TYPE); 433 os.writeUTF(destination); 434 os.close(); 435 org.activeio.Packet p = wireFormat.writePacket(packet, pos); 436 return journal.write(p, sync); 437 } 438 catch (IOException e) { 439 throw createWriteException(packet, e); 440 } 441 } 442 443 /** 444 * @param destinationName 445 * @param message 446 * @param sync 447 * @throws JMSException 448 */ 449 public RecordLocation writeCommand(String command, boolean sync) throws JMSException { 450 try { 451 452 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream(); 453 DataOutputStream os = new DataOutputStream(pos); 454 os.writeByte(COMMAND_RECORD_TYPE); 455 os.writeUTF(command); 456 os.close(); 457 return journal.write(pos.getPacket(), sync); 458 459 } 460 catch (IOException e) { 461 throw createWriteException(command, e); 462 } 463 } 464 465 /** 466 * @param location 467 * @return 468 * @throws JMSException 469 */ 470 public Packet readPacket(RecordLocation location) throws JMSException { 471 try { 472 org.activeio.Packet data = journal.read(location); 473 DataInputStream is = new DataInputStream(new PacketInputStream(data)); 474 byte type = is.readByte(); 475 if (type != PACKET_RECORD_TYPE) { 476 throw new IOException("Record is not a packet type."); 477 } 478 String destination = is.readUTF(); 479 Packet packet = wireFormat.readPacket(data); 480 is.close(); 481 return packet; 482 483 } 484 catch (InvalidRecordLocationException e) { 485 throw createReadException(location, e); 486 } 487 catch (IOException e) { 488 throw createReadException(location, e); 489 } 490 } 491 492 493 /** 494 * Move all the messages that were in the journal into long term storeage. We just replay and do a checkpoint. 495 * 496 * @throws JMSException 497 * @throws IOException 498 * @throws InvalidRecordLocationException 499 * @throws IllegalStateException 500 */ 501 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException { 502 503 RecordLocation pos = null; 504 int transactionCounter = 0; 505 506 log.info("Journal Recovery Started."); 507 508 // While we have records in the journal. 509 while ((pos = journal.getNextRecordLocation(pos)) != null) { 510 org.activeio.Packet data = journal.read(pos); 511 DataInputStream is = new DataInputStream(new PacketInputStream(data)); 512 513 // Read the destination and packate from the record. 514 String destination = null; 515 Packet packet = null; 516 try { 517 byte type = is.readByte(); 518 switch (type) { 519 case PACKET_RECORD_TYPE: 520 521 // Is the current packet part of the destination? 522 destination = is.readUTF(); 523 packet = wireFormat.readPacket(data); 524 525 // Try to replay the packet. 526 if (packet instanceof ActiveMQMessage) { 527 ActiveMQMessage msg = (ActiveMQMessage) packet; 528 529 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue()); 530 if( msg.getTransactionId()!=null ) { 531 transactionStore.addMessage(store, msg, pos); 532 } else { 533 store.replayAddMessage(msg); 534 transactionCounter++; 535 } 536 } 537 else if (packet instanceof MessageAck) { 538 MessageAck ack = (MessageAck) packet; 539 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue()); 540 if( ack.getTransactionId()!=null ) { 541 transactionStore.removeMessage(store, ack, pos); 542 } else { 543 store.replayRemoveMessage(ack); 544 transactionCounter++; 545 } 546 } 547 else { 548 log.error("Unknown type of packet in transaction log which will be discarded: " + packet); 549 } 550 551 break; 552 case TX_COMMAND_RECORD_TYPE: 553 554 TxCommand command = new TxCommand(); 555 command.setType(is.readByte()); 556 command.setWasPrepared(is.readBoolean()); 557 switch(command.getType()) { 558 case TxCommand.LOCAL_COMMIT: 559 case TxCommand.LOCAL_ROLLBACK: 560 command.setTransactionId(is.readUTF()); 561 break; 562 default: 563 command.setTransactionId(ActiveMQXid.read(is)); 564 break; 565 } 566 567 // Try to replay the packet. 568 switch(command.getType()) { 569 case TxCommand.XA_PREPARE: 570 transactionStore.replayPrepare(command.getTransactionId()); 571 break; 572 case TxCommand.XA_COMMIT: 573 case TxCommand.LOCAL_COMMIT: 574 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 575 // Replay the committed operations. 576 if( tx!=null) { 577 tx.getOperations(); 578 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 579 TxOperation op = (TxOperation) iter.next(); 580 if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) { 581 op.store.replayAddMessage((ActiveMQMessage) op.data); 582 } 583 if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 584 op.store.replayRemoveMessage((MessageAck) op.data); 585 } 586 if( op.operationType == TxOperation.ACK_OPERATION_TYPE) { 587 JournalAck ack = (JournalAck) op.data; 588 ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId())); 589 } 590 } 591 transactionCounter++; 592 } 593 break; 594 case TxCommand.LOCAL_ROLLBACK: 595 case TxCommand.XA_ROLLBACK: 596 transactionStore.replayRollback(command.getTransactionId()); 597 break; 598 } 599 600 break; 601 602 case ACK_RECORD_TYPE: 603 604 destination = is.readUTF(); 605 String subscription = is.readUTF(); 606 String messageId = is.readUTF(); 607 Object transactionId=null; 608 609 JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false); 610 if( transactionId!=null ) { 611 JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId); 612 transactionStore.acknowledge(store, ack, pos); 613 } else { 614 store.replayAcknowledge(subscription, new MessageIdentity(messageId)); 615 transactionCounter++; 616 } 617 618 case COMMAND_RECORD_TYPE: 619 620 break; 621 default: 622 log.error("Unknown type of record in transaction log which will be discarded: " + type); 623 break; 624 } 625 } 626 finally { 627 is.close(); 628 } 629 } 630 631 RecordLocation location = writeCommand("RECOVERED", true); 632 journal.setMark(location, true); 633 634 log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); 635 } 636 637 private JMSException createReadException(RecordLocation location, Exception e) { 638 return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e); 639 } 640 641 protected JMSException createWriteException(Packet packet, Exception e) { 642 return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e); 643 } 644 645 private XAException createWriteException(TxCommand command, Exception e) { 646 return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e); 647 } 648 649 650 protected JMSException createWriteException(String command, Exception e) { 651 return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e); 652 } 653 654 protected JMSException createRecoveryFailedException(Exception e) { 655 return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e); 656 } 657 658 public ClockDaemon getClockDaemon() { 659 if (clockDaemon == null) { 660 clockDaemon = new ClockDaemon(); 661 clockDaemon.setThreadFactory(new ThreadFactory() { 662 public Thread newThread(Runnable runnable) { 663 Thread thread = new Thread(runnable, "Checkpoint Timer"); 664 thread.setDaemon(true); 665 return thread; 666 } 667 }); 668 } 669 return clockDaemon; 670 } 671 672 public void setClockDaemon(ClockDaemon clockDaemon) { 673 this.clockDaemon = clockDaemon; 674 } 675 676 /** 677 * @param xid 678 * @return 679 */ 680 public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException { 681 try { 682 683 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream(); 684 DataOutputStream os = new DataOutputStream(pos); 685 os.writeByte(TX_COMMAND_RECORD_TYPE); 686 os.writeByte(command.getType()); 687 os.writeBoolean(command.getWasPrepared()); 688 switch(command.getType()) { 689 case TxCommand.LOCAL_COMMIT: 690 case TxCommand.LOCAL_ROLLBACK: 691 os.writeUTF( (String) command.getTransactionId() ); 692 break; 693 default: 694 ActiveMQXid xid = (ActiveMQXid) command.getTransactionId(); 695 xid.write(os); 696 break; 697 } 698 os.close(); 699 return journal.write(pos.getPacket(), sync); 700 } 701 catch (IOException e) { 702 throw createWriteException(command, e); 703 } 704 } 705 706 /** 707 * @param destinationName 708 * @param persistentKey 709 * @param messageIdentity 710 * @param b 711 * @return 712 */ 713 public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{ 714 try { 715 716 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream(); 717 DataOutputStream os = new DataOutputStream(pos); 718 os.writeByte(ACK_RECORD_TYPE); 719 os.writeUTF(destinationName); 720 os.writeUTF(subscription); 721 os.writeUTF(messageIdentity.getMessageID()); 722 os.close(); 723 return journal.write(pos.getPacket(), sync); 724 725 } 726 catch (IOException e) { 727 throw createWriteException("Ack for message: "+messageIdentity, e); 728 } 729 } 730 731 public JournalTransactionStore getTransactionStore() { 732 return transactionStore; 733 } 734 735 public int getLogFileCount() { 736 return logFileCount; 737 } 738 739 public void setLogFileCount(int logFileCount) { 740 this.logFileCount = logFileCount; 741 } 742 743 public int getLogFileSize() { 744 return logFileSize; 745 } 746 747 public void setLogFileSize(int logFileSize) { 748 this.logFileSize = logFileSize; 749 } 750 751 /** 752 * Verifies if a dead letter has already been sent for a message 753 * @param seq 754 * @param useLocking to prevent concurrency/dups 755 * @return 756 */ 757 public boolean deadLetterAlreadySent(long seq, boolean useLocking) { 758 return longTermPersistence.deadLetterAlreadySent(seq, useLocking); 759 } 760 761 public long getCheckpointInterval() { 762 return checkpointInterval; 763 } 764 public void setCheckpointInterval(long checkpointInterval) { 765 this.checkpointInterval = checkpointInterval; 766 } 767 }