001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc.adapter; 019 020 import java.sql.Connection; 021 import java.sql.PreparedStatement; 022 import java.sql.ResultSet; 023 import java.sql.SQLException; 024 import java.sql.Statement; 025 026 import javax.jms.JMSException; 027 import javax.transaction.xa.XAException; 028 029 import org.activemq.message.ActiveMQXid; 030 import org.activemq.service.SubscriberEntry; 031 import org.activemq.store.TransactionStore.RecoveryListener; 032 import org.activemq.store.jdbc.JDBCAdapter; 033 import org.activemq.store.jdbc.StatementProvider; 034 import org.activemq.util.LongSequenceGenerator; 035 import org.activemq.service.MessageIdentity; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * Implements all the default JDBC operations that are used 041 * by the JDBCPersistenceAdapter. 042 * <p/> 043 * Subclassing is encouraged to override the default 044 * implementation of methods to account for differences 045 * in JDBC Driver implementations. 046 * <p/> 047 * The JDBCAdapter inserts and extracts BLOB data using the 048 * getBytes()/setBytes() operations. 049 * <p/> 050 * The databases/JDBC drivers that use this adapter are: 051 * <ul> 052 * <li></li> 053 * </ul> 054 * 055 * @version $Revision: 1.1 $ 056 */ 057 public class DefaultJDBCAdapter implements JDBCAdapter { 058 059 private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class); 060 061 final protected StatementProvider statementProvider; 062 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 063 064 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 065 s.setBytes(index, data); 066 } 067 068 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 069 return rs.getBytes(index); 070 } 071 072 /** 073 * @param provider 074 */ 075 public DefaultJDBCAdapter(StatementProvider provider) { 076 this.statementProvider = new CachingStatementProvider(provider); 077 } 078 079 public DefaultJDBCAdapter() { 080 this(new DefaultStatementProvider()); 081 } 082 083 public LongSequenceGenerator getSequenceGenerator() { 084 return sequenceGenerator; 085 } 086 087 public void doCreateTables(Connection c) throws SQLException { 088 Statement s = null; 089 try { 090 s = c.createStatement(); 091 String[] createStatments = statementProvider.getCreateSchemaStatments(); 092 for (int i = 0; i < createStatments.length; i++) { 093 // This will fail usually since the tables will be 094 // created allready. 095 try { 096 boolean rc = s.execute(createStatments[i]); 097 } 098 catch (SQLException e) { 099 log.info("Could not create JDBC tables; they could already exist." + 100 " Failure was: " + createStatments[i] + " Message: " + e.getMessage() + 101 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() ); 102 } 103 } 104 c.commit(); 105 } 106 finally { 107 try { 108 s.close(); 109 } 110 catch (Throwable e) { 111 } 112 } 113 } 114 115 public void doDropTables(Connection c) throws SQLException { 116 Statement s = null; 117 try { 118 s = c.createStatement(); 119 String[] dropStatments = statementProvider.getDropSchemaStatments(); 120 for (int i = 0; i < dropStatments.length; i++) { 121 // This will fail usually since the tables will be 122 // created allready. 123 try { 124 boolean rc = s.execute(dropStatments[i]); 125 } 126 catch (SQLException e) { 127 log.warn("Could not drop JDBC tables; they may not exist." + 128 " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() + 129 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() ); 130 } 131 } 132 c.commit(); 133 } 134 finally { 135 try { 136 s.close(); 137 } 138 catch (Throwable e) { 139 } 140 } 141 } 142 143 public void initSequenceGenerator(Connection c) { 144 PreparedStatement s = null; 145 ResultSet rs = null; 146 try { 147 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs()); 148 rs = s.executeQuery(); 149 long seq1 = 0; 150 if (rs.next()) { 151 seq1 = rs.getLong(1); 152 } 153 rs.close(); 154 s.close(); 155 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInAcks()); 156 rs = s.executeQuery(); 157 long seq2 = 0; 158 if (rs.next()) { 159 seq2 = rs.getLong(1); 160 } 161 162 sequenceGenerator.setLastSequenceId(Math.max(seq1, seq2)); 163 log.debug("Last sequence id: "+sequenceGenerator.getLastSequenceId()); 164 } 165 catch (SQLException e) { 166 log.warn("Failed to find last sequence number: " + e, e); 167 } 168 finally { 169 try { 170 rs.close(); 171 } 172 catch (Throwable e) { 173 } 174 try { 175 s.close(); 176 } 177 catch (Throwable e) { 178 } 179 } 180 } 181 182 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data, long expiration) throws SQLException, JMSException { 183 PreparedStatement s = null; 184 try { 185 s = c.prepareStatement(statementProvider.getAddMessageStatment()); 186 s.setLong(1, seq); 187 s.setString(2, destinationName); 188 s.setString(3, messageID); 189 setBinaryData(s, 4, data); 190 s.setLong(5, expiration); 191 if (s.executeUpdate() != 1) { 192 throw new JMSException("Failed to broker message: " + messageID + " in container. "); 193 } 194 } 195 finally { 196 try { 197 s.close(); 198 } 199 catch (Throwable e) { 200 } 201 } 202 } 203 204 public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException { 205 PreparedStatement s = null; 206 ResultSet rs = null; 207 try { 208 209 s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment()); 210 s.setString(1, messageID); 211 rs = s.executeQuery(); 212 213 if (!rs.next()) { 214 return null; 215 } 216 return new Long( rs.getLong(1) ); 217 218 } 219 finally { 220 try { 221 rs.close(); 222 } 223 catch (Throwable e) { 224 } 225 try { 226 s.close(); 227 } 228 catch (Throwable e) { 229 } 230 } 231 } 232 233 public byte[] doGetMessage(Connection c, long seq) throws SQLException { 234 PreparedStatement s = null; 235 ResultSet rs = null; 236 try { 237 238 s = c.prepareStatement(statementProvider.getFindMessageStatment()); 239 s.setLong(1, seq); 240 rs = s.executeQuery(); 241 242 if (!rs.next()) { 243 return null; 244 } 245 return getBinaryData(rs, 1); 246 247 } 248 finally { 249 try { 250 rs.close(); 251 } 252 catch (Throwable e) { 253 } 254 try { 255 s.close(); 256 } 257 catch (Throwable e) { 258 } 259 } 260 } 261 262 public void doGetMessageForUpdate(Connection c, long seq, boolean useLocking, ExpiredMessageResultHandler handler) throws SQLException, JMSException { 263 PreparedStatement s = null; 264 ResultSet rs = null; 265 try { 266 267 if (useLocking) { 268 s = c.prepareStatement(statementProvider.getFindMessageAttributesForUpdateStatment()); 269 } else { 270 s = c.prepareStatement(statementProvider.getFindMessageAttributesStatment()); 271 } 272 s.setLong(1, seq); 273 rs = s.executeQuery(); 274 275 if (rs.next()) { 276 String container = rs.getString(1); 277 String msgid = rs.getString(2); 278 boolean isSentToDeadLetter = rs.getString(3)!=null&&rs.getString(3).equals("Y"); 279 handler.onMessage(seq, container, msgid, isSentToDeadLetter); 280 } 281 } 282 finally { 283 try { 284 rs.close(); 285 } 286 catch (Throwable e) { 287 } 288 try { 289 s.close(); 290 } 291 catch (Throwable e) { 292 } 293 } 294 } 295 296 public void doSetDeadLetterFlag(Connection c, long seq) throws SQLException, JMSException { 297 PreparedStatement s = null; 298 ResultSet rs = null; 299 try { 300 // Update the db with the updated blob 301 s = c.prepareStatement(statementProvider.getSetDeadLetterFlagStatement()); 302 s.setLong(1, seq); 303 int i = s.executeUpdate(); 304 if (i <= 0) 305 throw new JMSException("Failed to broker message: " + seq 306 + " in container."); 307 308 } finally { 309 try { 310 rs.close(); 311 } catch (Throwable e) { 312 } 313 try { 314 s.close(); 315 } catch (Throwable e) { 316 } 317 } 318 } 319 320 public void doRemoveMessage(Connection c, long seq) throws SQLException { 321 PreparedStatement s = null; 322 try { 323 s = c.prepareStatement(statementProvider.getRemoveMessageStatment()); 324 s.setLong(1, seq); 325 if (s.executeUpdate() != 1) { 326 log.error("Could not delete sequenece number for: " + seq); 327 } 328 } 329 finally { 330 try { 331 s.close(); 332 } 333 catch (Throwable e) { 334 } 335 } 336 } 337 338 public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException { 339 PreparedStatement s = null; 340 ResultSet rs = null; 341 try { 342 343 s = c.prepareStatement(statementProvider.getFindAllMessagesStatment()); 344 s.setString(1, destinationName); 345 rs = s.executeQuery(); 346 347 while (rs.next()) { 348 long seq = rs.getLong(1); 349 String msgid = rs.getString(2); 350 listener.onMessage(seq, msgid); 351 } 352 353 } 354 finally { 355 try { 356 rs.close(); 357 } 358 catch (Throwable e) { 359 } 360 try { 361 s.close(); 362 } 363 catch (Throwable e) { 364 } 365 } 366 } 367 368 public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException { 369 PreparedStatement s = null; 370 try { 371 s = c.prepareStatement(statementProvider.getRemoveXidStatment()); 372 s.setString(1, xid.toLocalTransactionId()); 373 if (s.executeUpdate() != 1) { 374 throw new XAException("Failed to remove prepared transaction: " + xid + "."); 375 } 376 } 377 finally { 378 try { 379 s.close(); 380 } 381 catch (Throwable e) { 382 } 383 } 384 } 385 386 387 public void doAddXid(Connection c, ActiveMQXid xid) throws SQLException, XAException { 388 PreparedStatement s = null; 389 try { 390 391 s = c.prepareStatement(statementProvider.getAddXidStatment()); 392 s.setString(1, xid.toLocalTransactionId()); 393 if (s.executeUpdate() != 1) { 394 throw new XAException("Failed to store prepared transaction: " + xid); 395 } 396 397 } 398 finally { 399 try { 400 s.close(); 401 } 402 catch (Throwable e) { 403 } 404 } 405 } 406 407 public void doLoadPreparedTransactions(Connection c, RecoveryListener listener) throws SQLException { 408 PreparedStatement s = null; 409 ResultSet rs = null; 410 try { 411 412 s = c.prepareStatement(statementProvider.getFindAllXidStatment()); 413 rs = s.executeQuery(); 414 415 while (rs.next()) { 416 String id = rs.getString(1); 417 418 419 /* 420 byte data[] = this.getBinaryData(rs, 2); 421 try { 422 ActiveMQXid xid = new ActiveMQXid(id); 423 Transaction transaction = XATransactionCommand.fromBytes(data); 424 transactionManager.loadTransaction(xid, transaction); 425 } 426 catch (Exception e) { 427 log.error("Failed to recover prepared transaction due to invalid xid: " + id, e); 428 } 429 */ 430 } 431 } 432 finally { 433 try { 434 rs.close(); 435 } 436 catch (Throwable e) { 437 } 438 try { 439 s.close(); 440 } 441 catch (Throwable e) { 442 } 443 } 444 } 445 446 /** 447 * @throws JMSException 448 * @see org.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long) 449 */ 450 public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException { 451 PreparedStatement s = null; 452 try { 453 s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub()); 454 s.setLong(1, seq); 455 s.setString(2, subscriptionID); 456 s.setString(3, destinationName); 457 458 if (s.executeUpdate() != 1) { 459 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID); 460 } 461 } 462 finally { 463 try { 464 s.close(); 465 } 466 catch (Throwable e) { 467 } 468 } 469 } 470 471 /** 472 * @throws JMSException 473 * @see org.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler) 474 */ 475 public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException { 476 // dumpTables(c, destinationName, subscriptionID); 477 478 PreparedStatement s = null; 479 ResultSet rs = null; 480 try { 481 482 // System.out.println(statementProvider.getFindAllDurableSubMessagesStatment()); 483 s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment()); 484 s.setString(1, destinationName); 485 s.setString(2, subscriptionID); 486 rs = s.executeQuery(); 487 488 while (rs.next()) { 489 long seq = rs.getLong(1); 490 String msgid = rs.getString(2); 491 listener.onMessage(seq, msgid); 492 } 493 494 } 495 finally { 496 try { 497 rs.close(); 498 } 499 catch (Throwable e) { 500 } 501 try { 502 s.close(); 503 } 504 catch (Throwable e) { 505 } 506 } 507 } 508 509 /** 510 * @see org.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.activemq.service.SubscriberEntry) 511 */ 512 public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException { 513 514 PreparedStatement s = null; 515 try { 516 s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment()); 517 s.setInt(1, subscriberEntry.getSubscriberID()); 518 s.setString(2, subscriberEntry.getClientID()); 519 s.setString(3, subscriberEntry.getConsumerName()); 520 s.setString(4, subscriberEntry.getSelector()); 521 s.setString(5, sub); 522 s.setString(6, destinationName); 523 524 // If the sub was not there then we need to create it. 525 if (s.executeUpdate() != 1) { 526 s.close(); 527 528 long id=0; 529 ResultSet rs=null; 530 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs()); 531 try { 532 rs = s.executeQuery(); 533 if (rs.next()) { 534 id = rs.getLong(1); 535 } 536 } finally { 537 try { 538 rs.close(); 539 } catch (Throwable e) { 540 } 541 } 542 s.close(); 543 544 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment()); 545 s.setInt(1, subscriberEntry.getSubscriberID()); 546 s.setString(2, subscriberEntry.getClientID()); 547 s.setString(3, subscriberEntry.getConsumerName()); 548 s.setString(4, subscriberEntry.getSelector()); 549 s.setString(5, sub); 550 s.setString(6, destinationName); 551 552 s.setLong(7, id); 553 554 if (s.executeUpdate() != 1) { 555 log.error("Failed to store durable subscription for: " + sub); 556 } 557 } 558 } 559 finally { 560 try { 561 s.close(); 562 } 563 catch (Throwable e) { 564 } 565 } 566 } 567 568 /** 569 * @see org.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object) 570 */ 571 public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException { 572 PreparedStatement s = null; 573 ResultSet rs = null; 574 try { 575 576 s = c.prepareStatement(statementProvider.getFindDurableSubStatment()); 577 s.setString(1, sub); 578 s.setString(2, destinationName); 579 rs = s.executeQuery(); 580 581 if (!rs.next()) { 582 return null; 583 } 584 585 SubscriberEntry answer = new SubscriberEntry(); 586 answer.setSubscriberID(rs.getInt(1)); 587 answer.setClientID(rs.getString(2)); 588 answer.setConsumerName(rs.getString(3)); 589 answer.setDestination(rs.getString(4)); 590 591 return answer; 592 593 } 594 finally { 595 try { 596 rs.close(); 597 } 598 catch (Throwable e) { 599 } 600 try { 601 s.close(); 602 } 603 catch (Throwable e) { 604 } 605 } 606 } 607 608 public void doRemoveAllMessages(Connection c, String destinationName) throws SQLException, JMSException { 609 PreparedStatement s = null; 610 try { 611 s = c.prepareStatement(statementProvider.getRemoveAllMessagesStatment()); 612 s.setString(1, destinationName); 613 s.executeUpdate(); 614 s.close(); 615 616 s = c.prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment()); 617 s.setString(1, destinationName); 618 s.executeUpdate(); 619 620 } 621 finally { 622 try { 623 s.close(); 624 } 625 catch (Throwable e) { 626 } 627 } 628 } 629 630 public void doDeleteSubscription(Connection c, String destinationName, String subscription) throws SQLException, JMSException { 631 PreparedStatement s = null; 632 try { 633 s = c.prepareStatement(statementProvider.getDeleteSubscriptionStatment()); 634 s.setString(1, subscription); 635 s.setString(2, destinationName); 636 637 s.executeUpdate(); 638 } 639 finally { 640 try { 641 s.close(); 642 } 643 catch (Throwable e) { 644 } 645 } 646 } 647 648 public void doDeleteOldMessages(Connection c) throws SQLException, JMSException { 649 PreparedStatement s = null; 650 try { 651 s = c.prepareStatement(statementProvider.getDeleteOldMessagesStatment()); 652 //s.setLong(1, System.currentTimeMillis()); 653 int i = s.executeUpdate(); 654 log.debug("Deleted "+i+" old message(s)."); 655 } 656 finally { 657 try { 658 s.close(); 659 } 660 catch (Throwable e) { 661 } 662 } 663 } 664 665 public void doGetExpiredMessages(Connection c, ExpiredMessageResultHandler handler) throws SQLException, JMSException { 666 PreparedStatement s = null; 667 ResultSet rs = null; 668 try { 669 s = c.prepareStatement(statementProvider.getFindExpiredMessagesStatment()); 670 s.setLong(1, System.currentTimeMillis()); 671 rs = s.executeQuery(); 672 while(rs.next()) { 673 long seq = rs.getLong(1); 674 String container = rs.getString(2); 675 String msgid = rs.getString(3); 676 boolean isSentToDeadLetter = rs.getString(4)!=null&&rs.getString(4).equals("Y"); 677 handler.onMessage(seq, container, msgid, isSentToDeadLetter); 678 } 679 } 680 finally { 681 try { 682 s.close(); 683 } 684 catch (Throwable e) { 685 } 686 } 687 } 688 689 public void doDeleteExpiredMessage(Connection c, MessageIdentity messageIdentity) throws SQLException, JMSException { 690 PreparedStatement s = null; 691 ResultSet rs = null; 692 try { 693 s = c.prepareStatement(statementProvider.getDeleteMessageStatement()); 694 Long seq = (Long)messageIdentity.getSequenceNumber(); 695 s.setLong(1, seq.longValue()); 696 s.setString(2, messageIdentity.getMessageID()); 697 int i = s.executeUpdate(); 698 log.debug("Deleted "+i+" old message."); 699 } 700 finally { 701 try { 702 s.close(); 703 } 704 catch (Throwable e) { 705 } 706 } 707 } 708 709 public StatementProvider getStatementProvider() { 710 return statementProvider; 711 } 712 713 /* 714 * Usefull for debuging. 715 * 716 public void dumpTables(Connection c, String destinationName, String subscriptionID) throws SQLException { 717 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 718 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 719 PreparedStatement s = c.prepareStatement("SELECT M.ID, M.MSGID " + 720 "FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D " + 721 "WHERE D.CONTAINER=? AND D.SUB=? " + 722 "AND M.CONTAINER=D.CONTAINER " + 723 "AND M.ID > D.LAST_ACKED_ID " + 724 "ORDER BY M.ID"); 725 s.setString(1,destinationName); 726 s.setString(2,subscriptionID); 727 printQuery(s,System.out); 728 } 729 730 private void printQuery(Connection c, String query, PrintStream out) throws SQLException { 731 printQuery(c.prepareStatement(query), out); 732 } 733 734 private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { 735 736 ResultSet set=null; 737 try { 738 set = s.executeQuery(); 739 ResultSetMetaData metaData = set.getMetaData(); 740 for( int i=1; i<= metaData.getColumnCount(); i++ ) { 741 if(i==1) 742 out.print("||"); 743 out.print(metaData.getColumnName(i)+"||"); 744 } 745 out.println(); 746 while(set.next()) { 747 for( int i=1; i<= metaData.getColumnCount(); i++ ) { 748 if(i==1) 749 out.print("|"); 750 out.print(set.getString(i)+"|"); 751 } 752 out.println(); 753 } 754 } finally { 755 try { set.close(); } catch (Throwable ignore) {} 756 try { s.close(); } catch (Throwable ignore) {} 757 } 758 } 759 */ 760 }