001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.amq; 018 019 import java.io.IOException; 020 import java.io.InterruptedIOException; 021 import java.util.ArrayList; 022 import java.util.Collections; 023 import java.util.HashSet; 024 import java.util.Iterator; 025 import java.util.LinkedHashMap; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 import java.util.Map.Entry; 030 import java.util.concurrent.CountDownLatch; 031 import java.util.concurrent.atomic.AtomicReference; 032 import java.util.concurrent.locks.Lock; 033 import org.apache.activemq.broker.ConnectionContext; 034 import org.apache.activemq.command.ActiveMQDestination; 035 import org.apache.activemq.command.DataStructure; 036 import org.apache.activemq.command.JournalQueueAck; 037 import org.apache.activemq.command.Message; 038 import org.apache.activemq.command.MessageAck; 039 import org.apache.activemq.command.MessageId; 040 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 041 import org.apache.activemq.kaha.impl.async.Location; 042 import org.apache.activemq.store.AbstractMessageStore; 043 import org.apache.activemq.store.MessageRecoveryListener; 044 import org.apache.activemq.store.PersistenceAdapter; 045 import org.apache.activemq.store.ReferenceStore; 046 import org.apache.activemq.store.ReferenceStore.ReferenceData; 047 import org.apache.activemq.thread.Task; 048 import org.apache.activemq.thread.TaskRunner; 049 import org.apache.activemq.transaction.Synchronization; 050 import org.apache.activemq.usage.MemoryUsage; 051 import org.apache.activemq.util.Callback; 052 import org.apache.activemq.util.TransactionTemplate; 053 import org.apache.commons.logging.Log; 054 import org.apache.commons.logging.LogFactory; 055 056 /** 057 * A MessageStore that uses a Journal to store it's messages. 058 * 059 * @version $Revision: 1.14 $ 060 */ 061 public class AMQMessageStore extends AbstractMessageStore { 062 private static final Log LOG = LogFactory.getLog(AMQMessageStore.class); 063 protected final AMQPersistenceAdapter peristenceAdapter; 064 protected final AMQTransactionStore transactionStore; 065 protected final ReferenceStore referenceStore; 066 protected final TransactionTemplate transactionTemplate; 067 protected Location lastLocation; 068 protected Location lastWrittenLocation; 069 protected Set<Location> inFlightTxLocations = new HashSet<Location>(); 070 protected final TaskRunner asyncWriteTask; 071 protected CountDownLatch flushLatch; 072 private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>(); 073 private List<MessageAck> messageAcks = new ArrayList<MessageAck>(); 074 /** A MessageStore that we can use to retrieve messages quickly. */ 075 private Map<MessageId, ReferenceData> cpAddedMessageIds; 076 private final boolean debug = LOG.isDebugEnabled(); 077 private final AtomicReference<Location> mark = new AtomicReference<Location>(); 078 protected final Lock lock; 079 080 public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { 081 super(destination); 082 this.peristenceAdapter = adapter; 083 this.lock = referenceStore.getStoreLock(); 084 this.transactionStore = adapter.getTransactionStore(); 085 this.referenceStore = referenceStore; 086 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext( 087 new NonCachedMessageEvaluationContext())); 088 asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() { 089 public boolean iterate() { 090 asyncWrite(); 091 return false; 092 } 093 }, "Checkpoint: " + destination); 094 } 095 096 public void setMemoryUsage(MemoryUsage memoryUsage) { 097 referenceStore.setMemoryUsage(memoryUsage); 098 } 099 100 /** 101 * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it 102 * is doing. 103 */ 104 public final void addMessage(ConnectionContext context, final Message message) throws IOException { 105 final MessageId id = message.getMessageId(); 106 final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); 107 if (!context.isInTransaction()) { 108 if (debug) { 109 LOG.debug("Journalled message add for: " + id + ", at: " + location); 110 } 111 this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); 112 addMessage(message, location); 113 } else { 114 if (debug) { 115 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); 116 } 117 lock.lock(); 118 try { 119 inFlightTxLocations.add(location); 120 } finally { 121 lock.unlock(); 122 } 123 transactionStore.addMessage(this, message, location); 124 context.getTransaction().addSynchronization(new Synchronization() { 125 public void afterCommit() throws Exception { 126 if (debug) { 127 LOG.debug("Transacted message add commit for: " + id + ", at: " + location); 128 } 129 lock.lock(); 130 try { 131 inFlightTxLocations.remove(location); 132 } finally { 133 lock.unlock(); 134 } 135 addMessage(message, location); 136 } 137 138 public void afterRollback() throws Exception { 139 if (debug) { 140 LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); 141 } 142 lock.lock(); 143 try { 144 inFlightTxLocations.remove(location); 145 } finally { 146 lock.unlock(); 147 } 148 } 149 }); 150 } 151 } 152 153 final void addMessage(final Message message, final Location location) throws InterruptedIOException { 154 ReferenceData data = new ReferenceData(); 155 data.setExpiration(message.getExpiration()); 156 data.setFileId(location.getDataFileId()); 157 data.setOffset(location.getOffset()); 158 lock.lock(); 159 try { 160 lastLocation = location; 161 messages.put(message.getMessageId(), data); 162 } finally { 163 lock.unlock(); 164 } 165 if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { 166 flush(); 167 } else { 168 try { 169 asyncWriteTask.wakeup(); 170 } catch (InterruptedException e) { 171 throw new InterruptedIOException(); 172 } 173 } 174 } 175 176 public boolean replayAddMessage(ConnectionContext context, Message message, Location location) { 177 MessageId id = message.getMessageId(); 178 try { 179 // Only add the message if it has not already been added. 180 ReferenceData data = referenceStore.getMessageReference(id); 181 if (data == null) { 182 data = new ReferenceData(); 183 data.setExpiration(message.getExpiration()); 184 data.setFileId(location.getDataFileId()); 185 data.setOffset(location.getOffset()); 186 referenceStore.addMessageReference(context, id, data); 187 return true; 188 } 189 } catch (Throwable e) { 190 LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " 191 + e, e); 192 } 193 return false; 194 } 195 196 /** 197 */ 198 public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException { 199 JournalQueueAck remove = new JournalQueueAck(); 200 remove.setDestination(destination); 201 remove.setMessageAck(ack); 202 final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); 203 if (!context.isInTransaction()) { 204 if (debug) { 205 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); 206 } 207 removeMessage(ack, location); 208 } else { 209 if (debug) { 210 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); 211 } 212 lock.lock(); 213 try { 214 inFlightTxLocations.add(location); 215 } finally { 216 lock.unlock(); 217 } 218 transactionStore.removeMessage(this, ack, location); 219 context.getTransaction().addSynchronization(new Synchronization() { 220 public void afterCommit() throws Exception { 221 if (debug) { 222 LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " 223 + location); 224 } 225 lock.lock(); 226 try { 227 inFlightTxLocations.remove(location); 228 } finally { 229 lock.unlock(); 230 } 231 removeMessage(ack, location); 232 } 233 234 public void afterRollback() throws Exception { 235 if (debug) { 236 LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " 237 + location); 238 } 239 lock.lock(); 240 try { 241 inFlightTxLocations.remove(location); 242 } finally { 243 lock.unlock(); 244 } 245 } 246 }); 247 } 248 } 249 250 final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { 251 ReferenceData data; 252 lock.lock(); 253 try { 254 lastLocation = location; 255 MessageId id = ack.getLastMessageId(); 256 data = messages.remove(id); 257 if (data == null) { 258 messageAcks.add(ack); 259 } else { 260 // message never got written so datafileReference will still exist 261 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId()); 262 } 263 } finally { 264 lock.unlock(); 265 } 266 if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { 267 flush(); 268 } else if (data == null) { 269 try { 270 asyncWriteTask.wakeup(); 271 } catch (InterruptedException e) { 272 throw new InterruptedIOException(); 273 } 274 } 275 } 276 277 public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { 278 try { 279 // Only remove the message if it has not already been removed. 280 ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId()); 281 if (t != null) { 282 referenceStore.removeMessage(context, messageAck); 283 return true; 284 } 285 } catch (Throwable e) { 286 LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() 287 + "'. Message may have already been acknowledged. reason: " + e); 288 } 289 return false; 290 } 291 292 /** 293 * Waits till the lastest data has landed on the referenceStore 294 * 295 * @throws InterruptedIOException 296 */ 297 public void flush() throws InterruptedIOException { 298 if (LOG.isDebugEnabled()) { 299 LOG.debug("flush starting ..."); 300 } 301 CountDownLatch countDown; 302 lock.lock(); 303 try { 304 if (lastWrittenLocation == lastLocation) { 305 return; 306 } 307 if (flushLatch == null) { 308 flushLatch = new CountDownLatch(1); 309 } 310 countDown = flushLatch; 311 } finally { 312 lock.unlock(); 313 } 314 try { 315 asyncWriteTask.wakeup(); 316 countDown.await(); 317 } catch (InterruptedException e) { 318 throw new InterruptedIOException(); 319 } 320 if (LOG.isDebugEnabled()) { 321 LOG.debug("flush finished"); 322 } 323 } 324 325 /** 326 * @return 327 * @throws IOException 328 */ 329 synchronized void asyncWrite() { 330 try { 331 CountDownLatch countDown; 332 lock.lock(); 333 try { 334 countDown = flushLatch; 335 flushLatch = null; 336 } finally { 337 lock.unlock(); 338 } 339 mark.set(doAsyncWrite()); 340 if (countDown != null) { 341 countDown.countDown(); 342 } 343 } catch (IOException e) { 344 LOG.error("Checkpoint failed: " + e, e); 345 } 346 } 347 348 /** 349 * @return 350 * @throws IOException 351 */ 352 protected Location doAsyncWrite() throws IOException { 353 final List<MessageAck> cpRemovedMessageLocations; 354 final List<Location> cpActiveJournalLocations; 355 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); 356 final Location lastLocation; 357 // swap out the message hash maps.. 358 lock.lock(); 359 try { 360 cpAddedMessageIds = this.messages; 361 cpRemovedMessageLocations = this.messageAcks; 362 cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations); 363 this.messages = new LinkedHashMap<MessageId, ReferenceData>(); 364 this.messageAcks = new ArrayList<MessageAck>(); 365 lastLocation = this.lastLocation; 366 } finally { 367 lock.unlock(); 368 } 369 if (LOG.isDebugEnabled()) { 370 LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " 371 + cpRemovedMessageLocations.size() + " "); 372 } 373 transactionTemplate.run(new Callback() { 374 public void execute() throws Exception { 375 int size = 0; 376 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); 377 ConnectionContext context = transactionTemplate.getContext(); 378 // Checkpoint the added messages. 379 Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator(); 380 while (iterator.hasNext()) { 381 Entry<MessageId, ReferenceData> entry = iterator.next(); 382 try { 383 if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) { 384 if (LOG.isDebugEnabled()) { 385 LOG.debug("adding message ref:" + entry.getKey()); 386 } 387 size++; 388 } else { 389 if (LOG.isDebugEnabled()) { 390 LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue()); 391 } 392 } 393 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry 394 .getValue().getFileId()); 395 } catch (Throwable e) { 396 LOG.warn("Message could not be added to long term store: " + e.getMessage(), e); 397 } 398 399 // Commit the batch if it's getting too big 400 if (size >= maxCheckpointMessageAddSize) { 401 persitanceAdapter.commitTransaction(context); 402 persitanceAdapter.beginTransaction(context); 403 size = 0; 404 } 405 } 406 persitanceAdapter.commitTransaction(context); 407 persitanceAdapter.beginTransaction(context); 408 // Checkpoint the removed messages. 409 for (MessageAck ack : cpRemovedMessageLocations) { 410 try { 411 referenceStore.removeMessage(transactionTemplate.getContext(), ack); 412 } catch (Throwable e) { 413 LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e); 414 } 415 } 416 } 417 }); 418 LOG.debug("Batch update done."); 419 lock.lock(); 420 try { 421 cpAddedMessageIds = null; 422 lastWrittenLocation = lastLocation; 423 } finally { 424 lock.unlock(); 425 } 426 if (cpActiveJournalLocations.size() > 0) { 427 Collections.sort(cpActiveJournalLocations); 428 return cpActiveJournalLocations.get(0); 429 } else { 430 return lastLocation; 431 } 432 } 433 434 /** 435 * 436 */ 437 public Message getMessage(MessageId identity) throws IOException { 438 Location location = getLocation(identity); 439 if (location != null) { 440 DataStructure rc = peristenceAdapter.readCommand(location); 441 try { 442 return (Message) rc; 443 } catch (ClassCastException e) { 444 throw new IOException("Could not read message " + identity + " at location " + location 445 + ", expected a message, but got: " + rc); 446 } 447 } 448 return null; 449 } 450 451 protected Location getLocation(MessageId messageId) throws IOException { 452 ReferenceData data = null; 453 lock.lock(); 454 try { 455 // Is it still in flight??? 456 data = messages.get(messageId); 457 if (data == null && cpAddedMessageIds != null) { 458 data = cpAddedMessageIds.get(messageId); 459 } 460 } finally { 461 lock.unlock(); 462 } 463 if (data == null) { 464 data = referenceStore.getMessageReference(messageId); 465 if (data == null) { 466 return null; 467 } 468 } 469 Location location = new Location(); 470 location.setDataFileId(data.getFileId()); 471 location.setOffset(data.getOffset()); 472 return location; 473 } 474 475 /** 476 * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the 477 * transaction log and then the cache is updated. 478 * 479 * @param listener 480 * @throws Exception 481 */ 482 public void recover(final MessageRecoveryListener listener) throws Exception { 483 flush(); 484 referenceStore.recover(new RecoveryListenerAdapter(this, listener)); 485 } 486 487 public void start() throws Exception { 488 referenceStore.start(); 489 } 490 491 public void stop() throws Exception { 492 flush(); 493 asyncWriteTask.shutdown(); 494 referenceStore.stop(); 495 } 496 497 /** 498 * @return Returns the longTermStore. 499 */ 500 public ReferenceStore getReferenceStore() { 501 return referenceStore; 502 } 503 504 /** 505 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 506 */ 507 public void removeAllMessages(ConnectionContext context) throws IOException { 508 flush(); 509 referenceStore.removeAllMessages(context); 510 } 511 512 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, 513 String messageRef) throws IOException { 514 throw new IOException("The journal does not support message references."); 515 } 516 517 public String getMessageReference(MessageId identity) throws IOException { 518 throw new IOException("The journal does not support message references."); 519 } 520 521 /** 522 * @return 523 * @throws IOException 524 * @see org.apache.activemq.store.MessageStore#getMessageCount() 525 */ 526 public int getMessageCount() throws IOException { 527 flush(); 528 return referenceStore.getMessageCount(); 529 } 530 531 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 532 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); 533 referenceStore.recoverNextMessages(maxReturned, recoveryListener); 534 if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { 535 flush(); 536 referenceStore.recoverNextMessages(maxReturned, recoveryListener); 537 } 538 } 539 540 Message getMessage(ReferenceData data) throws IOException { 541 Location location = new Location(); 542 location.setDataFileId(data.getFileId()); 543 location.setOffset(data.getOffset()); 544 DataStructure rc = peristenceAdapter.readCommand(location); 545 try { 546 return (Message) rc; 547 } catch (ClassCastException e) { 548 throw new IOException("Could not read message at location " + location + ", expected a message, but got: " 549 + rc); 550 } 551 } 552 553 public void resetBatching() { 554 referenceStore.resetBatching(); 555 } 556 557 public Location getMark() { 558 return mark.get(); 559 } 560 561 public void dispose(ConnectionContext context) { 562 try { 563 flush(); 564 } catch (InterruptedIOException e) { 565 Thread.currentThread().interrupt(); 566 } 567 referenceStore.dispose(context); 568 super.dispose(context); 569 } 570 571 public void setBatch(MessageId messageId) { 572 try { 573 flush(); 574 } catch (InterruptedIOException e) { 575 LOG.debug("flush on setBatch resulted in exception", e); 576 } 577 getReferenceStore().setBatch(messageId); 578 } 579 580 }