/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * A PageFile provides random access to fixed-size disk pages.  This object is not thread safe and therefore access
 * to it should be externally synchronized.
 * 
 * The file has 3 parts:
 * Metadata Space: 4k : a reserved metadata area, used to store persistent configuration about the file.
 * Recovery Buffer Space: Page Size * 1000 : a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: the pages in the page file.
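 * 
 * A minimal usage sketch (the Transaction calls shown, such as allocate() and
 * commit(), are assumed from the accompanying Transaction class):
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "mystore");
 * pageFile.load();                  // open or create the file, recovering if needed
 * Transaction tx = pageFile.tx();   // page access goes through transactions
 * Page page = tx.allocate();        // allocate a fresh page to write to
 * tx.commit();                      // queue the page updates
 * pageFile.flush();                 // push queued writes (and syncs) to disk
 * pageFile.unload();                // release the file handles
 * </pre>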
 * 
 * @version $Revision: 882511 $
 */
public class PageFile {
    
    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";
    
    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
    private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
    private static final int PAGE_FILE_HEADER_SIZE=1024*4;

    // Each recovery buffer record is a page id (long) followed by the page data.
    private static final Log LOG = LogFactory.getLog(PageFile.class);

    // A PageFile will use a couple of files in this directory
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;
    
    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;
    
    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to.  It may temporarily exceed this max,
    // but the file will be resized back down to it as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we 
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private LRUCache<Long, Page> pageCache;
    // Should we keep a cache of recently used pages?
    private boolean enablePageCaching=true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = 100;
    
    // Should we first log the page write to the recovery buffer?  Avoids partial
    // page write failures..
    private boolean enableRecoveryFile=true;
    // Will we sync writes to disk?  Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs=true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread=false;

    // These are used if enabledWriteThread==true
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();
    
    private AtomicLong nextTxid = new AtomicLong();
    
    // Persistent settings stored in the page file.
    private MetaData metaData;
    
    /**
     * Used to keep track of updated pages which have not yet been committed.
     * A PageWrite holds two copies of the page data: 'current' is the latest
     * in-memory version and 'diskBound' is the version being flushed, so the
     * page can keep being updated while a write batch is in flight.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;

        public PageWrite(Page page, byte[] data) {
            this.page=page;
            current=data;
        }
                
        public void setCurrent(Page page, byte[] data) {
            this.page=page;
            current=data;
        }

        @Override
        public String toString() {
            return "[PageWrite:"+page.getPageId()+"]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }
        
        // Move the current data to the disk bound copy so the page can be
        // updated again while this write is being flushed.
        void begin() {
           diskBound = current;
           current = null;
        }
        
        /**
         * @return true if there are no pending writes left to do.
         */
        boolean done() {
            diskBound=null;
            return current == null;
        }
        
        boolean isDone() {
            return diskBound == null && current == null;
        }

    }
    
    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {
        
        String fileType;
        String fileTypeVersion;
        
        long metaDataTxId=-1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;
        
        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getFileTypeVersion() {
            return fileTypeVersion;
        }
        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }
        public long getMetaDataTxId() {
            return metaDataTxId;
        }
        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }
        public int getPageSize() {
            return pageSize;
        }
        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }
        public boolean isCleanShutdown() {
            return cleanShutdown;
        }
        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }
        public long getLastTxId() {
            return lastTxId;
        }
        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }
        public long getFreePages() {
            return freePages;
        }
        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

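    /**
     * Starts a new transaction against this page file.  All page reads and
     * updates are expected to go through the returned Transaction.
     *
     * @return a new Transaction bound to this PageFile
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */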
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }
    
    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     * 
     * @param directory
     * @param name
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }
    
260        
261        /**
262         * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
263         * 
264         * @throws IOException 
265         *         if the files cannot be deleted.
266         * @throws IllegalStateException 
267         *         if this PageFile is loaded
268         */
269        public void delete() throws IOException {
270            if( loaded.get() ) {
271                throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
272            }
273            delete(getMainPageFile());
274            delete(getFreeFile());
275            delete(getRecoveryFile());
276        }
277    
278        /**
279         * @param file
280         * @throws IOException
281         */
282        private void delete(File file) throws IOException {
283            if( file.exists() ) {
284                if( !file.delete() ) {
285                    throw new IOException("Could not delete: "+file.getPath());
286                }
287            }
288        }
289        
290        /**
291         * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the 
292         * first time the page file is loaded, then this creates the page file in the file system.
293         * 
294         * @throws IOException
295         *         If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if 
296         *         there was a disk error.
297         * @throws IllegalStateException 
298         *         If the page file was already loaded.
299         */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {
            
            if( enablePageCaching ) {
                pageCache = new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true);
            }
            
            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");
            
            if (readFile.length() > 0) {
                // Load the page size setting because that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if( enableRecoveryFile ) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }
            
            if( metaData.isCleanShutdown() ) {
                nextTxid.set(metaData.getLastTxId()+1);
                if( metaData.getFreePages()>0 ) {
                    loadFreeList();
                } 
            } else {
                LOG.debug("Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                
                // Scan all pages to rebuild the free list.
                freeList = new SequenceSet();
                for (Iterator i = tx().iterator(true); i.hasNext();) {
                    Page page = (Page)i.next();
                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
                        freeList.add(page.getPageId());
                    }
                }
                
            }
            
            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            
            if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
            startWriter();
                
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }


    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     * 
     * @throws IOException
     *         if a disk error occurred while closing down the page file.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
            
            if( freeList.isEmpty() ) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }
            
            metaData.setLastTxId( nextTxid.get()-1 );
            metaData.setCleanShutdown(true);
            storeMetaData();
            
            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile=null;
                if( enableRecoveryFile ) {
                    recoveryFile.close();
                    recoveryFile=null;
                }
                freeList.clear();
                if( pageCache!=null ) {
                    pageCache=null;
                }
                synchronized(writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }
        
    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flushes and syncs all write buffers to disk.
     * 
     * @throws IOException
     *         If a disk error occurred.
     */
    public void flush() throws IOException {

        if( enabledWriteThread && stopWriter.get() ) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }
        
        // Setup a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized( writes ) {
            if( writes.isEmpty()) {
                return;
            }
            if( enabledWriteThread ) {
                if( this.checkpointLatch == null ) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            int size = writes.size();
            long start = System.currentTimeMillis();
            checkpointLatch.await();
            long end = System.currentTimeMillis();
            if( end-start > 100 ) {
                LOG.warn("KahaDB PageFile flush: " + size + " queued writes, latch wait took "+(end-start)+" ms");
            }
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }

    
    @Override
    public String toString() {
        return "Page File: "+getMainPageFile();
    }
    
    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
    }
    
    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
    }

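    // Maps a page id to its byte offset in the main file.  Pages start right after
    // the 4k header, so with the default 4k page size, page 2 lives at
    // 4096 + 2*4096 = 12288.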
    private long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
    }

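    // The 4k metadata header is treated as two 2k slots.  storeMetaData() writes
    // the same properties to both slots, syncing between the writes, so a crash
    // part way through always leaves at least one intact copy for loadMetaData().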
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }
        
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }
        
        if( v1==null && v2==null ) {
            throw new IOException("Could not load page file meta data");
        }
        
        if( v1 == null || v1.metaDataTxId<0 ) {
            metaData = v2;
        } else if( v2==null || v2.metaDataTxId<0 ) {
            metaData = v1;
        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }
    
    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);
        
        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if( os.size() > PAGE_FILE_HEADER_SIZE/2) { 
            throw new IOException("Configuration is larger than: "+PAGE_FILE_HEADER_SIZE/2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
        Arrays.fill(filler, (byte)' ');
        os.write(filler);
        os.flush();
        
        byte[] d = os.toByteArray();

        // So we don't lose it, write it twice...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }
    
    ///////////////////////////////////////////////////////////////////
    // Property Accessors 
    ///////////////////////////////////////////////////////////////////
    
    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     * 
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     * @param syncWrites
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }
    
    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize-Page.PAGE_HEADER_SIZE;
    }
    
    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     * 
     * @param pageSize the pageSize to set
     * @throws IllegalStateException
     *         once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }
    
    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }
    
    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        assertNotLoaded();
        this.writeBatchSize = writeBatchSize;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if( !loaded.get() ) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if( loaded.get() ) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }
        
    /** 
     * Allocates a block of free pages that you can write data to.
     * 
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set. 
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;
            while (c > 0) {
                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                page.makeFree(getNextWriteTransactionId());

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
                c--;
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        if( enablePageCaching ) {
            pageCache.remove(pageId);
        }
    }
    
    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
            public Long getKey() {
                return write.getPage().getPageId();
            }
            public PageWrite getValue() {
                return write;
            }
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if( write==null ) {
                    writes.put(key, value);
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }
            
            // Once we start approaching capacity, notify the writer to start writing
            if( canStartWriteBatch() ) {
                if( enabledWriteThread ) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }
    
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100)/writeBatchSize);
        if( enabledWriteThread ) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we are locking the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch!=null;
        } else {
            return capacityUsed >= 80 || checkpointLatch!=null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized(writes) {
            PageWrite pageWrite = writes.get(pageId);
            if( pageWrite != null ) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(Page page) {
        if (enablePageCaching) {
            pageCache.remove(page.getPageId());
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////
    /**
     * The async writer thread's main loop: waits for queued page writes and
     * flushes them to disk in batches until the page file is stopped.
     */
    private void pollWrites() {
        try {
            while( !stopWriter.get() ) {
                // Wait for a notification...
                synchronized( writes ) {  
                    writes.notifyAll();
                    
                    // If there is not enough to write, wait for a notification...
                    while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) {
                        writes.wait(100);
                    }
                    
                    if( writes.isEmpty() ) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.error("KahaDB PageFile writer thread failed: ", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    /**
     * Writes out the currently queued page writes, first to the recovery
     * buffer (when enabled) and then to the main page file.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    private void writeBatch() throws IOException {
            
        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized( writes ) {

            batch = new ArrayList<PageWrite>(writes.size());
            // build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the 
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can 
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch=null;
        }
        
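        // On-disk layout of the recovery buffer written below:
        //   header (4k): next txid (long), Adler-32 checksum over all page data (long),
        //                and the number of page records (int)
        //   records:     for each page, its page id (long) followed by pageSize bytes of data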
        try {
            if (enableRecoveryFile) {

                // Using Adler-32 instead of CRC-32 because it's much faster, and its
                // weakness for short messages of a few hundred bytes is not a factor
                // here since we know our write batches are going to be much larger.
                Checksum checksum = new Adler32();
                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.diskBound, 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create(
                                "Cannot create recovery file. Reason: " + t, t);
                    }
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow
                recoveryFile.writeInt(batch.size());

                // Write the pages.
                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.diskBound, 0, pageSize);
                }

                if (enableDiskSyncs) {
                    // Sync to make sure recovery buffer writes land on disk..
                    recoveryFile.getFD().sync();
                }

                recoveryPageCount = batch.size();
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.diskBound, 0, pageSize);
                w.done();
            }

            // Sync again
            if (enableDiskSyncs) {
                writeFile.getFD().sync();
            }

        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                    }
                }
            }
            
            if( checkpointLatch!=null ) {
                checkpointLatch.countDown();
            }
        }
    }

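    // Each recovery record is an 8 byte page id followed by the page data, so the
    // recovery file needs the fixed header plus (pageSize + 8) bytes per page.
    // E.g. with 4k pages, 1000 pages take 4096 + 1000 * 4104 = 4,108,096 bytes.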
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
    }

    private void releaseCheckpointWaiter() {
        if( checkpointLatch!=null ) {
            checkpointLatch.countDown();
            checkpointLatch=null;
        }
    }
    
    /**
     * Inspects the recovery buffer and re-applies any 
     * partially applied page writes.
     * 
     * @return the next transaction id that can be used.
     * @throws IOException
     */
    private long redoRecoveryUpdates() throws IOException {
        if( !enableRecoveryFile ) {
            return 0;
        }
        recoveryPageCount=0;
        
        // Are we initializing the recovery file?
        if( recoveryFile.length() == 0 ) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }
        
        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();
        
        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte []data = new byte[pageSize];
                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
                    // Invalid recovery record: could not fully read the data.  Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly, so don't
            // redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }
        
        recoveryPageCount = pageCounter;
        
        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if( checksum.getValue() != expectedChecksum ) {
            return nextTxId;
        }
        
        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }
        
        // And sync it to disk
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }
 
    private void stopWriter() throws InterruptedException {
        if( enabledWriteThread ) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

}