001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.sql.SQLException; 022 import java.util.Collections; 023 import java.util.Set; 024 import java.util.concurrent.ScheduledFuture; 025 import java.util.concurrent.ScheduledThreadPoolExecutor; 026 import java.util.concurrent.ThreadFactory; 027 import java.util.concurrent.TimeUnit; 028 029 import javax.sql.DataSource; 030 031 import org.apache.activemq.ActiveMQMessageAudit; 032 import org.apache.activemq.broker.BrokerService; 033 import org.apache.activemq.broker.BrokerServiceAware; 034 import org.apache.activemq.broker.ConnectionContext; 035 import org.apache.activemq.command.ActiveMQDestination; 036 import org.apache.activemq.command.ActiveMQQueue; 037 import org.apache.activemq.command.ActiveMQTopic; 038 import org.apache.activemq.command.Message; 039 import org.apache.activemq.command.MessageId; 040 import org.apache.activemq.openwire.OpenWireFormat; 041 import org.apache.activemq.store.MessageStore; 042 import org.apache.activemq.store.PersistenceAdapter; 043 import org.apache.activemq.store.TopicMessageStore; 044 import org.apache.activemq.store.TransactionStore; 045 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 046 import org.apache.activemq.store.memory.MemoryTransactionStore; 047 import org.apache.activemq.usage.SystemUsage; 048 import org.apache.activemq.util.ByteSequence; 049 import org.apache.activemq.util.FactoryFinder; 050 import org.apache.activemq.util.IOExceptionSupport; 051 import org.apache.activemq.util.LongSequenceGenerator; 052 import org.apache.activemq.wireformat.WireFormat; 053 import org.apache.commons.logging.Log; 054 import org.apache.commons.logging.LogFactory; 055 056 /** 057 * A {@link PersistenceAdapter} implementation using JDBC for persistence 058 * storage. 059 * 060 * This persistence adapter will correctly remember prepared XA transactions, 061 * but it will not keep track of local transaction commits so that operations 062 * performed against the Message store are done as a single uow. 063 * 064 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 065 * 066 * @version $Revision: 1.9 $ 067 */ 068 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, 069 BrokerServiceAware { 070 071 private static final Log LOG = LogFactory.getLog(JDBCPersistenceAdapter.class); 072 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 073 "META-INF/services/org/apache/activemq/store/jdbc/"); 074 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 075 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 076 077 private WireFormat wireFormat = new OpenWireFormat(); 078 private BrokerService brokerService; 079 private Statements statements; 080 private JDBCAdapter adapter; 081 private MemoryTransactionStore transactionStore; 082 private ScheduledThreadPoolExecutor clockDaemon; 083 private ScheduledFuture<?> cleanupTicket, keepAliveTicket; 084 private int cleanupPeriod = 1000 * 60 * 5; 085 private boolean useExternalMessageReferences; 086 private boolean useDatabaseLock = true; 087 private long lockKeepAlivePeriod = 1000*30; 088 private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; 089 private DatabaseLocker databaseLocker; 090 private boolean createTablesOnStartup = true; 091 private DataSource lockDataSource; 092 private int transactionIsolation; 093 094 protected int maxProducersToAudit=1024; 095 protected int maxAuditDepth=1000; 096 protected boolean enableAudit=true; 097 protected int auditRecoveryDepth = 1024; 098 protected ActiveMQMessageAudit audit; 099 100 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 101 102 public JDBCPersistenceAdapter() { 103 } 104 105 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 106 super(ds); 107 this.wireFormat = wireFormat; 108 } 109 110 public Set<ActiveMQDestination> getDestinations() { 111 // Get a connection and insert the message into the DB. 112 TransactionContext c = null; 113 try { 114 c = getTransactionContext(); 115 return getAdapter().doGetDestinations(c); 116 } catch (IOException e) { 117 return emptyDestinationSet(); 118 } catch (SQLException e) { 119 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 120 return emptyDestinationSet(); 121 } finally { 122 if (c != null) { 123 try { 124 c.close(); 125 } catch (Throwable e) { 126 } 127 } 128 } 129 } 130 131 @SuppressWarnings("unchecked") 132 private Set<ActiveMQDestination> emptyDestinationSet() { 133 return Collections.EMPTY_SET; 134 } 135 136 protected void createMessageAudit() { 137 if (enableAudit && audit == null) { 138 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 139 TransactionContext c = null; 140 141 try { 142 c = getTransactionContext(); 143 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 144 public void messageId(MessageId id) { 145 audit.isDuplicate(id); 146 } 147 }); 148 } catch (Exception e) { 149 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 150 } finally { 151 if (c != null) { 152 try { 153 c.close(); 154 } catch (Throwable e) { 155 } 156 } 157 } 158 } 159 } 160 161 public void initSequenceIdGenerator() { 162 TransactionContext c = null; 163 try { 164 c = getTransactionContext(); 165 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 166 public void messageId(MessageId id) { 167 audit.isDuplicate(id); 168 } 169 }); 170 } catch (Exception e) { 171 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 172 } finally { 173 if (c != null) { 174 try { 175 c.close(); 176 } catch (Throwable e) { 177 } 178 } 179 } 180 181 } 182 183 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 184 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); 185 if (transactionStore != null) { 186 rc = transactionStore.proxy(rc); 187 } 188 return rc; 189 } 190 191 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 192 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); 193 if (transactionStore != null) { 194 rc = transactionStore.proxy(rc); 195 } 196 return rc; 197 } 198 199 /** 200 * Cleanup method to remove any state associated with the given destination 201 * No state retained.... nothing to do 202 * 203 * @param destination Destination to forget 204 */ 205 public void removeQueueMessageStore(ActiveMQQueue destination) { 206 } 207 208 /** 209 * Cleanup method to remove any state associated with the given destination 210 * No state retained.... nothing to do 211 * 212 * @param destination Destination to forget 213 */ 214 public void removeTopicMessageStore(ActiveMQTopic destination) { 215 } 216 217 public TransactionStore createTransactionStore() throws IOException { 218 if (transactionStore == null) { 219 transactionStore = new MemoryTransactionStore(this); 220 } 221 return this.transactionStore; 222 } 223 224 public long getLastMessageBrokerSequenceId() throws IOException { 225 // Get a connection and insert the message into the DB. 226 TransactionContext c = getTransactionContext(); 227 try { 228 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 229 sequenceGenerator.setLastSequenceId(seq); 230 long brokerSeq = 0; 231 if (seq != 0) { 232 Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq))); 233 brokerSeq = last.getMessageId().getBrokerSequenceId(); 234 } 235 return brokerSeq; 236 } catch (SQLException e) { 237 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 238 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 239 } finally { 240 c.close(); 241 } 242 } 243 244 public void start() throws Exception { 245 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 246 247 if (isCreateTablesOnStartup()) { 248 TransactionContext transactionContext = getTransactionContext(); 249 transactionContext.begin(); 250 try { 251 try { 252 getAdapter().doCreateTables(transactionContext); 253 } catch (SQLException e) { 254 LOG.warn("Cannot create tables due to: " + e); 255 JDBCPersistenceAdapter.log("Failure Details: ", e); 256 } 257 } finally { 258 transactionContext.commit(); 259 } 260 } 261 262 if (isUseDatabaseLock()) { 263 DatabaseLocker service = getDatabaseLocker(); 264 if (service == null) { 265 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter"); 266 } else { 267 service.start(); 268 if (lockKeepAlivePeriod > 0) { 269 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() { 270 public void run() { 271 databaseLockKeepAlive(); 272 } 273 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); 274 } 275 if (brokerService != null) { 276 brokerService.getBroker().nowMasterBroker(); 277 } 278 } 279 } 280 281 cleanup(); 282 283 // Cleanup the db periodically. 284 if (cleanupPeriod > 0) { 285 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 286 public void run() { 287 cleanup(); 288 } 289 }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS); 290 } 291 292 createMessageAudit(); 293 } 294 295 public synchronized void stop() throws Exception { 296 if (cleanupTicket != null) { 297 cleanupTicket.cancel(true); 298 cleanupTicket = null; 299 } 300 if (keepAliveTicket != null) { 301 keepAliveTicket.cancel(false); 302 keepAliveTicket = null; 303 } 304 305 // do not shutdown clockDaemon as it may kill the thread initiating shutdown 306 DatabaseLocker service = getDatabaseLocker(); 307 if (service != null) { 308 service.stop(); 309 } 310 } 311 312 public void cleanup() { 313 TransactionContext c = null; 314 try { 315 LOG.debug("Cleaning up old messages."); 316 c = getTransactionContext(); 317 getAdapter().doDeleteOldMessages(c); 318 } catch (IOException e) { 319 LOG.warn("Old message cleanup failed due to: " + e, e); 320 } catch (SQLException e) { 321 LOG.warn("Old message cleanup failed due to: " + e); 322 JDBCPersistenceAdapter.log("Failure Details: ", e); 323 } finally { 324 if (c != null) { 325 try { 326 c.close(); 327 } catch (Throwable e) { 328 } 329 } 330 LOG.debug("Cleanup done."); 331 } 332 } 333 334 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) { 335 this.clockDaemon = clockDaemon; 336 } 337 338 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 339 if (clockDaemon == null) { 340 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 341 public Thread newThread(Runnable runnable) { 342 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer"); 343 thread.setDaemon(true); 344 return thread; 345 } 346 }); 347 } 348 return clockDaemon; 349 } 350 351 public JDBCAdapter getAdapter() throws IOException { 352 if (adapter == null) { 353 setAdapter(createAdapter()); 354 } 355 return adapter; 356 } 357 358 public DatabaseLocker getDatabaseLocker() throws IOException { 359 if (databaseLocker == null && isUseDatabaseLock()) { 360 setDatabaseLocker(loadDataBaseLocker()); 361 } 362 return databaseLocker; 363 } 364 365 /** 366 * Sets the database locker strategy to use to lock the database on startup 367 * @throws IOException 368 */ 369 public void setDatabaseLocker(DatabaseLocker locker) throws IOException { 370 databaseLocker = locker; 371 databaseLocker.setPersistenceAdapter(this); 372 databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval()); 373 } 374 375 public DataSource getLockDataSource() throws IOException { 376 if (lockDataSource == null) { 377 lockDataSource = getDataSource(); 378 if (lockDataSource == null) { 379 throw new IllegalArgumentException( 380 "No dataSource property has been configured"); 381 } 382 } else { 383 LOG.info("Using a separate dataSource for locking: " 384 + lockDataSource); 385 } 386 return lockDataSource; 387 } 388 389 public void setLockDataSource(DataSource dataSource) { 390 this.lockDataSource = dataSource; 391 } 392 393 public BrokerService getBrokerService() { 394 return brokerService; 395 } 396 397 public void setBrokerService(BrokerService brokerService) { 398 this.brokerService = brokerService; 399 } 400 401 /** 402 * @throws IOException 403 */ 404 protected JDBCAdapter createAdapter() throws IOException { 405 406 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 407 408 // Use the default JDBC adapter if the 409 // Database type is not recognized. 410 if (adapter == null) { 411 adapter = new DefaultJDBCAdapter(); 412 LOG.debug("Using default JDBC Adapter: " + adapter); 413 } 414 return adapter; 415 } 416 417 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 418 Object adapter = null; 419 TransactionContext c = getTransactionContext(); 420 try { 421 try { 422 // Make the filename file system safe. 423 String dirverName = c.getConnection().getMetaData().getDriverName(); 424 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 425 426 try { 427 adapter = finder.newInstance(dirverName); 428 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 429 } catch (Throwable e) { 430 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 431 + "]. Will use default implementation."); 432 } 433 } catch (SQLException e) { 434 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 435 + e.getMessage()); 436 JDBCPersistenceAdapter.log("Failure Details: ", e); 437 } 438 } finally { 439 c.close(); 440 } 441 return adapter; 442 } 443 444 public void setAdapter(JDBCAdapter adapter) { 445 this.adapter = adapter; 446 this.adapter.setStatements(getStatements()); 447 } 448 449 public WireFormat getWireFormat() { 450 return wireFormat; 451 } 452 453 public void setWireFormat(WireFormat wireFormat) { 454 this.wireFormat = wireFormat; 455 } 456 457 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 458 if (context == null) { 459 return getTransactionContext(); 460 } else { 461 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 462 if (answer == null) { 463 answer = getTransactionContext(); 464 context.setLongTermStoreContext(answer); 465 } 466 return answer; 467 } 468 } 469 470 public TransactionContext getTransactionContext() throws IOException { 471 TransactionContext answer = new TransactionContext(getDataSource()); 472 if (transactionIsolation > 0) { 473 answer.setTransactionIsolation(transactionIsolation); 474 } 475 return answer; 476 } 477 478 public void beginTransaction(ConnectionContext context) throws IOException { 479 TransactionContext transactionContext = getTransactionContext(context); 480 transactionContext.begin(); 481 } 482 483 public void commitTransaction(ConnectionContext context) throws IOException { 484 TransactionContext transactionContext = getTransactionContext(context); 485 transactionContext.commit(); 486 } 487 488 public void rollbackTransaction(ConnectionContext context) throws IOException { 489 TransactionContext transactionContext = getTransactionContext(context); 490 transactionContext.rollback(); 491 } 492 493 public int getCleanupPeriod() { 494 return cleanupPeriod; 495 } 496 497 /** 498 * Sets the number of milliseconds until the database is attempted to be 499 * cleaned up for durable topics 500 */ 501 public void setCleanupPeriod(int cleanupPeriod) { 502 this.cleanupPeriod = cleanupPeriod; 503 } 504 505 public void deleteAllMessages() throws IOException { 506 TransactionContext c = getTransactionContext(); 507 try { 508 getAdapter().doDropTables(c); 509 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 510 getAdapter().doCreateTables(c); 511 } catch (SQLException e) { 512 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 513 throw IOExceptionSupport.create(e); 514 } finally { 515 c.close(); 516 } 517 } 518 519 public boolean isUseExternalMessageReferences() { 520 return useExternalMessageReferences; 521 } 522 523 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 524 this.useExternalMessageReferences = useExternalMessageReferences; 525 } 526 527 public boolean isCreateTablesOnStartup() { 528 return createTablesOnStartup; 529 } 530 531 /** 532 * Sets whether or not tables are created on startup 533 */ 534 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 535 this.createTablesOnStartup = createTablesOnStartup; 536 } 537 538 public boolean isUseDatabaseLock() { 539 return useDatabaseLock; 540 } 541 542 /** 543 * Sets whether or not an exclusive database lock should be used to enable 544 * JDBC Master/Slave. Enabled by default. 545 */ 546 public void setUseDatabaseLock(boolean useDatabaseLock) { 547 this.useDatabaseLock = useDatabaseLock; 548 } 549 550 public static void log(String msg, SQLException e) { 551 String s = msg + e.getMessage(); 552 while (e.getNextException() != null) { 553 e = e.getNextException(); 554 s += ", due to: " + e.getMessage(); 555 } 556 LOG.warn(s, e); 557 } 558 559 public Statements getStatements() { 560 if (statements == null) { 561 statements = new Statements(); 562 } 563 return statements; 564 } 565 566 public void setStatements(Statements statements) { 567 this.statements = statements; 568 } 569 570 /** 571 * @param usageManager The UsageManager that is controlling the 572 * destination's memory usage. 573 */ 574 public void setUsageManager(SystemUsage usageManager) { 575 } 576 577 protected void databaseLockKeepAlive() { 578 boolean stop = false; 579 try { 580 DatabaseLocker locker = getDatabaseLocker(); 581 if (locker != null) { 582 if (!locker.keepAlive()) { 583 stop = true; 584 } 585 } 586 } catch (IOException e) { 587 LOG.error("Failed to get database when trying keepalive: " + e, e); 588 } 589 if (stop) { 590 stopBroker(); 591 } 592 } 593 594 protected void stopBroker() { 595 // we can no longer keep the lock so lets fail 596 LOG.info("No longer able to keep the exclusive lock so giving up being a master"); 597 try { 598 brokerService.stop(); 599 } catch (Exception e) { 600 LOG.warn("Failure occured while stopping broker"); 601 } 602 } 603 604 protected DatabaseLocker loadDataBaseLocker() throws IOException { 605 DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock"); 606 if (locker == null) { 607 locker = new DefaultDatabaseLocker(); 608 LOG.debug("Using default JDBC Locker: " + locker); 609 } 610 return locker; 611 } 612 613 public void setBrokerName(String brokerName) { 614 } 615 616 public String toString() { 617 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 618 } 619 620 public void setDirectory(File dir) { 621 } 622 623 public void checkpoint(boolean sync) throws IOException { 624 } 625 626 public long size(){ 627 return 0; 628 } 629 630 public long getLockKeepAlivePeriod() { 631 return lockKeepAlivePeriod; 632 } 633 634 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) { 635 this.lockKeepAlivePeriod = lockKeepAlivePeriod; 636 } 637 638 public long getLockAcquireSleepInterval() { 639 return lockAcquireSleepInterval; 640 } 641 642 /** 643 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 644 * not applied if DataBaseLocker is injected. 645 */ 646 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { 647 this.lockAcquireSleepInterval = lockAcquireSleepInterval; 648 } 649 650 /** 651 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 652 * This allowable dirty isolation level may not be achievable in clustered DB environments 653 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ 654 * see isolation level constants in {@link java.sql.Connection} 655 * @param transactionIsolation the isolation level to use 656 */ 657 public void setTransactionIsolation(int transactionIsolation) { 658 this.transactionIsolation = transactionIsolation; 659 } 660 661 public int getMaxProducersToAudit() { 662 return maxProducersToAudit; 663 } 664 665 public void setMaxProducersToAudit(int maxProducersToAudit) { 666 this.maxProducersToAudit = maxProducersToAudit; 667 } 668 669 public int getMaxAuditDepth() { 670 return maxAuditDepth; 671 } 672 673 public void setMaxAuditDepth(int maxAuditDepth) { 674 this.maxAuditDepth = maxAuditDepth; 675 } 676 677 public boolean isEnableAudit() { 678 return enableAudit; 679 } 680 681 public void setEnableAudit(boolean enableAudit) { 682 this.enableAudit = enableAudit; 683 } 684 685 public int getAuditRecoveryDepth() { 686 return auditRecoveryDepth; 687 } 688 689 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 690 this.auditRecoveryDepth = auditRecoveryDepth; 691 } 692 693 public long getNextSequenceId() { 694 synchronized(sequenceGenerator) { 695 return sequenceGenerator.getNextSequenceId(); 696 } 697 } 698 699 }