001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.util.ArrayList;
026    import java.util.Collection;
027    import java.util.Date;
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.Iterator;
031    import java.util.LinkedHashMap;
032    import java.util.List;
033    import java.util.SortedSet;
034    import java.util.TreeMap;
035    import java.util.TreeSet;
036    import java.util.Map.Entry;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import org.apache.activemq.broker.BrokerService;
040    import org.apache.activemq.broker.BrokerServiceAware;
041    import org.apache.activemq.command.ConnectionId;
042    import org.apache.activemq.command.LocalTransactionId;
043    import org.apache.activemq.command.SubscriptionInfo;
044    import org.apache.activemq.command.TransactionId;
045    import org.apache.activemq.command.XATransactionId;
046    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
047    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048    import org.apache.activemq.store.kahadb.data.KahaDestination;
049    import org.apache.activemq.store.kahadb.data.KahaEntryType;
050    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
051    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
053    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
054    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
055    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
056    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
057    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
058    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
059    import org.apache.activemq.util.Callback;
060    import org.apache.commons.logging.Log;
061    import org.apache.commons.logging.LogFactory;
062    import org.apache.kahadb.index.BTreeIndex;
063    import org.apache.kahadb.index.BTreeVisitor;
064    import org.apache.kahadb.journal.DataFile;
065    import org.apache.kahadb.journal.Journal;
066    import org.apache.kahadb.journal.Location;
067    import org.apache.kahadb.page.Page;
068    import org.apache.kahadb.page.PageFile;
069    import org.apache.kahadb.page.Transaction;
070    import org.apache.kahadb.util.ByteSequence;
071    import org.apache.kahadb.util.DataByteArrayInputStream;
072    import org.apache.kahadb.util.DataByteArrayOutputStream;
073    import org.apache.kahadb.util.LockFile;
074    import org.apache.kahadb.util.LongMarshaller;
075    import org.apache.kahadb.util.Marshaller;
076    import org.apache.kahadb.util.Sequence;
077    import org.apache.kahadb.util.SequenceSet;
078    import org.apache.kahadb.util.StringMarshaller;
079    import org.apache.kahadb.util.VariableMarshaller;
081    
082    public class MessageDatabase implements BrokerServiceAware {
083            
084            private BrokerService brokerService;
085    
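    // The slow-access threshold below is read from a JVM system property and can be disabled by
    // setting it to 0.  For example (illustrative value), starting the broker with
    //   -Dorg.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME=1000
    // raises the threshold to 1000 ms; journal appends, index updates and cleanups that take longer
    // are logged at INFO level (see store() and checkpointCleanup()).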
086        public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
087        public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
088    
089        private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
090        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
091    
092        public static final int CLOSED_STATE = 1;
093        public static final int OPEN_STATE = 2;
094    
095    
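    // Metadata is stored on page 0 of the index page file (see loadPageFile()).  It records the root
    // page of the destinations index, the location of the last journal entry applied to the index, and
    // the location of the first in-progress transaction, which is used as the starting point for
    // journal replay on recovery (see getRecoveryPosition()).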
096        protected class Metadata {
097            protected Page<Metadata> page;
098            protected int state;
099            protected BTreeIndex<String, StoredDestination> destinations;
100            protected Location lastUpdate;
101            protected Location firstInProgressTransactionLocation;
102    
103            public void read(DataInput is) throws IOException {
104                state = is.readInt();
105                destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
106                if (is.readBoolean()) {
107                    lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
108                } else {
109                    lastUpdate = null;
110                }
111                if (is.readBoolean()) {
112                    firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
113                } else {
114                    firstInProgressTransactionLocation = null;
115                }
116            }
117    
118            public void write(DataOutput os) throws IOException {
119                os.writeInt(state);
120                os.writeLong(destinations.getPageId());
121    
122                if (lastUpdate != null) {
123                    os.writeBoolean(true);
124                    LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
125                } else {
126                    os.writeBoolean(false);
127                }
128    
129                if (firstInProgressTransactionLocation != null) {
130                    os.writeBoolean(true);
131                    LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
132                } else {
133                    os.writeBoolean(false);
134                }
135            }
136        }
137    
138        class MetadataMarshaller extends VariableMarshaller<Metadata> {
139            public Metadata readPayload(DataInput dataIn) throws IOException {
140                Metadata rc = new Metadata();
141                rc.read(dataIn);
142                return rc;
143            }
144    
145            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
146                object.write(dataOut);
147            }
148        }
149    
150        protected PageFile pageFile;
151            protected Journal journal;
152        protected Metadata metadata = new Metadata();
153    
154        protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
155    
156        protected boolean failIfDatabaseIsLocked;
157    
158        protected boolean deleteAllMessages;
159        protected File directory;
160        protected Thread checkpointThread;
161        protected boolean enableJournalDiskSyncs=true;
162        long checkpointInterval = 5*1000;
163        long cleanupInterval = 30*1000;
164        int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
165        int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
166        boolean enableIndexWriteAsync = false;
167        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
168        
169        protected AtomicBoolean started = new AtomicBoolean();
170        protected AtomicBoolean opened = new AtomicBoolean();
171        private LockFile lockFile;
172        private boolean ignoreMissingJournalfiles = false;
173        private int indexCacheSize = 100;
174        private boolean checkForCorruptJournalFiles = false;
175        private boolean checksumJournalFiles = false;
176    
177        public MessageDatabase() {
178        }
179    
180        public void start() throws Exception {
181            if (started.compareAndSet(false, true)) {
182                    load();
183            }
184        }
185    
186        public void stop() throws Exception {
187            if (started.compareAndSet(true, false)) {
188                unload();
189            }
190        }
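    // Lifecycle note: start() delegates to load(), which acquires the store lock, optionally purges the
    // store, and then calls open() to start the journal, load the page file, start the checkpoint
    // thread and run recovery.  stop() delegates to unload(), which marks the index closed, records the
    // first in-progress transaction location and then calls close().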
191    
192            private void loadPageFile() throws IOException {
193                    synchronized (indexMutex) {
194                        final PageFile pageFile = getPageFile();
195                pageFile.load();
196                pageFile.tx().execute(new Transaction.Closure<IOException>() {
197                    public void execute(Transaction tx) throws IOException {
198                        if (pageFile.getPageCount() == 0) {
199                            // First time this is created.. Initialize the metadata
200                            Page<Metadata> page = tx.allocate();
201                            assert page.getPageId() == 0;
202                            page.set(metadata);
203                            metadata.page = page;
204                            metadata.state = CLOSED_STATE;
205                            metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
206    
207                            tx.store(metadata.page, metadataMarshaller, true);
208                        } else {
209                            Page<Metadata> page = tx.load(0, metadataMarshaller);
210                            metadata = page.get();
211                            metadata.page = page;
212                        }
213                        metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
214                        metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
215                        metadata.destinations.load(tx);
216                    }
217                });
218                pageFile.flush();
219                
220                // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of referenced files instead.
222                storedDestinations.clear();
223                pageFile.tx().execute(new Transaction.Closure<IOException>() {
224                    public void execute(Transaction tx) throws IOException {
225                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
226                            Entry<String, StoredDestination> entry = iterator.next();
227                            StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
228                            storedDestinations.put(entry.getKey(), sd);
229                        }
230                    }
231                });
232            }
233            }
234            
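    // The checkpoint worker polls roughly every 500 ms (or checkpointInterval, if that is smaller) so
    // it can notice shutdown promptly.  Every checkpointInterval ms it flushes the index via
    // checkpointCleanup(false); every cleanupInterval ms it additionally garbage collects unreferenced
    // journal data files via checkpointCleanup(true).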
235            private void startCheckpoint() {
236            checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
237                public void run() {
238                    try {
239                        long lastCleanup = System.currentTimeMillis();
240                        long lastCheckpoint = System.currentTimeMillis();
241                        // Sleep for a short time so we can periodically check 
242                        // to see if we need to exit this thread.
243                        long sleepTime = Math.min(checkpointInterval, 500);
244                        while (opened.get()) {
245                            
246                            Thread.sleep(sleepTime);
247                            long now = System.currentTimeMillis();
248                            if( now - lastCleanup >= cleanupInterval ) {
249                                checkpointCleanup(true);
250                                lastCleanup = now;
251                                lastCheckpoint = now;
252                            } else if( now - lastCheckpoint >= checkpointInterval ) {
253                                checkpointCleanup(false);
254                                lastCheckpoint = now;
255                            }
256                        }
257                    } catch (InterruptedException e) {
258                        // Looks like someone really wants us to exit this thread...
259                    } catch (IOException ioe) {
260                        LOG.error("Checkpoint failed", ioe);
261                        brokerService.handleIOException(ioe);
262                    }
263                }
264                        
265            };
266            checkpointThread.setDaemon(true);
267            checkpointThread.start();
268            }
269            
270            /**
271             * @throws IOException
272             */
273            public void open() throws IOException {
274                    if( opened.compareAndSet(false, true) ) {
275                getJournal().start();
276                
277                    loadPageFile();
278                    
279                    startCheckpoint();
280                recover();
281                    }
282            }
283    
284        private void lock() throws IOException {
285            if( lockFile == null ) {
286                File lockFileName = new File(directory, "lock");
287                lockFile = new LockFile(lockFileName, true);
288                if (failIfDatabaseIsLocked) {
289                    lockFile.lock();
290                } else {
291                    while (true) {
292                        try {
293                            lockFile.lock();
294                            break;
295                        } catch (IOException e) {
296                            LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked. Reason: " + e);
297                            try {
298                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
299                            } catch (InterruptedException e1) {
300                            }
301                        }
302                    }
303                }
304            }
305        }
306    
307        public void load() throws IOException {
308            
309            synchronized (indexMutex) {
310                lock();
311                if (deleteAllMessages) {
312                    getJournal().start();
313                    getJournal().delete();
314                    getJournal().close();
315                    journal = null;
316                    getPageFile().delete();
317                    LOG.info("Persistence store purged.");
318                    deleteAllMessages = false;
319                }
320    
321                    open();
322                    store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
323    
324            }
325    
326        }
327    
328        
329            public void close() throws IOException, InterruptedException {
330                    if( opened.compareAndSet(true, false)) {
331                    synchronized (indexMutex) {
332                        pageFile.unload();
333                        metadata = new Metadata();
334                    }
335                    journal.close();
336                    checkpointThread.join();
337                    lockFile.unlock();
338                    lockFile=null;
339                    }
340            }
341            
342        public void unload() throws IOException, InterruptedException {
343            synchronized (indexMutex) {
344                if( pageFile != null && pageFile.isLoaded() ) {
345                    metadata.state = CLOSED_STATE;
346                    metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
347        
348                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
349                        public void execute(Transaction tx) throws IOException {
350                            tx.store(metadata.page, metadataMarshaller, true);
351                        }
352                    });
353                }
354            }
355            close();
356        }
357    
    /**
     * @return the location of the first in-progress transaction, or null if there are none
     */
361        private Location getFirstInProgressTxLocation() {
362            Location l = null;
363            if (!inflightTransactions.isEmpty()) {
364                l = inflightTransactions.values().iterator().next().get(0).getLocation();
365            }
366            if (!preparedTransactions.isEmpty()) {
367                Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
368                if (l==null || t.compareTo(l) <= 0) {
369                    l = t;
370                }
371            }
372            return l;
373        }
374    
    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     *
     * @throws IOException
     * @throws IllegalStateException
     */
383        private void recover() throws IllegalStateException, IOException {
384            synchronized (indexMutex) {
385                    long start = System.currentTimeMillis();
386                    
387                    Location recoveryPosition = getRecoveryPosition();
388                    if( recoveryPosition!=null ) {
389                            int redoCounter = 0;
390                            while (recoveryPosition != null) {
391                                JournalCommand message = load(recoveryPosition);
392                                metadata.lastUpdate = recoveryPosition;
393                                process(message, recoveryPosition);
394                                redoCounter++;
395                                recoveryPosition = journal.getNextLocation(recoveryPosition);
396                            }
397                            long end = System.currentTimeMillis();
398                            LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
399                    }
400                 
401                    // We may have to undo some index updates.
402                pageFile.tx().execute(new Transaction.Closure<IOException>() {
403                    public void execute(Transaction tx) throws IOException {
404                        recoverIndex(tx);
405                    }
406                });
407            }
408        }
409        
410            protected void recoverIndex(Transaction tx) throws IOException {
411            long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to messages that are not in the journal.
414            final Location lastAppendLocation = journal.getLastAppendLocation();
415            long undoCounter=0;
416            
417            // Go through all the destinations to see if they have messages past the lastAppendLocation
418            for (StoredDestination sd : storedDestinations.values()) {
419                    
420                final ArrayList<Long> matches = new ArrayList<Long>();
            // Find all the Locations that are >= the last append location.
422                sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
423                                    @Override
424                                    protected void matched(Location key, Long value) {
425                                            matches.add(value);
426                                    }
427                });
428                
429                
430                for (Long sequenceId : matches) {
431                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
432                    sd.locationIndex.remove(tx, keys.location);
433                    sd.messageIdIndex.remove(tx, keys.messageId);
434                    undoCounter++;
435                    // TODO: do we need to modify the ack positions for the pub sub case?
436                            }
437            }
438    
439            long end = System.currentTimeMillis();
440            if( undoCounter > 0 ) {
            // The rolled-back operations are basically in-flight journal writes.  To avoid getting
            // these, the end user should do sync writes to the journal.
443                    LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
444            }
445    
446            undoCounter = 0;
447            start = System.currentTimeMillis();
448    
        // Let's be extra paranoid here and verify that all the data files referenced
        // by the indexes still exist.
451    
452            final SequenceSet ss = new SequenceSet();
453            for (StoredDestination sd : storedDestinations.values()) {
454                // Use a visitor to cut down the number of pages that we load
455                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
456                    int last=-1;
457    
458                    public boolean isInterestedInKeysBetween(Location first, Location second) {
459                        if( first==null ) {
460                            return !ss.contains(0, second.getDataFileId());
461                        } else if( second==null ) {
462                            return true;
463                        } else {
464                            return !ss.contains(first.getDataFileId(), second.getDataFileId());
465                        }
466                    }
467    
468                    public void visit(List<Location> keys, List<Long> values) {
469                        for (Location l : keys) {
470                            int fileId = l.getDataFileId();
471                            if( last != fileId ) {
472                                ss.add(fileId);
473                                last = fileId;
474                            }
475                        }
476                    }
477    
478                });
479            }
480            HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
481            while( !ss.isEmpty() ) {
482                missingJournalFiles.add( (int)ss.removeFirst() );
483            }
484            missingJournalFiles.removeAll( journal.getFileMap().keySet() );
485    
486            if( !missingJournalFiles.isEmpty() ) {
487                LOG.info("Some journal files are missing: "+missingJournalFiles);
488            }
489    
490            ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
491            for (Integer missing : missingJournalFiles) {
492                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
493            }
494    
495            if ( checkForCorruptJournalFiles ) {
496                Collection<DataFile> dataFiles = journal.getFileMap().values();
497                for (DataFile dataFile : dataFiles) {
498                    int id = dataFile.getDataFileId();
499                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
500                    Sequence seq = dataFile.getCorruptedBlocks().getHead();
501                    while( seq!=null ) {
502                        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
503                        seq = seq.getNext();
504                    }
505                }
506            }
507    
508            if( !missingPredicates.isEmpty() ) {
509                for (StoredDestination sd : storedDestinations.values()) {
510    
511                    final ArrayList<Long> matches = new ArrayList<Long>();
512                    sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
513                        protected void matched(Location key, Long value) {
514                            matches.add(value);
515                        }
516                    });
517    
                // If some message references are affected by the missing data files...
519                    if( !matches.isEmpty() ) {
520    
                    // We either 'gracefully' recover by dropping the missing messages, or
                    // we error out.
523                        if( ignoreMissingJournalfiles ) {
524                            // Update the index to remove the references to the missing data
525                            for (Long sequenceId : matches) {
526                                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
527                                sd.locationIndex.remove(tx, keys.location);
528                                sd.messageIdIndex.remove(tx, keys.messageId);
529                                undoCounter++;
530                                // TODO: do we need to modify the ack positions for the pub sub case?
531                            }
532    
533                        } else {
534                            throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
535                        }
536                    }
537                }
538            }
539            
540            end = System.currentTimeMillis();
541            if( undoCounter > 0 ) {
            // The rolled-back operations are basically in-flight journal writes.  To avoid getting
            // these, the end user should do sync writes to the journal.
544                    LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
545            }
546            }
547    
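    // Cursor state for incrementalRecover(): lastRecoveryPosition is the last journal location that has
    // already been replayed, so repeated calls only process records appended since the previous pass.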
548            private Location nextRecoveryPosition;
549            private Location lastRecoveryPosition;
550    
551            public void incrementalRecover() throws IOException {
552            synchronized (indexMutex) {
553                    if( nextRecoveryPosition == null ) {
554                            if( lastRecoveryPosition==null ) {
555                                    nextRecoveryPosition = getRecoveryPosition();
556                            } else {
557                            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
558                            }               
559                    }
560                    while (nextRecoveryPosition != null) {
561                            lastRecoveryPosition = nextRecoveryPosition;
562                        metadata.lastUpdate = lastRecoveryPosition;
563                        JournalCommand message = load(lastRecoveryPosition);
564                        process(message, lastRecoveryPosition);            
565                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
566                    }
567            }
568            }
569            
570        public Location getLastUpdatePosition() throws IOException {
571            return metadata.lastUpdate;
572        }
573        
574            private Location getRecoveryPosition() throws IOException {
575                    
576            // If we need to recover the transactions..
577            if (metadata.firstInProgressTransactionLocation != null) {
578                return metadata.firstInProgressTransactionLocation;
579            }
580            
581            // Perhaps there were no transactions...
582            if( metadata.lastUpdate!=null) {
583                // Start replay at the record after the last one recorded in the index file.
584                return journal.getNextLocation(metadata.lastUpdate);
585            }
586            
587            // This loads the first position.
588            return journal.getNextLocation(null);
589            }
590    
591        protected void checkpointCleanup(final boolean cleanup) throws IOException {
592            long start = System.currentTimeMillis();
593            synchronized (indexMutex) {
594                    if( !opened.get() ) {
595                            return;
596                    }
597                pageFile.tx().execute(new Transaction.Closure<IOException>() {
598                    public void execute(Transaction tx) throws IOException {
599                        checkpointUpdate(tx, cleanup);
600                    }
601                });
602            }
603            long end = System.currentTimeMillis();
604            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
            LOG.info("Slow KahaDB access: cleanup took " + (end - start) + " ms");
606            }
607        }
608    
609        
610            public void checkpoint(Callback closure) throws Exception {
611            synchronized (indexMutex) {
612                pageFile.tx().execute(new Transaction.Closure<IOException>() {
613                    public void execute(Transaction tx) throws IOException {
614                        checkpointUpdate(tx, false);
615                    }
616                });
617                closure.execute();
618            }
619            }
620    
621        // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
623        // /////////////////////////////////////////////////////////////////
624        public Location store(JournalCommand data) throws IOException {
625            return store(data, false);
626        }
627    
    /**
     * All updates are funneled through this method. The update is converted
     * to a JournalMessage which is logged to the journal, and then the data from
     * the JournalMessage is used to update the index just as it would be
     * during a recovery process.
     */
634        public Location store(JournalCommand data, boolean sync) throws IOException {
635            try {
636                int size = data.serializedSizeFramed();
637                DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
638                os.writeByte(data.type().getNumber());
639                data.writeFramed(os);
640        
641                long start = System.currentTimeMillis();
642                Location location = journal.write(os.toByteSequence(), sync);
643                long start2 = System.currentTimeMillis();
644                process(data, location);
645                    long end = System.currentTimeMillis();
646                    if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
647                            LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
648                    }
649        
650                synchronized (indexMutex) {
651                    metadata.lastUpdate = location;
652                }
653                if (!checkpointThread.isAlive()) {
654                    LOG.info("KahaDB: Recovering checkpoint thread after exception");
655                    startCheckpoint();
656                }
657                return location;
658            } catch (IOException ioe) {
659                LOG.error("KahaDB failed to store to Journal", ioe);
660                brokerService.handleIOException(ioe);
661                throw ioe;
662            }
663        }
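    // On disk, each journal record written by store() is framed as a single byte holding the
    // KahaEntryType number followed by the framed command body (writeFramed()).  load() below reverses
    // this: it reads the type byte, creates the matching command via KahaEntryType.createMessage() and
    // merges the remaining bytes into it.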
664    
    /**
     * Loads a previously stored JournalMessage.
     *
     * @param location the journal location of the record to load
     * @return the decoded journal command
     * @throws IOException if the record cannot be read or its entry type is unknown
     */
672        public JournalCommand load(Location location) throws IOException {
673            ByteSequence data = journal.read(location);
674            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
675            byte readByte = is.readByte();
676            KahaEntryType type = KahaEntryType.valueOf(readByte);
677            if( type == null ) {
678                throw new IOException("Could not load journal record. Invalid location: "+location);
679            }
680            JournalCommand message = (JournalCommand)type.createMessage();
681            message.mergeFramed(is);
682            return message;
683        }
684    
685        // /////////////////////////////////////////////////////////////////
686        // Journaled record processing methods. Once the record is journaled,
687        // these methods handle applying the index updates. These may be called
688        // from the recovery method too so they need to be idempotent
689        // /////////////////////////////////////////////////////////////////
690    
691        private void process(JournalCommand data, final Location location) throws IOException {
692            data.visit(new Visitor() {
693                @Override
694                public void visit(KahaAddMessageCommand command) throws IOException {
695                    process(command, location);
696                }
697    
698                @Override
699                public void visit(KahaRemoveMessageCommand command) throws IOException {
700                    process(command, location);
701                }
702    
703                @Override
704                public void visit(KahaPrepareCommand command) throws IOException {
705                    process(command, location);
706                }
707    
708                @Override
709                public void visit(KahaCommitCommand command) throws IOException {
710                    process(command, location);
711                }
712    
713                @Override
714                public void visit(KahaRollbackCommand command) throws IOException {
715                    process(command, location);
716                }
717    
718                @Override
719                public void visit(KahaRemoveDestinationCommand command) throws IOException {
720                    process(command, location);
721                }
722    
723                @Override
724                public void visit(KahaSubscriptionCommand command) throws IOException {
725                    process(command, location);
726                }
727            });
728        }
729    
730        private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
731            if (command.hasTransactionInfo()) {
732                synchronized (indexMutex) {
733                    ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
734                    inflightTx.add(new AddOpperation(command, location));
735                }
736            } else {
737                synchronized (indexMutex) {
738                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
739                        public void execute(Transaction tx) throws IOException {
740                            upadateIndex(tx, command, location);
741                        }
742                    });
743                }
744            }
745        }
746    
747        protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
748            if (command.hasTransactionInfo()) {
749                synchronized (indexMutex) {
750                    ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
751                    inflightTx.add(new RemoveOpperation(command, location));
752                }
753            } else {
754                synchronized (indexMutex) {
755                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
756                        public void execute(Transaction tx) throws IOException {
757                            updateIndex(tx, command, location);
758                        }
759                    });
760                }
761            }
762    
763        }
764    
765        protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
766            synchronized (indexMutex) {
767                pageFile.tx().execute(new Transaction.Closure<IOException>() {
768                    public void execute(Transaction tx) throws IOException {
769                        updateIndex(tx, command, location);
770                    }
771                });
772            }
773        }
774    
775        protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
776            synchronized (indexMutex) {
777                pageFile.tx().execute(new Transaction.Closure<IOException>() {
778                    public void execute(Transaction tx) throws IOException {
779                        updateIndex(tx, command, location);
780                    }
781                });
782            }
783        }
784    
785        protected void process(KahaCommitCommand command, Location location) throws IOException {
786            TransactionId key = key(command.getTransactionInfo());
787            synchronized (indexMutex) {
788                ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
789                if (inflightTx == null) {
790                    inflightTx = preparedTransactions.remove(key);
791                }
792                if (inflightTx == null) {
793                    return;
794                }
795    
796                final ArrayList<Operation> messagingTx = inflightTx;
797                pageFile.tx().execute(new Transaction.Closure<IOException>() {
798                    public void execute(Transaction tx) throws IOException {
799                        for (Operation op : messagingTx) {
800                            op.execute(tx);
801                        }
802                    }
803                });
804            }
805        }
806    
807        protected void process(KahaPrepareCommand command, Location location) {
808            synchronized (indexMutex) {
809                TransactionId key = key(command.getTransactionInfo());
810                ArrayList<Operation> tx = inflightTransactions.remove(key);
811                if (tx != null) {
812                    preparedTransactions.put(key, tx);
813                }
814            }
815        }
816    
817        protected void process(KahaRollbackCommand command, Location location) {
818            synchronized (indexMutex) {
819                TransactionId key = key(command.getTransactionInfo());
820                ArrayList<Operation> tx = inflightTransactions.remove(key);
821                if (tx == null) {
822                    preparedTransactions.remove(key);
823                }
824            }
825        }
826    
827        // /////////////////////////////////////////////////////////////////
828        // These methods do the actual index updates.
829        // /////////////////////////////////////////////////////////////////
830    
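    // All index (page file) access and the in-flight/prepared transaction maps are guarded by this
    // mutex.  Journal appends in store() happen outside of it; only the subsequent index update and the
    // metadata.lastUpdate bookkeeping take the lock.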
831        protected final Object indexMutex = new Object();
832            private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
833    
834        private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
835            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
836    
837            // Skip adding the message to the index if this is a topic and there are
838            // no subscriptions.
839            if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
840                return;
841            }
842    
843            // Add the message.
844            long id = sd.nextMessageId++;
845            Long previous = sd.locationIndex.put(tx, location, id);
846            if( previous == null ) {
847                previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
848                if( previous == null ) {
849                    sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
850                } else {
                // If the message ID was already indexed, then the broker asked us to store a DUP
                // message.  Bad BOY!  Don't do it, and log a warning.
853                    LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
854                    // TODO: consider just rolling back the tx.
855                    sd.messageIdIndex.put(tx, command.getMessageId(), previous);
856                }
857            } else {
858                // restore the previous value.. Looks like this was a redo of a previously
859                // added message.  We don't want to assign it a new id as the other indexes would 
860                // be wrong..
861                //
862                // TODO: consider just rolling back the tx.
863                sd.locationIndex.put(tx, location, previous);
864            }
865            
866        }
867    
868        private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
869            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
870            if (!command.hasSubscriptionKey()) {
871                
872                // In the queue case we just remove the message from the index..
873                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
874                if (sequenceId != null) {
875                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
876                    if (keys != null) {
877                        sd.locationIndex.remove(tx, keys.location);
878                    }
879                }
880            } else {
            // In the topic case we need to remove the message once it's been acked
            // by all the subs.
883                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
884    
885                // Make sure it's a valid message id...
886                if (sequence != null) {
887                    String subscriptionKey = command.getSubscriptionKey();
888                    Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
889    
890                    // The following method handles deleting un-referenced messages.
891                    removeAckLocation(tx, sd, subscriptionKey, prev);
892    
893                    // Add it to the new location set.
894                    addAckLocation(sd, sequence, subscriptionKey);
895                }
896    
897            }
898        }
899    
900        private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
901            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
902            sd.orderIndex.clear(tx);
903            sd.orderIndex.unload(tx);
904            tx.free(sd.orderIndex.getPageId());
905            
906            sd.locationIndex.clear(tx);
907            sd.locationIndex.unload(tx);
908            tx.free(sd.locationIndex.getPageId());
909    
910            sd.messageIdIndex.clear(tx);
911            sd.messageIdIndex.unload(tx);
912            tx.free(sd.messageIdIndex.getPageId());
913    
914            if (sd.subscriptions != null) {
915                sd.subscriptions.clear(tx);
916                sd.subscriptions.unload(tx);
917                tx.free(sd.subscriptions.getPageId());
918    
919                sd.subscriptionAcks.clear(tx);
920                sd.subscriptionAcks.unload(tx);
921                tx.free(sd.subscriptionAcks.getPageId());
922            }
923    
924            String key = key(command.getDestination());
925            storedDestinations.remove(key);
926            metadata.destinations.remove(tx, key);
927        }
928    
929        private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
930            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
931    
        // If the subscription info is set we are creating the sub, otherwise we are destroying it.
933            if (command.hasSubscriptionInfo()) {
934                String subscriptionKey = command.getSubscriptionKey();
935                sd.subscriptions.put(tx, subscriptionKey, command);
936                long ackLocation=-1;
937                if (!command.getRetroactive()) {
938                    ackLocation = sd.nextMessageId-1;
939                }
940    
941                sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
942                addAckLocation(sd, ackLocation, subscriptionKey);
943            } else {
944                // delete the sub...
945                String subscriptionKey = command.getSubscriptionKey();
946                sd.subscriptions.remove(tx, subscriptionKey);
947                Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
948                if( prev!=null ) {
949                    removeAckLocation(tx, sd, subscriptionKey, prev);
950                }
951            }
952    
953        }
954        
    /**
     * Stores the current metadata and, when cleanup is enabled, garbage collects journal data
     * files that are no longer referenced by any destination index.
     *
     * @param tx the index transaction to use
     * @param cleanup true to also remove unreferenced journal data files
     * @throws IOException
     */
959        private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
960    
961            LOG.debug("Checkpoint started.");
962            
963            metadata.state = OPEN_STATE;
964            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
965            tx.store(metadata.page, metadataMarshaller, true);
966            pageFile.flush();
967    
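        // Journal GC works by elimination: start with every journal data file id as a candidate, then
        // drop candidates that are being replicated, that lie at or after the first in-progress
        // transaction (or the last index update), or that are still referenced by some destination's
        // locationIndex.  Whatever survives is unreferenced and can be deleted.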
968            if( cleanup ) {
969                    
970                    final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
971                    
972                    // Don't GC files under replication
            if( !journalFilesBeingReplicated.isEmpty() ) {
974                            gcCandidateSet.removeAll(journalFilesBeingReplicated);
975                    }
976                    
977                    // Don't GC files after the first in progress tx
978                    Location firstTxLocation = metadata.lastUpdate;
979                if( metadata.firstInProgressTransactionLocation!=null ) {
980                    firstTxLocation = metadata.firstInProgressTransactionLocation;
981                }
982                
983                if( firstTxLocation!=null ) {
984                    while( !gcCandidateSet.isEmpty() ) {
985                            Integer last = gcCandidateSet.last();
986                            if( last >= firstTxLocation.getDataFileId() ) {
987                                    gcCandidateSet.remove(last);
988                            } else {
989                                    break;
990                            }
991                    }
992                }
993    
994                // Go through all the destinations to see if any of them can remove GC candidates.
995                for (StoredDestination sd : storedDestinations.values()) {
996                    if( gcCandidateSet.isEmpty() ) {
997                            break;
998                    }
999                    
1000                    // Use a visitor to cut down the number of pages that we load
1001                    sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1002                        int last=-1;
1003                        public boolean isInterestedInKeysBetween(Location first, Location second) {
1004                            if( first==null ) {
1005                                    SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1006                                    if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1007                                            subset.remove(second.getDataFileId());
1008                                    }
1009                                                            return !subset.isEmpty();
1010                            } else if( second==null ) {
1011                                    SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1012                                    if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1013                                            subset.remove(first.getDataFileId());
1014                                    }
1015                                                            return !subset.isEmpty();
1016                            } else {
1017                                    SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1018                                    if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1019                                            subset.remove(first.getDataFileId());
1020                                    }
1021                                    if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1022                                            subset.remove(second.getDataFileId());
1023                                    }
1024                                                            return !subset.isEmpty();
1025                            }
1026                        }
1027        
1028                        public void visit(List<Location> keys, List<Long> values) {
1029                            for (Location l : keys) {
1030                                int fileId = l.getDataFileId();
1031                                                            if( last != fileId ) {
1032                                            gcCandidateSet.remove(fileId);
1033                                    last = fileId;
1034                                }
1035                                                    }                        
1036                        }
1037        
1038                    });
1039                }
1040    
1041                if( !gcCandidateSet.isEmpty() ) {
1042                        LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
1043                        journal.removeDataFiles(gcCandidateSet);
1044                }
1045            }
1046            
1047            LOG.debug("Checkpoint done.");
1048        }
1049        
1050        public HashSet<Integer> getJournalFilesBeingReplicated() {
1051                    return journalFilesBeingReplicated;
1052            }
1053    
1054        // /////////////////////////////////////////////////////////////////
1055        // StoredDestination related implementation methods.
1056        // /////////////////////////////////////////////////////////////////
1057    
1058    
1059            private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1060    
1061        class StoredSubscription {
1062            SubscriptionInfo subscriptionInfo;
1063            String lastAckId;
1064            Location lastAckLocation;
1065            Location cursor;
1066        }
1067        
1068        static class MessageKeys {
1069            final String messageId;
1070            final Location location;
1071            
1072            public MessageKeys(String messageId, Location location) {
1073                this.messageId=messageId;
1074                this.location=location;
1075            }
1076            
1077            @Override
1078            public String toString() {
1079                return "["+messageId+","+location+"]";
1080            }
1081        }
1082        
1083        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1084            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1085            
1086            public MessageKeys readPayload(DataInput dataIn) throws IOException {
1087                return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1088            }
1089    
1090            public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1091                dataOut.writeUTF(object.messageId);
1092                LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1093            }
1094        }
1095        
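    // Per-destination index structures: orderIndex maps the message sequence number to its MessageKeys
    // (message id + journal Location), locationIndex maps a journal Location back to the sequence
    // number, and messageIdIndex maps the message id to the sequence number.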
1096        static class StoredDestination {
1097            long nextMessageId;
1098            BTreeIndex<Long, MessageKeys> orderIndex;
1099            BTreeIndex<Location, Long> locationIndex;
1100            BTreeIndex<String, Long> messageIdIndex;
1101    
1102            // These bits are only set for Topics
1103            BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1104            BTreeIndex<String, Long> subscriptionAcks;
1105            HashMap<String, Long> subscriptionCursors;
1106            TreeMap<Long, HashSet<String>> ackPositions;
1107        }
1108    
1109        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1110    
1111            public StoredDestination readPayload(DataInput dataIn) throws IOException {
1112                StoredDestination value = new StoredDestination();
1113                value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1114                value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1115                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1116    
1117                if (dataIn.readBoolean()) {
1118                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1119                    value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1120                }
1121                return value;
1122            }
1123    
1124            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1125                dataOut.writeLong(value.orderIndex.getPageId());
1126                dataOut.writeLong(value.locationIndex.getPageId());
1127                dataOut.writeLong(value.messageIdIndex.getPageId());
1128                if (value.subscriptions != null) {
1129                    dataOut.writeBoolean(true);
1130                    dataOut.writeLong(value.subscriptions.getPageId());
1131                    dataOut.writeLong(value.subscriptionAcks.getPageId());
1132                } else {
1133                    dataOut.writeBoolean(false);
1134                }
1135            }
1136        }
1137    
1138        static class LocationMarshaller implements Marshaller<Location> {
1139            final static LocationMarshaller INSTANCE = new LocationMarshaller();
1140    
1141            public Location readPayload(DataInput dataIn) throws IOException {
1142                Location rc = new Location();
1143                rc.setDataFileId(dataIn.readInt());
1144                rc.setOffset(dataIn.readInt());
1145                return rc;
1146            }
1147    
1148            public void writePayload(Location object, DataOutput dataOut) throws IOException {
1149                dataOut.writeInt(object.getDataFileId());
1150                dataOut.writeInt(object.getOffset());
1151            }
1152    
1153            public int getFixedSize() {
1154                return 8;
1155            }
1156    
1157            public Location deepCopy(Location source) {
1158                return new Location(source);
1159            }
1160    
1161            public boolean isDeepCopySupported() {
1162                return true;
1163            }
1164        }
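
    /*
     * A Location is marshalled as two ints, the data file id followed by the offset,
     * which is why getFixedSize() reports 8 bytes.
     */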
1165    
1166        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1167            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1168    
1169            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1170                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1171                rc.mergeFramed((InputStream)dataIn);
1172                return rc;
1173            }
1174    
1175            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1176                object.writeFramed((OutputStream)dataOut);
1177            }
1178        }
1179    
1180        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1181            String key = key(destination);
1182            StoredDestination rc = storedDestinations.get(key);
1183            if (rc == null) {
1184                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1185                rc = loadStoredDestination(tx, key, topic);
            // Cache it. We may want to remove/unload destinations from the cache
            // that have not been used for a while to reduce memory usage.
1189                storedDestinations.put(key, rc);
1190            }
1191            return rc;
1192        }
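
    /*
     * Destinations are resolved inside an index transaction. A minimal usage sketch,
     * assuming a hypothetical KahaDestination "destination" already in scope and using
     * the kahadb Transaction.Closure pattern:
     *
     *   pageFile.tx().execute(new Transaction.Closure<IOException>() {
     *       public void execute(Transaction tx) throws IOException {
     *           StoredDestination sd = getStoredDestination(destination, tx);
     *           // sd.orderIndex, sd.locationIndex and sd.messageIdIndex are now loaded
     *       }
     *   });
     */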
1193    
    /**
     * Loads the indexes of a destination, allocating and initializing them if the
     * destination has not been stored before.
     *
     * @param tx the index transaction used to load or allocate the index pages
     * @param key the destination key produced by {@link #key(KahaDestination)}
     * @param topic true if the destination is a topic and needs the subscription indexes
     * @return the loaded StoredDestination with all of its indexes ready to use
     * @throws IOException if the indexes cannot be loaded or allocated
     */
1201        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1202            // Try to load the existing indexes..
1203            StoredDestination rc = metadata.destinations.get(tx, key);
1204            if (rc == null) {
1205                // Brand new destination.. allocate indexes for it.
1206                rc = new StoredDestination();
1207                rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1208                rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1209                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1210    
1211                if (topic) {
1212                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1213                    rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1214                }
1215                metadata.destinations.put(tx, key, rc);
1216            }
1217    
        // Configure the marshallers and load the indexes.
1219            rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1220            rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1221            rc.orderIndex.load(tx);
1222    
        // Figure out the next message id using the last entry in the destination.
        Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
        if (lastEntry != null) {
            rc.nextMessageId = lastEntry.getKey() + 1;
        }
1228    
1229            rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
1230            rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1231            rc.locationIndex.load(tx);
1232    
1233            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1234            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1235            rc.messageIdIndex.load(tx);
1236            
1237            // If it was a topic...
1238            if (topic) {
1239    
1240                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1241                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1242                rc.subscriptions.load(tx);
1243    
1244                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1245                rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
1246                rc.subscriptionAcks.load(tx);
1247    
1248                rc.ackPositions = new TreeMap<Long, HashSet<String>>();
1249                rc.subscriptionCursors = new HashMap<String, Long>();
1250    
1251                for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1252                    Entry<String, Long> entry = iterator.next();
1253                    addAckLocation(rc, entry.getValue(), entry.getKey());
1254                }
1255    
1256            }
1257            return rc;
1258        }
1259    
    /**
     * Records that the given subscription is still positioned at the given message
     * sequence in the destination's order index.
     *
     * @param sd the destination whose ack positions are being tracked
     * @param messageSequence the order-index sequence the subscription is positioned at
     * @param subscriptionKey the subscription to add at that position
     */
1265        private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) {
1266            HashSet<String> hs = sd.ackPositions.get(messageSequence);
1267            if (hs == null) {
1268                hs = new HashSet<String>();
1269                sd.ackPositions.put(messageSequence, hs);
1270            }
1271            hs.add(subscriptionKey);
1272        }
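
    /*
     * After loadStoredDestination() has replayed subscriptionAcks, ackPositions is the
     * inverse view of it. For example, "sub1" positioned at sequence 4 and "sub2" at
     * sequence 7 give
     *
     *   sd.ackPositions = { 4 -> {"sub1"}, 7 -> {"sub2"} }
     *
     * so the first key of the TreeMap is always the lowest sequence some subscription is
     * still parked at, which is what removeAckLocation() below relies on.
     */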
1273    
    /**
     * Removes a subscription from the ack position it previously occupied. If that empties
     * the oldest position in the destination, every message up to and including that
     * sequence is deleted from the indexes since no subscription still needs it.
     *
     * @param tx the index transaction used for the index updates
     * @param sd the destination whose ack positions are being updated
     * @param subscriptionKey the subscription being repositioned or removed
     * @param sequenceId the sequence the subscription was previously positioned at, may be null
     * @throws IOException if the index updates fail
     */
1281        private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
1282            // Remove the sub from the previous location set..
1283            if (sequenceId != null) {
1284                HashSet<String> hs = sd.ackPositions.get(sequenceId);
1285                if (hs != null) {
1286                    hs.remove(subscriptionKey);
1287                    if (hs.isEmpty()) {
1288                        HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
1289                        sd.ackPositions.remove(sequenceId);
1290    
1291                        // Did we just empty out the first set in the
1292                        // ordered list of ack locations? Then it's time to
1293                        // delete some messages.
1294                        if (hs == firstSet) {
1295    
1296                            // Find all the entries that need to get deleted.
1297                            ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1298                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
1299                                Entry<Long, MessageKeys> entry = iterator.next();
1300                                if (entry.getKey().compareTo(sequenceId) <= 0) {
                                // We can't do the actual delete while we are iterating
                                // the BTree since removal would invalidate the iterator.
                                deletes.add(entry);
                            } else {
                                // The order index is sorted, so no later entry can match.
                                break;
                            }
1309                            }
1310    
1311                            // Do the actual deletes.
1312                            for (Entry<Long, MessageKeys> entry : deletes) {
1313                                sd.locationIndex.remove(tx, entry.getValue().location);
                            sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                            sd.orderIndex.remove(tx, entry.getKey());
1316                            }
1317                        }
1318                    }
1319                }
1320            }
1321        }
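
    /*
     * A worked example of the delete path above: starting from
     *
     *   sd.ackPositions = { 4 -> {"sub1"}, 7 -> {"sub2"} }
     *
     * a call to removeAckLocation(tx, sd, "sub1", 4L) empties the set at position 4. Since
     * that set was the first one in the TreeMap, every orderIndex entry with a sequence of
     * 4 or lower is removed from locationIndex, messageIdIndex and orderIndex in one pass.
     */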
1322    
1323        private String key(KahaDestination destination) {
1324            return destination.getType().getNumber() + ":" + destination.getName();
1325        }
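
    /*
     * The destination key is the numeric destination type followed by the name, so a queue
     * named "ORDERS" would be stored under something like "0:ORDERS" (the exact number is
     * whatever KahaDestination.DestinationType assigns to QUEUE).
     */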
1326    
1327        // /////////////////////////////////////////////////////////////////
1328        // Transaction related implementation methods.
1329        // /////////////////////////////////////////////////////////////////
1330        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
1331        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
1332     
1333        private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
1334            TransactionId key = key(info);
1335            ArrayList<Operation> tx = inflightTransactions.get(key);
1336            if (tx == null) {
1337                tx = new ArrayList<Operation>();
1338                inflightTransactions.put(key, tx);
1339            }
1340            return tx;
1341        }
1342    
1343        private TransactionId key(KahaTransactionInfo transactionInfo) {
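        // Note: the oddly spelled accessors below ("Transaciton") are the names defined by
        // the Kaha command classes in the data package; they are used here as-is.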
1344            if (transactionInfo.hasLocalTransacitonId()) {
1345                KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
1346                LocalTransactionId rc = new LocalTransactionId();
1347                rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
1348                rc.setValue(tx.getTransacitonId());
1349                return rc;
1350            } else {
1351                KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
1352                XATransactionId rc = new XATransactionId();
1353                rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
1354                rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
1355                rc.setFormatId(tx.getFormatId());
1356                return rc;
1357            }
1358        }
1359    
1360        abstract class Operation {
1361            final Location location;
1362    
1363            public Operation(Location location) {
1364                this.location = location;
1365            }
1366    
1367            public Location getLocation() {
1368                return location;
1369            }
1370    
1371            abstract public void execute(Transaction tx) throws IOException;
1372        }
1373    
1374        class AddOpperation extends Operation {
1375            final KahaAddMessageCommand command;
1376    
1377            public AddOpperation(KahaAddMessageCommand command, Location location) {
1378                super(location);
1379                this.command = command;
1380            }
1381    
1382            public void execute(Transaction tx) throws IOException {
1383                upadateIndex(tx, command, location);
1384            }
1385    
1386            public KahaAddMessageCommand getCommand() {
1387                return command;
1388            }
1389        }
1390    
1391        class RemoveOpperation extends Operation {
1392            final KahaRemoveMessageCommand command;
1393    
1394            public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
1395                super(location);
1396                this.command = command;
1397            }
1398    
1399            public void execute(Transaction tx) throws IOException {
1400                updateIndex(tx, command, location);
1401            }
1402    
1403            public KahaRemoveMessageCommand getCommand() {
1404                return command;
1405            }
1406        }
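
    /*
     * Operations queued against an inflight transaction are applied to the index only when
     * the transaction commits. A sketch of that replay, assuming the commit handling
     * elsewhere in this class has resolved the TransactionId to "txId" and holds an open
     * index Transaction "tx":
     *
     *   ArrayList<Operation> ops = inflightTransactions.remove(txId);
     *   if (ops != null) {
     *       for (Operation op : ops) {
     *           op.execute(tx);
     *       }
     *   }
     */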
1407    
1408        // /////////////////////////////////////////////////////////////////
1409        // Initialization related implementation methods.
1410        // /////////////////////////////////////////////////////////////////
1411    
1412        private PageFile createPageFile() {
1413            PageFile index = new PageFile(directory, "db");
1414            index.setEnableWriteThread(isEnableIndexWriteAsync());
1415            index.setWriteBatchSize(getIndexWriteBatchSize());
1416            index.setPageCacheSize(indexCacheSize);
1417            return index;
1418        }
1419    
1420        private Journal createJournal() {
1421            Journal manager = new Journal();
1422            manager.setDirectory(directory);
1423            manager.setMaxFileLength(getJournalMaxFileLength());
1424            manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
1425            manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
1426            manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
1427            return manager;
1428        }
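
    /*
     * The page file and journal are built from the bean properties below, so a store is
     * typically configured before start-up along these lines (values are illustrative and
     * KahaDBStore is assumed to be the concrete subclass exposing these setters):
     *
     *   KahaDBStore store = new KahaDBStore();
     *   store.setDirectory(new File("activemq-data/kahadb"));
     *   store.setJournalMaxFileLength(32 * 1024 * 1024);
     *   store.setIndexWriteBatchSize(1000);
     *   store.setEnableJournalDiskSyncs(true);
     *   store.setCheckForCorruptJournalFiles(false);
     */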
1429    
1430        public int getJournalMaxWriteBatchSize() {
1431            return journalMaxWriteBatchSize;
1432        }
1433        
1434        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
1435            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
1436        }
1437    
1438        public File getDirectory() {
1439            return directory;
1440        }
1441    
1442        public void setDirectory(File directory) {
1443            this.directory = directory;
1444        }
1445    
1446        public boolean isDeleteAllMessages() {
1447            return deleteAllMessages;
1448        }
1449    
1450        public void setDeleteAllMessages(boolean deleteAllMessages) {
1451            this.deleteAllMessages = deleteAllMessages;
1452        }
1453        
1454        public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
1455            this.setIndexWriteBatchSize = setIndexWriteBatchSize;
1456        }
1457    
1458        public int getIndexWriteBatchSize() {
1459            return setIndexWriteBatchSize;
1460        }
1461        
1462        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
1463            this.enableIndexWriteAsync = enableIndexWriteAsync;
1464        }
1465        
1466        boolean isEnableIndexWriteAsync() {
1467            return enableIndexWriteAsync;
1468        }
1469        
1470        public boolean isEnableJournalDiskSyncs() {
1471            return enableJournalDiskSyncs;
1472        }
1473    
1474        public void setEnableJournalDiskSyncs(boolean syncWrites) {
1475            this.enableJournalDiskSyncs = syncWrites;
1476        }
1477    
1478        public long getCheckpointInterval() {
1479            return checkpointInterval;
1480        }
1481    
1482        public void setCheckpointInterval(long checkpointInterval) {
1483            this.checkpointInterval = checkpointInterval;
1484        }
1485    
1486        public long getCleanupInterval() {
1487            return cleanupInterval;
1488        }
1489    
1490        public void setCleanupInterval(long cleanupInterval) {
1491            this.cleanupInterval = cleanupInterval;
1492        }
1493    
1494        public void setJournalMaxFileLength(int journalMaxFileLength) {
1495            this.journalMaxFileLength = journalMaxFileLength;
1496        }
1497        
1498        public int getJournalMaxFileLength() {
1499            return journalMaxFileLength;
1500        }
1501        
    public PageFile getPageFile() {
        if (pageFile == null) {
            pageFile = createPageFile();
        }
        return pageFile;
    }

    public Journal getJournal() {
        if (journal == null) {
            journal = createJournal();
        }
        return journal;
    }
1515    
1516        public boolean isFailIfDatabaseIsLocked() {
1517            return failIfDatabaseIsLocked;
1518        }
1519    
1520        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
1521            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
1522        }
1523    
1524        public boolean isIgnoreMissingJournalfiles() {
1525            return ignoreMissingJournalfiles;
1526        }
1527        
1528        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
1529            this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
1530        }
1531    
1532        public int getIndexCacheSize() {
1533            return indexCacheSize;
1534        }
1535    
1536        public void setIndexCacheSize(int indexCacheSize) {
1537            this.indexCacheSize = indexCacheSize;
1538        }
1539    
1540        public boolean isCheckForCorruptJournalFiles() {
1541            return checkForCorruptJournalFiles;
1542        }
1543    
1544        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
1545            this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
1546        }
1547    
1548        public boolean isChecksumJournalFiles() {
1549            return checksumJournalFiles;
1550        }
1551    
1552        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
1553            this.checksumJournalFiles = checksumJournalFiles;
1554        }
1555    
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
1559    }