001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashSet;
024    import java.util.Iterator;
025    import java.util.LinkedHashMap;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.Map.Entry;
030    import java.util.concurrent.CountDownLatch;
031    import java.util.concurrent.atomic.AtomicReference;
032    import java.util.concurrent.locks.Lock;
033    import org.apache.activemq.broker.ConnectionContext;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.DataStructure;
036    import org.apache.activemq.command.JournalQueueAck;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
041    import org.apache.activemq.kaha.impl.async.Location;
042    import org.apache.activemq.store.AbstractMessageStore;
043    import org.apache.activemq.store.MessageRecoveryListener;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.ReferenceStore;
046    import org.apache.activemq.store.ReferenceStore.ReferenceData;
047    import org.apache.activemq.thread.Task;
048    import org.apache.activemq.thread.TaskRunner;
049    import org.apache.activemq.transaction.Synchronization;
050    import org.apache.activemq.usage.MemoryUsage;
051    import org.apache.activemq.util.Callback;
052    import org.apache.activemq.util.TransactionTemplate;
053    import org.apache.commons.logging.Log;
054    import org.apache.commons.logging.LogFactory;
055    
056    /**
057     * A MessageStore that uses a Journal to store it's messages.
058     * 
059     * @version $Revision: 1.14 $
060     */
061    public class AMQMessageStore extends AbstractMessageStore {
062        private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
063        protected final AMQPersistenceAdapter peristenceAdapter;
064        protected final AMQTransactionStore transactionStore;
065        protected final ReferenceStore referenceStore;
066        protected final TransactionTemplate transactionTemplate;
067        protected Location lastLocation;
068        protected Location lastWrittenLocation;
069        protected Set<Location> inFlightTxLocations = new HashSet<Location>();
070        protected final TaskRunner asyncWriteTask;
071        protected CountDownLatch flushLatch;
072        private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
073        private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
074        /** A MessageStore that we can use to retrieve messages quickly. */
075        private Map<MessageId, ReferenceData> cpAddedMessageIds;
076        private final boolean debug = LOG.isDebugEnabled();
077        private final AtomicReference<Location> mark = new AtomicReference<Location>();
078        protected final Lock lock;
079    
080        public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
081            super(destination);
082            this.peristenceAdapter = adapter;
083            this.lock = referenceStore.getStoreLock();
084            this.transactionStore = adapter.getTransactionStore();
085            this.referenceStore = referenceStore;
086            this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
087                    new NonCachedMessageEvaluationContext()));
088            asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
089                public boolean iterate() {
090                    asyncWrite();
091                    return false;
092                }
093            }, "Checkpoint: " + destination);
094        }
095    
096        public void setMemoryUsage(MemoryUsage memoryUsage) {
097            referenceStore.setMemoryUsage(memoryUsage);
098        }
099    
100        /**
101         * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it
102         * is doing.
103         */
104        public final void addMessage(ConnectionContext context, final Message message) throws IOException {
105            final MessageId id = message.getMessageId();
106            final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
107            if (!context.isInTransaction()) {
108                if (debug) {
109                    LOG.debug("Journalled message add for: " + id + ", at: " + location);
110                }
111                this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
112                addMessage(message, location);
113            } else {
114                if (debug) {
115                    LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
116                }
117                lock.lock();
118                try {
119                    inFlightTxLocations.add(location);
120                } finally {
121                    lock.unlock();
122                }
123                transactionStore.addMessage(this, message, location);
124                context.getTransaction().addSynchronization(new Synchronization() {
125                    public void afterCommit() throws Exception {
126                        if (debug) {
127                            LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
128                        }
129                        lock.lock();
130                        try {
131                            inFlightTxLocations.remove(location);
132                        } finally {
133                            lock.unlock();
134                        }
135                        addMessage(message, location);
136                    }
137    
138                    public void afterRollback() throws Exception {
139                        if (debug) {
140                            LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
141                        }
142                        lock.lock();
143                        try {
144                            inFlightTxLocations.remove(location);
145                        } finally {
146                            lock.unlock();
147                        }
148                    }
149                });
150            }
151        }
152    
153        final void addMessage(final Message message, final Location location) throws InterruptedIOException {
154            ReferenceData data = new ReferenceData();
155            data.setExpiration(message.getExpiration());
156            data.setFileId(location.getDataFileId());
157            data.setOffset(location.getOffset());
158            lock.lock();
159            try {
160                lastLocation = location;
161                messages.put(message.getMessageId(), data);
162            } finally {
163                lock.unlock();
164            }
165            if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
166                flush();
167            } else {
168                try {
169                    asyncWriteTask.wakeup();
170                } catch (InterruptedException e) {
171                    throw new InterruptedIOException();
172                }
173            }
174        }
175    
176        public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
177            MessageId id = message.getMessageId();
178            try {
179                // Only add the message if it has not already been added.
180                ReferenceData data = referenceStore.getMessageReference(id);
181                if (data == null) {
182                    data = new ReferenceData();
183                    data.setExpiration(message.getExpiration());
184                    data.setFileId(location.getDataFileId());
185                    data.setOffset(location.getOffset());
186                    referenceStore.addMessageReference(context, id, data);
187                    return true;
188                }
189            } catch (Throwable e) {
190                LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: "
191                        + e, e);
192            }
193            return false;
194        }
195    
196        /**
197         */
198        public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
199            JournalQueueAck remove = new JournalQueueAck();
200            remove.setDestination(destination);
201            remove.setMessageAck(ack);
202            final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
203            if (!context.isInTransaction()) {
204                if (debug) {
205                    LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
206                }
207                removeMessage(ack, location);
208            } else {
209                if (debug) {
210                    LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
211                }
212                lock.lock();
213                try {
214                    inFlightTxLocations.add(location);
215                } finally {
216                    lock.unlock();
217                }
218                transactionStore.removeMessage(this, ack, location);
219                context.getTransaction().addSynchronization(new Synchronization() {
220                    public void afterCommit() throws Exception {
221                        if (debug) {
222                            LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
223                                    + location);
224                        }
225                        lock.lock();
226                        try {
227                            inFlightTxLocations.remove(location);
228                        } finally {
229                            lock.unlock();
230                        }
231                        removeMessage(ack, location);
232                    }
233    
234                    public void afterRollback() throws Exception {
235                        if (debug) {
236                            LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
237                                    + location);
238                        }
239                        lock.lock();
240                        try {
241                            inFlightTxLocations.remove(location);
242                        } finally {
243                            lock.unlock();
244                        }
245                    }
246                });
247            }
248        }
249    
250        final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
251            ReferenceData data;
252            lock.lock();
253            try {
254                lastLocation = location;
255                MessageId id = ack.getLastMessageId();
256                data = messages.remove(id);
257                if (data == null) {
258                    messageAcks.add(ack);
259                } else {
260                    // message never got written so datafileReference will still exist
261                    AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
262                }
263            } finally {
264                lock.unlock();
265            }
266            if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
267                flush();
268            } else if (data == null) {
269                try {
270                    asyncWriteTask.wakeup();
271                } catch (InterruptedException e) {
272                    throw new InterruptedIOException();
273                }
274            }
275        }
276    
277        public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
278            try {
279                // Only remove the message if it has not already been removed.
280                ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
281                if (t != null) {
282                    referenceStore.removeMessage(context, messageAck);
283                    return true;
284                }
285            } catch (Throwable e) {
286                LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
287                        + "'.  Message may have already been acknowledged. reason: " + e);
288            }
289            return false;
290        }
291    
292        /**
293         * Waits till the lastest data has landed on the referenceStore
294         * 
295         * @throws InterruptedIOException
296         */
297        public void flush() throws InterruptedIOException {
298            if (LOG.isDebugEnabled()) {
299                LOG.debug("flush starting ...");
300            }
301            CountDownLatch countDown;
302            lock.lock();
303            try {
304                if (lastWrittenLocation == lastLocation) {
305                    return;
306                }
307                if (flushLatch == null) {
308                    flushLatch = new CountDownLatch(1);
309                }
310                countDown = flushLatch;
311            } finally {
312                lock.unlock();
313            }
314            try {
315                asyncWriteTask.wakeup();
316                countDown.await();
317            } catch (InterruptedException e) {
318                throw new InterruptedIOException();
319            }
320            if (LOG.isDebugEnabled()) {
321                LOG.debug("flush finished");
322            }
323        }
324    
325        /**
326         * @return
327         * @throws IOException
328         */
329        synchronized void asyncWrite() {
330            try {
331                CountDownLatch countDown;
332                lock.lock();
333                try {
334                    countDown = flushLatch;
335                    flushLatch = null;
336                } finally {
337                    lock.unlock();
338                }
339                mark.set(doAsyncWrite());
340                if (countDown != null) {
341                    countDown.countDown();
342                }
343            } catch (IOException e) {
344                LOG.error("Checkpoint failed: " + e, e);
345            }
346        }
347    
348        /**
349         * @return
350         * @throws IOException
351         */
352        protected Location doAsyncWrite() throws IOException {
353            final List<MessageAck> cpRemovedMessageLocations;
354            final List<Location> cpActiveJournalLocations;
355            final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
356            final Location lastLocation;
357            // swap out the message hash maps..
358            lock.lock();
359            try {
360                cpAddedMessageIds = this.messages;
361                cpRemovedMessageLocations = this.messageAcks;
362                cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
363                this.messages = new LinkedHashMap<MessageId, ReferenceData>();
364                this.messageAcks = new ArrayList<MessageAck>();
365                lastLocation = this.lastLocation;
366            } finally {
367                lock.unlock();
368            }
369            if (LOG.isDebugEnabled()) {
370                LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
371                        + cpRemovedMessageLocations.size() + " ");
372            }
373            transactionTemplate.run(new Callback() {
374                public void execute() throws Exception {
375                    int size = 0;
376                    PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
377                    ConnectionContext context = transactionTemplate.getContext();
378                    // Checkpoint the added messages.
379                    Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
380                    while (iterator.hasNext()) {
381                        Entry<MessageId, ReferenceData> entry = iterator.next();
382                        try {
383                            if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
384                                if (LOG.isDebugEnabled()) {
385                                    LOG.debug("adding message ref:" + entry.getKey());
386                                }
387                                size++;
388                            } else {
389                                if (LOG.isDebugEnabled()) {
390                                    LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
391                                }
392                            }
393                            AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
394                                    .getValue().getFileId());
395                        } catch (Throwable e) {
396                            LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
397                        }
398                        
399                        // Commit the batch if it's getting too big
400                        if (size >= maxCheckpointMessageAddSize) {
401                            persitanceAdapter.commitTransaction(context);
402                            persitanceAdapter.beginTransaction(context);
403                            size = 0;
404                        }
405                    }
406                    persitanceAdapter.commitTransaction(context);
407                    persitanceAdapter.beginTransaction(context);
408                    // Checkpoint the removed messages.
409                    for (MessageAck ack : cpRemovedMessageLocations) {
410                        try {
411                            referenceStore.removeMessage(transactionTemplate.getContext(), ack);
412                        } catch (Throwable e) {
413                            LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
414                        }
415                    }
416                }
417            });
418            LOG.debug("Batch update done.");
419            lock.lock();
420            try {
421                cpAddedMessageIds = null;
422                lastWrittenLocation = lastLocation;
423            } finally {
424                lock.unlock();
425            }
426            if (cpActiveJournalLocations.size() > 0) {
427                Collections.sort(cpActiveJournalLocations);
428                return cpActiveJournalLocations.get(0);
429            } else {
430                return lastLocation;
431            }
432        }
433    
434        /**
435         * 
436         */
437        public Message getMessage(MessageId identity) throws IOException {
438            Location location = getLocation(identity);
439            if (location != null) {
440                DataStructure rc = peristenceAdapter.readCommand(location);
441                try {
442                    return (Message) rc;
443                } catch (ClassCastException e) {
444                    throw new IOException("Could not read message " + identity + " at location " + location
445                            + ", expected a message, but got: " + rc);
446                }
447            }
448            return null;
449        }
450    
451        protected Location getLocation(MessageId messageId) throws IOException {
452            ReferenceData data = null;
453            lock.lock();
454            try {
455                // Is it still in flight???
456                data = messages.get(messageId);
457                if (data == null && cpAddedMessageIds != null) {
458                    data = cpAddedMessageIds.get(messageId);
459                }
460            } finally {
461                lock.unlock();
462            }
463            if (data == null) {
464                data = referenceStore.getMessageReference(messageId);
465                if (data == null) {
466                    return null;
467                }
468            }
469            Location location = new Location();
470            location.setDataFileId(data.getFileId());
471            location.setOffset(data.getOffset());
472            return location;
473        }
474    
475        /**
476         * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
477         * transaction log and then the cache is updated.
478         * 
479         * @param listener
480         * @throws Exception
481         */
482        public void recover(final MessageRecoveryListener listener) throws Exception {
483            flush();
484            referenceStore.recover(new RecoveryListenerAdapter(this, listener));
485        }
486    
487        public void start() throws Exception {
488            referenceStore.start();
489        }
490    
491        public void stop() throws Exception {
492            flush();
493            asyncWriteTask.shutdown();
494            referenceStore.stop();
495        }
496    
497        /**
498         * @return Returns the longTermStore.
499         */
500        public ReferenceStore getReferenceStore() {
501            return referenceStore;
502        }
503    
504        /**
505         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
506         */
507        public void removeAllMessages(ConnectionContext context) throws IOException {
508            flush();
509            referenceStore.removeAllMessages(context);
510        }
511    
512        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
513                String messageRef) throws IOException {
514            throw new IOException("The journal does not support message references.");
515        }
516    
517        public String getMessageReference(MessageId identity) throws IOException {
518            throw new IOException("The journal does not support message references.");
519        }
520    
521        /**
522         * @return
523         * @throws IOException
524         * @see org.apache.activemq.store.MessageStore#getMessageCount()
525         */
526        public int getMessageCount() throws IOException {
527            flush();
528            return referenceStore.getMessageCount();
529        }
530    
531        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
532            RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
533            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
534            if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
535                flush();
536                referenceStore.recoverNextMessages(maxReturned, recoveryListener);
537            }
538        }
539    
540        Message getMessage(ReferenceData data) throws IOException {
541            Location location = new Location();
542            location.setDataFileId(data.getFileId());
543            location.setOffset(data.getOffset());
544            DataStructure rc = peristenceAdapter.readCommand(location);
545            try {
546                return (Message) rc;
547            } catch (ClassCastException e) {
548                throw new IOException("Could not read message  at location " + location + ", expected a message, but got: "
549                        + rc);
550            }
551        }
552    
553        public void resetBatching() {
554            referenceStore.resetBatching();
555        }
556    
557        public Location getMark() {
558            return mark.get();
559        }
560    
561        public void dispose(ConnectionContext context) {
562            try {
563                flush();
564            } catch (InterruptedIOException e) {
565                Thread.currentThread().interrupt();
566            }
567            referenceStore.dispose(context);
568            super.dispose(context);
569        }
570    
571        public void setBatch(MessageId messageId) {
572            try {
573                flush();
574            } catch (InterruptedIOException e) {
575                LOG.debug("flush on setBatch resulted in exception", e);
576            }
577            getReferenceStore().setBatch(messageId);
578        }
579        
580    }