001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.journal;
018    
019    import java.io.File;
020    import java.io.FilenameFilter;
021    import java.io.IOException;
022    import java.io.UnsupportedEncodingException;
023    import java.util.*;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicLong;
026    import java.util.concurrent.atomic.AtomicReference;
027    import java.util.zip.Adler32;
028    import java.util.zip.Checksum;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
033    import org.apache.kahadb.journal.DataFileAppender.WriteKey;
034    import org.apache.kahadb.util.*;
035    
036    /**
037     * Manages DataFiles
038     * 
039     * @version $Revision: 813467 $
040     */
041    public class Journal {
042    
043        private static final int MAX_BATCH_SIZE = 32*1024*1024;
044    
045            // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
046        public static final int RECORD_HEAD_SPACE = 4 + 1;
047        
048        public static final byte USER_RECORD_TYPE = 1;
049        public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
050        // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 
051        public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
052        public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
053        public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
054    
055        private static byte[] createBatchControlRecordHeader() {
056            try {
057                DataByteArrayOutputStream os = new DataByteArrayOutputStream();
058                os.writeInt(BATCH_CONTROL_RECORD_SIZE);
059                os.writeByte(BATCH_CONTROL_RECORD_TYPE);
060                os.write(BATCH_CONTROL_RECORD_MAGIC);
061                ByteSequence sequence = os.toByteSequence();
062                sequence.compact();
063                return sequence.getData();
064            } catch (IOException e) {
065                throw new RuntimeException("Could not create batch control record header.");
066            }
067        }
068    
069        public static final String DEFAULT_DIRECTORY = ".";
070        public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
071        public static final String DEFAULT_FILE_PREFIX = "db-";
072        public static final String DEFAULT_FILE_SUFFIX = ".log";
073        public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
074        public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
075        public static final int PREFERED_DIFF = 1024 * 512;
076        public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
077        
078        private static final Log LOG = LogFactory.getLog(Journal.class);
079    
080        protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
081    
082        protected File directory = new File(DEFAULT_DIRECTORY);
083        protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
084        protected String filePrefix = DEFAULT_FILE_PREFIX;
085        protected String fileSuffix = DEFAULT_FILE_SUFFIX;
086        protected boolean started;
087        
088        protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
089        protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
090        protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
091        
092        protected DataFileAppender appender;
093        protected DataFileAccessorPool accessorPool;
094    
095        protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
096        protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
097        protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
098    
099        protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
100        protected Runnable cleanupTask;
101        protected final AtomicLong totalLength = new AtomicLong();
102        protected boolean archiveDataLogs;
103            private ReplicationTarget replicationTarget;
104        protected boolean checksum;
105        protected boolean checkForCorruptionOnStartup;
106    
107       
108    
109        public synchronized void start() throws IOException {
110            if (started) {
111                return;
112            }
113            
114            long start = System.currentTimeMillis();
115            accessorPool = new DataFileAccessorPool(this);
116            started = true;
117            preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
118    
119            appender = new DataFileAppender(this);
120    
121            File[] files = directory.listFiles(new FilenameFilter() {
122                public boolean accept(File dir, String n) {
123                    return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
124                }
125            });
126    
127            if (files != null) {
128                for (int i = 0; i < files.length; i++) {
129                    try {
130                        File file = files[i];
131                        String n = file.getName();
132                        String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
133                        int num = Integer.parseInt(numStr);
134                        DataFile dataFile = new DataFile(file, num, preferedFileLength);
135                        fileMap.put(dataFile.getDataFileId(), dataFile);
136                        totalLength.addAndGet(dataFile.getLength());
137                    } catch (NumberFormatException e) {
138                        // Ignore file that do not match the pattern.
139                    }
140                }
141    
142                // Sort the list so that we can link the DataFiles together in the
143                // right order.
144                List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
145                Collections.sort(l);
146                for (DataFile df : l) {
147                    dataFiles.addLast(df);
148                    fileByFileMap.put(df.getFile(), df);
149    
150                    if( isCheckForCorruptionOnStartup() ) {
151                        lastAppendLocation.set(recoveryCheck(df));
152                    }
153                }
154            }
155    
156            getCurrentWriteFile();
157    
158            if( lastAppendLocation.get()==null ) {
159                DataFile df = dataFiles.getTail();
160                lastAppendLocation.set(recoveryCheck(df));
161            }
162    
163            cleanupTask = new Runnable() {
164                public void run() {
165                    cleanup();
166                }
167            };
168            Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
169            long end = System.currentTimeMillis();
170            LOG.trace("Startup took: "+(end-start)+" ms");
171        }
172    
173        private static byte[] bytes(String string) {
174            try {
175                            return string.getBytes("UTF-8");
176                    } catch (UnsupportedEncodingException e) {
177                            throw new RuntimeException(e);
178                    }
179            }
180    
181            protected Location recoveryCheck(DataFile dataFile) throws IOException {
182            Location location = new Location();
183            location.setDataFileId(dataFile.getDataFileId());
184            location.setOffset(0);
185    
186            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
187            try {
188                while( true ) {
189                    int size = checkBatchRecord(reader, location.getOffset());
190                    if ( size>=0 ) {
191                        location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
192                    } else {
193    
194                        // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
195                        // may have subsequent valid batch records.
196                        int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
197                        if( nextOffset >=0 ) {
198                            Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
199                            LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
200                            dataFile.corruptedBlocks.add(sequence);
201                            location.setOffset(nextOffset);
202                        } else {
203                            break;
204                        }
205                    }
206                }
207                
208            } catch (IOException e) {
209                    } finally {
210                accessorPool.closeDataFileAccessor(reader);
211            }
212    
213            dataFile.setLength(location.getOffset());
214    
215            if( !dataFile.corruptedBlocks.isEmpty() ) {
216                // Is the end of the data file corrupted?
217                if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
218                    dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
219                }
220            }
221    
222            return location;
223        }
224    
225        private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
226            ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
227            byte data[] = new byte[1024*4];
228            ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
229    
230            int pos = 0;
231            while( true ) {
232                pos = bs.indexOf(header, pos);
233                if( pos >= 0 ) {
234                    return offset+pos;
235                } else {
236                    // need to load the next data chunck in..
237                    if( bs.length != data.length ) {
238                        // If we had a short read then we were at EOF
239                        return -1;
240                    }
241                    offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
242                    bs = new ByteSequence(data, 0, reader.read(offset, data));
243                    pos=0;
244                }
245            }
246        }
247    
248    
249        public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
250            byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
251            DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
252    
253            reader.readFully(offset, controlRecord);
254    
255            // Assert that it's  a batch record.
256            for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
257                if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
258                    return -1;
259                }
260            }
261    
262            int size = controlIs.readInt();
263            if( size > MAX_BATCH_SIZE ) {
264                return -1;
265            }
266    
267            if( isChecksum() ) {
268    
269                long expectedChecksum = controlIs.readLong();
270                if( expectedChecksum == 0 ) {
271                    // Checksuming was not enabled when the record was stored.
272                    // we can't validate the record :(
273                    return size;
274                }
275    
276                byte data[] = new byte[size];
277                reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
278    
279                Checksum checksum = new Adler32();
280                checksum.update(data, 0, data.length);
281    
282                if( expectedChecksum!=checksum.getValue() ) {
283                    return -1;
284                }
285    
286            }
287            return size;
288        }
289    
290    
291            void addToTotalLength(int size) {
292                    totalLength.addAndGet(size);
293            }
294        
295        
296        synchronized DataFile getCurrentWriteFile() throws IOException {
297            if (dataFiles.isEmpty()) {
298                rotateWriteFile();
299            }
300            return dataFiles.getTail();
301        }
302    
303        synchronized DataFile rotateWriteFile() {
304                    int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
305                    File file = getFile(nextNum);
306                    DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
307                    // actually allocate the disk space
308                    fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
309                    fileByFileMap.put(file, nextWriteFile);
310                    dataFiles.addLast(nextWriteFile);
311                    return nextWriteFile;
312            }
313    
314            public File getFile(int nextNum) {
315                    String fileName = filePrefix + nextNum + fileSuffix;
316                    File file = new File(directory, fileName);
317                    return file;
318            }
319    
320        synchronized DataFile getDataFile(Location item) throws IOException {
321            Integer key = Integer.valueOf(item.getDataFileId());
322            DataFile dataFile = fileMap.get(key);
323            if (dataFile == null) {
324                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
325                throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
326            }
327            return dataFile;
328        }
329    
330        synchronized File getFile(Location item) throws IOException {
331            Integer key = Integer.valueOf(item.getDataFileId());
332            DataFile dataFile = fileMap.get(key);
333            if (dataFile == null) {
334                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
335                throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
336            }
337            return dataFile.getFile();
338        }
339    
340        private DataFile getNextDataFile(DataFile dataFile) {
341            return dataFile.getNext();
342        }
343    
344        public synchronized void close() throws IOException {
345            if (!started) {
346                return;
347            }
348            Scheduler.cancel(cleanupTask);
349            accessorPool.close();
350            appender.close();
351            fileMap.clear();
352            fileByFileMap.clear();
353            dataFiles.clear();
354            lastAppendLocation.set(null);
355            started = false;
356        }
357    
358        synchronized void cleanup() {
359            if (accessorPool != null) {
360                accessorPool.disposeUnused();
361            }
362        }
363    
364        public synchronized boolean delete() throws IOException {
365    
366            // Close all open file handles...
367            appender.close();
368            accessorPool.close();
369    
370            boolean result = true;
371            for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
372                DataFile dataFile = i.next();
373                totalLength.addAndGet(-dataFile.getLength());
374                result &= dataFile.delete();
375            }
376            fileMap.clear();
377            fileByFileMap.clear();
378            lastAppendLocation.set(null);
379            dataFiles = new LinkedNodeList<DataFile>();
380    
381            // reopen open file handles...
382            accessorPool = new DataFileAccessorPool(this);
383            appender = new DataFileAppender(this);
384            return result;
385        }
386    
387        public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
388            for (Integer key : files) {
389                // Can't remove the data file (or subsequent files) that is currently being written to.
390                    if( key >= lastAppendLocation.get().getDataFileId() ) {
391                            continue;
392                    }
393                DataFile dataFile = fileMap.get(key);
394                if( dataFile!=null ) {
395                    forceRemoveDataFile(dataFile);
396                }
397            }
398        }
399    
400        private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
401            accessorPool.disposeDataFileAccessors(dataFile);
402            fileByFileMap.remove(dataFile.getFile());
403            fileMap.remove(dataFile.getDataFileId());
404            totalLength.addAndGet(-dataFile.getLength());
405            dataFile.unlink();
406            if (archiveDataLogs) {
407                dataFile.move(getDirectoryArchive());
408                LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
409            } else {
410                if ( dataFile.delete() ) {
411                    LOG.debug("Discarded data file " + dataFile);
412                } else {
413                    LOG.warn("Failed to discard data file " + dataFile.getFile());
414                }
415            }
416        }
417    
418        /**
419         * @return the maxFileLength
420         */
421        public int getMaxFileLength() {
422            return maxFileLength;
423        }
424    
425        /**
426         * @param maxFileLength the maxFileLength to set
427         */
428        public void setMaxFileLength(int maxFileLength) {
429            this.maxFileLength = maxFileLength;
430        }
431    
432        public String toString() {
433            return directory.toString();
434        }
435    
436            public synchronized void appendedExternally(Location loc, int length) throws IOException {
437                    DataFile dataFile = null;
438                    if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
439                            // It's an update to the current log file..
440                            dataFile = dataFiles.getTail();
441                            dataFile.incrementLength(length);
442                    } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
443                            // It's an update to the next log file.
444                int nextNum = loc.getDataFileId();
445                File file = getFile(nextNum);
446                dataFile = new DataFile(file, nextNum, preferedFileLength);
447                // actually allocate the disk space
448                fileMap.put(dataFile.getDataFileId(), dataFile);
449                fileByFileMap.put(file, dataFile);
450                dataFiles.addLast(dataFile);
451                    } else {
452                            throw new IOException("Invalid external append.");
453                    }
454            }
455    
456        public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
457    
458            Location cur = null;
459            while (true) {
460                if (cur == null) {
461                    if (location == null) {
462                        DataFile head = dataFiles.getHead();
463                        if( head == null ) {
464                            return null;
465                        }
466                        cur = new Location();
467                        cur.setDataFileId(head.getDataFileId());
468                        cur.setOffset(0);
469                    } else {
470                        // Set to the next offset..
471                        if (location.getSize() == -1) {
472                            cur = new Location(location);
473                        } else {
474                            cur = new Location(location);
475                            cur.setOffset(location.getOffset() + location.getSize());
476                        }
477                    }
478                } else {
479                    cur.setOffset(cur.getOffset() + cur.getSize());
480                }
481    
482                DataFile dataFile = getDataFile(cur);
483    
484                // Did it go into the next file??
485                if (dataFile.getLength() <= cur.getOffset()) {
486                    dataFile = getNextDataFile(dataFile);
487                    if (dataFile == null) {
488                        return null;
489                    } else {
490                        cur.setDataFileId(dataFile.getDataFileId().intValue());
491                        cur.setOffset(0);
492                    }
493                }
494    
495                // Load in location size and type.
496                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
497                try {
498                                    reader.readLocationDetails(cur);
499                } finally {
500                    accessorPool.closeDataFileAccessor(reader);
501                }
502    
503                if (cur.getType() == 0) {
504                    return null;
505                } else if (cur.getType() == USER_RECORD_TYPE) {
506                    // Only return user records.
507                    return cur;
508                }
509            }
510        }
511    
512        public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
513            DataFile df = fileByFileMap.get(file);
514            return getNextLocation(df, lastLocation, thisFileOnly);
515        }
516    
517        public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
518    
519            Location cur = null;
520            while (true) {
521                if (cur == null) {
522                    if (lastLocation == null) {
523                        DataFile head = dataFile.getHeadNode();
524                        cur = new Location();
525                        cur.setDataFileId(head.getDataFileId());
526                        cur.setOffset(0);
527                    } else {
528                        // Set to the next offset..
529                        cur = new Location(lastLocation);
530                        cur.setOffset(cur.getOffset() + cur.getSize());
531                    }
532                } else {
533                    cur.setOffset(cur.getOffset() + cur.getSize());
534                }
535    
536                // Did it go into the next file??
537                if (dataFile.getLength() <= cur.getOffset()) {
538                    if (thisFileOnly) {
539                        return null;
540                    } else {
541                        dataFile = getNextDataFile(dataFile);
542                        if (dataFile == null) {
543                            return null;
544                        } else {
545                            cur.setDataFileId(dataFile.getDataFileId().intValue());
546                            cur.setOffset(0);
547                        }
548                    }
549                }
550    
551                // Load in location size and type.
552                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
553                try {
554                    reader.readLocationDetails(cur);
555                } finally {
556                    accessorPool.closeDataFileAccessor(reader);
557                }
558    
559                if (cur.getType() == 0) {
560                    return null;
561                } else if (cur.getType() > 0) {
562                    // Only return user records.
563                    return cur;
564                }
565            }
566        }
567    
568        public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
569            DataFile dataFile = getDataFile(location);
570            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
571            ByteSequence rc = null;
572            try {
573                rc = reader.readRecord(location);
574            } finally {
575                accessorPool.closeDataFileAccessor(reader);
576            }
577            return rc;
578        }
579    
580        public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
581            Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
582            return loc;
583        }
584    
585        public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
586            Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
587            return loc;
588        }
589    
590        public void update(Location location, ByteSequence data, boolean sync) throws IOException {
591            DataFile dataFile = getDataFile(location);
592            DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
593            try {
594                updater.updateRecord(location, data, sync);
595            } finally {
596                accessorPool.closeDataFileAccessor(updater);
597            }
598        }
599    
600        public File getDirectory() {
601            return directory;
602        }
603    
604        public void setDirectory(File directory) {
605            this.directory = directory;
606        }
607    
608        public String getFilePrefix() {
609            return filePrefix;
610        }
611    
612        public void setFilePrefix(String filePrefix) {
613            this.filePrefix = filePrefix;
614        }
615    
616        public Map<WriteKey, WriteCommand> getInflightWrites() {
617            return inflightWrites;
618        }
619    
620        public Location getLastAppendLocation() {
621            return lastAppendLocation.get();
622        }
623    
624        public void setLastAppendLocation(Location lastSyncedLocation) {
625            this.lastAppendLocation.set(lastSyncedLocation);
626        }
627    
628        public File getDirectoryArchive() {
629            return directoryArchive;
630        }
631    
632        public void setDirectoryArchive(File directoryArchive) {
633            this.directoryArchive = directoryArchive;
634        }
635    
636        public boolean isArchiveDataLogs() {
637            return archiveDataLogs;
638        }
639    
640        public void setArchiveDataLogs(boolean archiveDataLogs) {
641            this.archiveDataLogs = archiveDataLogs;
642        }
643    
644        synchronized public Integer getCurrentDataFileId() {
645            if (dataFiles.isEmpty())
646                return null;
647            return dataFiles.getTail().getDataFileId();
648        }
649    
650        /**
651         * Get a set of files - only valid after start()
652         * 
653         * @return files currently being used
654         */
655        public Set<File> getFiles() {
656            return fileByFileMap.keySet();
657        }
658    
659        public Map<Integer, DataFile> getFileMap() {
660            return new TreeMap<Integer, DataFile>(fileMap);
661        }
662        
663        public long getDiskSize() {
664            long tailLength=0;
665            synchronized( this ) {
666                if( !dataFiles.isEmpty() ) {
667                    tailLength = dataFiles.getTail().getLength();
668                }
669            }
670            
671            long rc = totalLength.get();
672            
673            // The last file is actually at a minimum preferedFileLength big.
674            if( tailLength < preferedFileLength ) {
675                rc -= tailLength;
676                rc += preferedFileLength;
677            }
678            return rc;
679        }
680    
681            public void setReplicationTarget(ReplicationTarget replicationTarget) {
682                    this.replicationTarget = replicationTarget;
683            }
684            public ReplicationTarget getReplicationTarget() {
685                    return replicationTarget;
686            }
687    
688        public String getFileSuffix() {
689            return fileSuffix;
690        }
691    
692        public void setFileSuffix(String fileSuffix) {
693            this.fileSuffix = fileSuffix;
694        }
695    
696            public boolean isChecksum() {
697                    return checksum;
698            }
699    
700            public void setChecksum(boolean checksumWrites) {
701                    this.checksum = checksumWrites;
702            }
703    
704        public boolean isCheckForCorruptionOnStartup() {
705            return checkForCorruptionOnStartup;
706        }
707    
708        public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
709            this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
710        }
711    
712        public void setWriteBatchSize(int writeBatchSize) {
713            this.writeBatchSize = writeBatchSize;
714        }
715        
716        public int getWriteBatchSize() {
717            return writeBatchSize;
718        }
719    }