/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.journal;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.*;

/**
 * Manages the set of DataFiles that make up the journal.
 *
 * @version $Revision: 813467 $
 */
public class Journal {

    private static final int MAX_BATCH_SIZE = 32*1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
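    //
    // On disk a batch control record is laid out as follows (sizes in bytes,
    // derived from the constants below):
    //
    //   | length (4) | type (1) | magic "WRITE BATCH" (11) | batch size (4) | Adler32 checksum (8) |
    //
    // The first three fields make up BATCH_CONTROL_RECORD_HEADER. A checksum
    // value of 0 means checksumming was disabled when the batch was written,
    // so the batch cannot be validated (see checkBatchRecord).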
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();

    private static byte[] createBatchControlRecordHeader() {
        try {
            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Log LOG = LogFactory.getLog(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected DataFileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected final AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
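    /**
     * Starts the journal: scans the journal directory for existing data
     * files, links them together in file id order, optionally runs a
     * corruption check over them, and schedules the periodic cleanup task.
     * Calling start() on an already started journal is a no-op.
     */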
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;
        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);

        appender = new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (int i = 0; i < files.length; i++) {
                try {
                    File file = files[i];
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if( isCheckForCorruptionOnStartup() ) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        getCurrentWriteFile();

        if( lastAppendLocation.get()==null ) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
        long end = System.currentTimeMillis();
        LOG.trace("Startup took: "+(end-start)+" ms");
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while( true ) {
                int size = checkBatchRecord(reader, location.getOffset());
                if ( size>=0 ) {
                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
                } else {

                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
                    if( nextOffset >=0 ) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // EOF or a truncated record; treat this as the end of the journaled data.
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        dataFile.setLength(location.getOffset());

        if( !dataFile.corruptedBlocks.isEmpty() ) {
            // Is the end of the data file corrupted?
            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }
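    /**
     * Scans forward from the given offset looking for the next occurrence of
     * BATCH_CONTROL_RECORD_HEADER, reading the file in 4KB chunks.
     *
     * @return the offset of the next batch control record, or -1 if none is
     *         found before the end of the file.
     */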
    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024*4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while( true ) {
            pos = bs.indexOf(header, pos);
            if( pos >= 0 ) {
                return offset+pos;
            } else {
                // Need to load the next data chunk in..
                if( bs.length != data.length ) {
                    // If we had a short read then we were at EOF.
                    return -1;
                }
                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos=0;
            }
        }
    }

    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);

        reader.readFully(offset, controlRecord);

        // Assert that it's a batch record.
        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
                return -1;
            }
        }

        int size = controlIs.readInt();
        if( size > MAX_BATCH_SIZE ) {
            return -1;
        }

        if( isChecksum() ) {

            long expectedChecksum = controlIs.readLong();
            if( expectedChecksum == 0 ) {
                // Checksumming was not enabled when the record was stored,
                // so we can't validate the record.
                return size;
            }

            byte data[] = new byte[size];
            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);

            Checksum checksum = new Adler32();
            checksum.update(data, 0, data.length);

            if( expectedChecksum!=checksum.getValue() ) {
                return -1;
            }

        }
        return size;
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    synchronized DataFile getCurrentWriteFile() throws IOException {
        if (dataFiles.isEmpty()) {
            rotateWriteFile();
        }
        return dataFiles.getTail();
    }

    synchronized DataFile rotateWriteFile() {
        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
        // actually allocate the disk space
        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
        fileByFileMap.put(file, nextWriteFile);
        dataFiles.addLast(nextWriteFile);
        return nextWriteFile;
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    synchronized File getFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile.getFile();
    }

    private DataFile getNextDataFile(DataFile dataFile) {
        return dataFile.getNext();
    }

    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        Scheduler.cancel(cleanupTask);
        accessorPool.close();
        appender.close();
        fileMap.clear();
        fileByFileMap.clear();
        dataFiles.clear();
        lastAppendLocation.set(null);
        started = false;
    }
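    /**
     * Disposes of idle data file accessors. Invoked periodically by the
     * cleanup task scheduled in start().
     */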
    synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            totalLength.addAndGet(-dataFile.getLength());
            result &= dataFile.delete();
        }
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        dataFiles = new LinkedNodeList<DataFile>();

        // Reopen the file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if( key >= lastAppendLocation.get().getDataFileId() ) {
                continue;
            }
            DataFile dataFile = fileMap.get(key);
            if( dataFile!=null ) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        totalLength.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            dataFile.move(getDirectoryArchive());
            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
        } else {
            if ( dataFile.delete() ) {
                LOG.debug("Discarded data file " + dataFile);
            } else {
                LOG.warn("Failed to discard data file " + dataFile.getFile());
            }
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    public String toString() {
        return directory.toString();
    }

    public synchronized void appendedExternally(Location loc, int length) throws IOException {
        DataFile dataFile = null;
        if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
            // It's an update to the current log file..
            dataFile = dataFiles.getTail();
            dataFile.incrementLength(length);
        } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
            // It's an update to the next log file.
            int nextNum = loc.getDataFileId();
            File file = getFile(nextNum);
            dataFile = new DataFile(file, nextNum, preferedFileLength);
            // actually allocate the disk space
            fileMap.put(dataFile.getDataFileId(), dataFile);
            fileByFileMap.put(file, dataFile);
            dataFiles.addLast(dataFile);
        } else {
            throw new IOException("Invalid external append.");
        }
    }
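    /**
     * Returns the location of the next user record after the given location,
     * or null once the end of the journal is reached. Passing null starts the
     * scan at the head data file. An illustrative traversal (a sketch, not
     * part of the original source; assumes a started Journal named "journal"):
     *
     * <pre>
     * Location cur = journal.getNextLocation(null);
     * while (cur != null) {
     *     ByteSequence record = journal.read(cur);
     *     // process the record ...
     *     cur = journal.getNextLocation(cur);
     * }
     * </pre>
     */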
    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = dataFiles.getHead();
                    if( head == null ) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset.. (a size of -1 means the record
                    // size is not known yet, so start at the same offset)
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = getNextDataFile(dataFile);
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
        DataFile df = fileByFileMap.get(file);
        return getNextLocation(df, lastLocation, thisFileOnly);
    }

    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (lastLocation == null) {
                    DataFile head = dataFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(lastLocation);
                    cur.setOffset(cur.getOffset() + cur.getSize());
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                if (thisFileOnly) {
                    return null;
                } else {
                    dataFile = getNextDataFile(dataFile);
                    if (dataFile == null) {
                        return null;
                    } else {
                        cur.setDataFileId(dataFile.getDataFileId().intValue());
                        cur.setOffset(0);
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Return any record with a valid (positive) type.
                return cur;
            }
        }
    }
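    /**
     * Reads back the record stored at the given location. Illustrative usage
     * together with {@link #write(ByteSequence, boolean)} (a sketch, not part
     * of the original source; "payload" is a caller-supplied byte array):
     *
     * <pre>
     * Location loc = journal.write(new ByteSequence(payload), true); // synchronous write
     * ByteSequence copy = journal.read(loc);
     * </pre>
     */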
    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public synchronized Integer getCurrentDataFileId() {
        if (dataFiles.isEmpty()) {
            return null;
        }
        return dataFiles.getTail().getDataFileId();
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    public Map<Integer, DataFile> getFileMap() {
        return new TreeMap<Integer, DataFile>(fileMap);
    }
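    /**
     * Estimates the on-disk size of the journal. The tail file is counted as
     * at least preferedFileLength bytes, since that is the size the file is
     * expected to grow to on disk.
     */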
    public long getDiskSize() {
        long tailLength=0;
        synchronized( this ) {
            if( !dataFiles.isEmpty() ) {
                tailLength = dataFiles.getTail().getLength();
            }
        }

        long rc = totalLength.get();

        // The tail file occupies at least preferedFileLength bytes on disk.
        if( tailLength < preferedFileLength ) {
            rc -= tailLength;
            rc += preferedFileLength;
        }
        return rc;
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }
}