/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.async;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Manages DataFiles.
 * <p>
 * Keeps an id-indexed and a file-indexed map of the {@link DataFile}s found in
 * {@link #getDirectory()}, tracks the file currently being appended to, and
 * persists the mark / last-append locations through a {@link ControlFile}.
 * Most operations synchronize on this instance.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class AsyncDataManager {

    public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
    public static final int ITEM_HEAD_RESERVED_SPACE = 21;
    // ITEM_HEAD_SPACE = length + type + reserved space + SOR
    public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
    public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
    public static final int ITEM_FOOT_SPACE = 3; // EOR

    public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;

    public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'};
    public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'};

    public static final byte DATA_ITEM_TYPE = 1;
    public static final byte REDO_ITEM_TYPE = 2;
    public static final String DEFAULT_DIRECTORY = "data";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "data-";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;

    private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
    protected static Scheduler scheduler = Scheduler.getInstance();

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected ControlFile controlFile;
    protected boolean started;
    protected boolean useNio = true;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;

    protected DataFileAppender appender;
    protected DataFileAccessorPool accessorPool;

    // Data files indexed by their numeric id.
    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    // Data files indexed by their backing file, in link order (filled by start()/allocateLocation()).
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected DataFile currentWriteFile;

    protected Location mark;
    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected final AtomicLong storeSize;
    protected boolean archiveDataLogs;

    /**
     * @param storeSize counter that is adjusted by the length of every data
     *            file that is discovered, appended to, or removed
     */
    public AsyncDataManager(AtomicLong storeSize) {
        this.storeSize = storeSize;
    }

    /**
     * Creates a manager that tracks its size in a private counter.
     */
    public AsyncDataManager() {
        this(new AtomicLong());
    }

    /**
     * Locks the control file, restores the persisted state, discovers the
     * existing data files in the directory, runs a recovery check on the newest
     * one and schedules the periodic cleanup task. Does nothing if already
     * started.
     *
     * @throws IOException if locking, loading or storing the state fails
     */
    @SuppressWarnings("unchecked")
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        // NOTE(review): the flag is raised before any step that can throw, so a
        // failed start() leaves started == true - confirm this is intended.
        started = true;
        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
        lock();

        accessorPool = new DataFileAccessorPool(this);
        ByteSequence sequence = controlFile.load();
        if (sequence != null && sequence.getLength() > 0) {
            unmarshallState(sequence);
        }
        if (useNio) {
            appender = new NIODataFileAppender(this);
        } else {
            appender = new DataFileAppender(this);
        }

        File[] files = directory.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix);
            }
        });

        if (files != null) {
            for (int i = 0; i < files.length; i++) {
                try {
                    File file = files[i];
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    storeSize.addAndGet(dataFile.getLength());
                } catch (NumberFormatException ignored) {
                    // Ignore files that do not match the <prefix><number> pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            currentWriteFile = null;
            for (DataFile df : l) {
                if (currentWriteFile != null) {
                    currentWriteFile.linkAfter(df);
                }
                currentWriteFile = df;
                fileByFileMap.put(df.getFile(), df);
            }
        }

        // Need to check the current Write File to see if there was a partial
        // write to it.
        if (currentWriteFile != null) {

            // See if the lastSyncedLocation is valid..
            Location l = lastAppendLocation.get();
            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
                l = null;
            }

            // If we know the last location that was ok.. then we can skip lots
            // of checking
            try {
                l = recoveryCheck(currentWriteFile, l);
                lastAppendLocation.set(l);
            } catch (IOException e) {
                LOG.warn("recovery check failed", e);
            }
        }

        storeState(false);

        cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
    }

    /**
     * Creates the control file (and the directory) if it does not exist yet or
     * has been disposed, then locks it.
     *
     * @throws IOException if the directory or lock cannot be obtained
     */
    public void lock() throws IOException {
        synchronized (this) {
            if (controlFile == null || controlFile.isDisposed()) {
                IOHelper.mkdirs(directory);
                controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
            }
            controlFile.lock();
        }
    }

    /**
     * Walks the records of {@code dataFile} starting at {@code location} (or at
     * offset 0 when it is {@code null}) for as long as they validate, then sets
     * the data file length to the offset where the walk stopped.
     *
     * @param dataFile the file to check
     * @param location the starting point, or {@code null} to start at the beginning
     * @return the location at which the walk stopped
     * @throws IOException if the file cannot be read
     */
    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
        if (location == null) {
            location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);
        }
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            reader.readLocationDetails(location);
            while (reader.readLocationDetailsAndValidate(location)) {
                location.setOffset(location.getOffset() + location.getSize());
            }
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        dataFile.setLength(location.getOffset());
        return location;
    }

    /**
     * Restores {@link #mark} and the last append location from the form
     * produced by {@link #marshallState()}: for each of the two locations a
     * presence flag followed by the externalized location.
     *
     * @param sequence the bytes loaded from the control file
     * @throws IOException if the bytes cannot be decoded
     */
    protected void unmarshallState(ByteSequence sequence) throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
        DataInputStream dis = new DataInputStream(bais);
        if (dis.readBoolean()) {
            mark = new Location();
            mark.readExternal(dis);
        } else {
            mark = null;
        }
        if (dis.readBoolean()) {
            Location l = new Location();
            l.readExternal(dis);
            lastAppendLocation.set(l);
        } else {
            lastAppendLocation.set(null);
        }
    }

    /**
     * Serializes {@link #mark} and the last append location, each preceded by
     * a boolean presence flag.
     */
    private synchronized ByteSequence marshallState() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);

        if (mark != null) {
            dos.writeBoolean(true);
            mark.writeExternal(dos);
        } else {
            dos.writeBoolean(false);
        }
        Location l = lastAppendLocation.get();
        if (l != null) {
            dos.writeBoolean(true);
            l.writeExternal(dos);
        } else {
            dos.writeBoolean(false);
        }

        byte[] bs = baos.toByteArray();
        return new ByteSequence(bs, 0, bs.length);
    }

    /**
     * Assigns a data file and offset to {@code location}, rolling over to a new
     * data file when there is none yet or the record would push the current
     * one past {@link #getMaxFileLength()}.
     *
     * @param location the location to fill in; its size must already be set
     * @return the data file the location was allocated in
     * @throws IOException if a new data file cannot be created
     */
    synchronized DataFile allocateLocation(Location location) throws IOException {
        if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
            int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;

            String fileName = filePrefix + nextNum;
            File file = new File(directory, fileName);
            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
            // actually allocate the disk space
            nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
            fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
            fileByFileMap.put(file, nextWriteFile);
            if (currentWriteFile != null) {
                currentWriteFile.linkAfter(nextWriteFile);
                if (currentWriteFile.isUnused()) {
                    // NOTE(review): removeDataFile() refuses to remove the
                    // current write file, and currentWriteFile is only
                    // reassigned below, so this call currently never removes
                    // anything - confirm whether that is intended.
                    removeDataFile(currentWriteFile);
                }
            }
            currentWriteFile = nextWriteFile;

        }
        location.setOffset(currentWriteFile.getLength());
        location.setDataFileId(currentWriteFile.getDataFileId().intValue());
        int size = location.getSize();
        currentWriteFile.incrementLength(size);
        currentWriteFile.increment();
        storeSize.addAndGet(size);
        return currentWriteFile;
    }

    /**
     * Decrements the usage count of the data file that holds {@code location}.
     *
     * @throws IOException if the data file is unknown
     */
    public synchronized void removeLocation(Location location) throws IOException {
        DataFile dataFile = getDataFile(location);
        dataFile.decrement();
    }

    /**
     * Looks up the data file referenced by {@code item}.
     *
     * @throws IOException if no data file with that id is registered
     */
    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
        }
        return dataFile;
    }

    /**
     * Looks up the backing file of the data file referenced by {@code item}.
     *
     * @throws IOException if no data file with that id is registered
     */
    synchronized File getFile(Location item) throws IOException {
        // Same lookup, logging and failure as getDataFile(); share it.
        return getDataFile(item).getFile();
    }

    private DataFile getNextDataFile(DataFile dataFile) {
        return (DataFile)dataFile.getNext();
    }

    /**
     * Cancels the cleanup task, stores the state, closes the accessor pool and
     * appender, forgets all data files and releases the control file. Does
     * nothing if not started.
     *
     * @throws IOException if storing the state or closing fails
     */
    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        scheduler.cancel(cleanupTask);
        accessorPool.close();
        storeState(false);
        appender.close();
        fileMap.clear();
        fileByFileMap.clear();
        controlFile.unlock();
        controlFile.dispose();
        started = false;
    }

    /**
     * Periodic task body: asks the accessor pool to dispose unused accessors.
     */
    synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    /**
     * Deletes every registered data file, resets the in-memory state and
     * re-creates the accessor pool and appender.
     *
     * @return {@code true} if every data file reported a successful delete
     * @throws IOException if closing the appender or deleting a file fails
     */
    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (DataFile dataFile : fileMap.values()) {
            storeSize.addAndGet(-dataFile.getLength());
            result &= dataFile.delete();
        }
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        mark = null;
        currentWriteFile = null;

        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        if (useNio) {
            appender = new NIODataFileAppender(this);
        } else {
            appender = new DataFileAppender(this);
        }
        return result;
    }

    /**
     * Increments the usage count of the data file with the given id. Negative
     * ids are ignored.
     *
     * @throws IOException if no data file with that id is registered
     */
    public synchronized void addInterestInFile(int file) throws IOException {
        if (file >= 0) {
            Integer key = Integer.valueOf(file);
            DataFile dataFile = fileMap.get(key);
            if (dataFile == null) {
                throw new IOException("That data file does not exist");
            }
            addInterestInFile(dataFile);
        }
    }

    /**
     * Increments the usage count of {@code dataFile}; {@code null} is ignored.
     */
    synchronized void addInterestInFile(DataFile dataFile) {
        if (dataFile != null) {
            dataFile.increment();
        }
    }

    /**
     * Decrements the usage count of the data file with the given id and tries
     * to remove it once the count drops to zero. Negative and unknown ids are
     * ignored.
     *
     * @throws IOException if removing the file fails
     */
    public synchronized void removeInterestInFile(int file) throws IOException {
        if (file >= 0) {
            Integer key = Integer.valueOf(file);
            DataFile dataFile = fileMap.get(key);
            removeInterestInFile(dataFile);
        }
    }

    /**
     * Decrements the usage count of {@code dataFile} and tries to remove it
     * once the count drops to zero; {@code null} is ignored.
     *
     * @throws IOException if removing the file fails
     */
    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
        if (dataFile != null) {
            if (dataFile.decrement() <= 0) {
                removeDataFile(dataFile);
            }
        }
    }

    /**
     * Force-removes every data file whose id is in neither set, except the
     * current write file.
     *
     * @param inUse ids of data files that are still referenced
     * @param inProgress ids of data files that must also be kept
     * @throws IOException if removing a file fails
     */
    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
        unUsed.removeAll(inUse);
        unUsed.removeAll(inProgress);

        List<DataFile> purgeList = new ArrayList<DataFile>();
        for (Integer key : unUsed) {
            purgeList.add(fileMap.get(key));
        }

        // The ids are boxed Integers: compare them by value (equals), never by
        // reference, and tolerate the absence of a current write file.
        Integer currentId = currentWriteFile != null ? currentWriteFile.getDataFileId() : null;
        for (DataFile dataFile : purgeList) {
            if (!dataFile.getDataFileId().equals(currentId)) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    /**
     * Force-removes every data file whose id is not in {@code inUse} and is
     * strictly less than {@code lastFile}.
     *
     * @param inUse ids of data files that are still referenced
     * @param lastFile exclusive upper bound for the ids that may be removed
     * @throws IOException if removing a file fails
     */
    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
        unUsed.removeAll(inUse);

        List<DataFile> purgeList = new ArrayList<DataFile>();
        for (Integer key : unUsed) {
            // Only add files less than the lastFile..
            if (key.intValue() < lastFile.intValue()) {
                purgeList.add(fileMap.get(key));
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
        }
        for (DataFile dataFile : purgeList) {
            forceRemoveDataFile(dataFile);
        }
    }

    /**
     * Tries to remove every data file that reports itself as unused.
     *
     * @throws IOException if removing a file fails
     */
    public synchronized void consolidateDataFiles() throws IOException {
        // Collect first: removal modifies fileMap.
        List<DataFile> purgeList = new ArrayList<DataFile>();
        for (DataFile dataFile : fileMap.values()) {
            if (dataFile.isUnused()) {
                purgeList.add(dataFile);
            }
        }
        for (DataFile dataFile : purgeList) {
            removeDataFile(dataFile);
        }
    }

    /**
     * Removes {@code dataFile} unless it is the current write file, no mark has
     * been set, or its id is not below the mark's data file id.
     */
    private synchronized void removeDataFile(DataFile dataFile) throws IOException {

        // Make sure we don't delete too much data.
        if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Won't remove DataFile" + dataFile);
            }
            return;
        }
        forceRemoveDataFile(dataFile);
    }

    /**
     * Unregisters and unlinks {@code dataFile}, then either moves it to the
     * archive directory or deletes it, depending on {@link #isArchiveDataLogs()}.
     */
    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        storeSize.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            dataFile.move(getDirectoryArchive());
            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
        } else {
            boolean result = dataFile.delete();
            if (!result) {
                LOG.info("Failed to discard data file " + dataFile);
            }
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return "DataManager:(" + filePrefix + ")";
    }

    /**
     * @return the current mark, or {@code null} if none has been set
     */
    public synchronized Location getMark() throws IllegalStateException {
        return mark;
    }

    /**
     * Finds the next record with a positive type after {@code location}.
     *
     * @param location the location to continue from, or {@code null} to start
     *            at the head data file
     * @return the next location, or {@code null} when there is none (including
     *         when no data file exists yet)
     * @throws IOException if a referenced data file is unknown or unreadable
     */
    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    if (currentWriteFile == null) {
                        // Nothing has been written yet: there is no first record.
                        return null;
                    }
                    DataFile head = (DataFile)currentWriteFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset; a size of -1 leaves the copy
                    // at the offset of the given location.
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = getNextDataFile(dataFile);
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Only return user records.
                return cur;
            }
        }
    }

    /**
     * Variant of {@link #getNextLocation(DataFile, Location, boolean)} that
     * resolves the data file from its backing file.
     *
     * @throws IOException if {@code file} is not a registered data file, or on
     *             read failure
     */
    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
        DataFile df = fileByFileMap.get(file);
        if (df == null) {
            // Fail with a descriptive error instead of a NullPointerException.
            throw new IOException("Could not locate data file " + file);
        }
        return getNextLocation(df, lastLocation, thisFileOnly);
    }

    /**
     * Finds the next record with a positive type after {@code lastLocation},
     * reading from {@code dataFile} and, unless {@code thisFileOnly} is set,
     * from the files linked after it.
     *
     * @param dataFile the data file to read from
     * @param lastLocation the location to continue from, or {@code null}
     * @param thisFileOnly if {@code true}, stop at the end of {@code dataFile}
     * @return the next location, or {@code null} when there is none
     * @throws IOException on read failure
     */
    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (lastLocation == null) {
                    // NOTE(review): the cursor gets the head file's id, but the
                    // reads below go to dataFile itself - confirm these are
                    // expected to be the same file when lastLocation is null.
                    DataFile head = (DataFile)dataFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(lastLocation);
                    cur.setOffset(cur.getOffset() + cur.getSize());
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                if (thisFileOnly) {
                    return null;
                } else {
                    dataFile = getNextDataFile(dataFile);
                    if (dataFile == null) {
                        return null;
                    } else {
                        cur.setDataFileId(dataFile.getDataFileId().intValue());
                        cur.setOffset(0);
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Only return user records.
                return cur;
            }
        }
    }

    /**
     * Reads the record stored at {@code location}.
     *
     * @throws IOException if the data file is unknown or unreadable
     */
    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            return reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    /**
     * Sets the mark and stores the state.
     *
     * @param location the new mark
     * @param sync passed through to the state store
     * @throws IOException if storing the state fails
     */
    public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
        synchronized (this) {
            mark = location;
        }
        storeState(sync);
    }

    /**
     * Writes the marshalled state both as a {@code MARK_TYPE} record through
     * the appender and into the control file.
     */
    protected synchronized void storeState(boolean sync) throws IOException {
        ByteSequence state = marshallState();
        appender.storeItem(state, Location.MARK_TYPE, sync);
        controlFile.store(state, sync);
    }

    /**
     * Appends {@code data} as a {@code USER_TYPE} record.
     *
     * @return the location of the stored record
     */
    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    /**
     * Appends {@code data} as a {@code USER_TYPE} record, handing
     * {@code onComplete} to the appender.
     *
     * @return the location of the stored record
     */
    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }

    /**
     * Appends {@code data} as a record of the given type.
     *
     * @return the location of the stored record
     */
    public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, type, sync);
    }

    /**
     * Overwrites the record at {@code location} with {@code data}.
     *
     * @throws IOException if the data file is unknown or the update fails
     */
    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    /**
     * @param filePrefix the prefix; it is converted to a file-system-safe name
     */
    public void setFilePrefix(String filePrefix) {
        this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix);
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public boolean isUseNio() {
        return useNio;
    }

    public void setUseNio(boolean useNio) {
        this.useNio = useNio;
    }

    public File getDirectoryArchive() {
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    /**
     * @return the id of the current write file, or {@code null} if there is none
     */
    public synchronized Integer getCurrentDataFileId() {
        if (currentWriteFile == null) {
            return null;
        }
        return currentWriteFile.getDataFileId();
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used (the key set of the internal map, not a copy)
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    /**
     * @return the sum of the lengths of all linked data files, or 0 when no
     *         data file exists yet
     */
    public synchronized long getDiskSize() {
        long rc = 0;
        if (currentWriteFile == null) {
            return rc;
        }
        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
        while (cur != null) {
            rc += cur.getLength();
            cur = (DataFile)cur.getNext();
        }
        return rc;
    }

    /**
     * @param startPosition the position to measure up to
     * @return the summed length of the data files before the one holding
     *         {@code startPosition}, plus its offset; the total length if no
     *         file with that (or a higher) id is linked; 0 when no data file
     *         exists yet
     */
    public synchronized long getDiskSizeUntil(Location startPosition) {
        long rc = 0;
        if (currentWriteFile == null) {
            return rc;
        }
        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
        while (cur != null) {
            if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
                return rc + startPosition.getOffset();
            }
            rc += cur.getLength();
            cur = (DataFile)cur.getNext();
        }
        return rc;
    }

}