001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2008 Sun Microsystems, Inc. 026 */ 027 028 029 package org.opends.server.backends.jeb.importLDIF; 030 031 import org.opends.server.types.Entry; 032 import org.opends.server.backends.jeb.Index; 033 import org.opends.server.backends.jeb.EntryID; 034 import com.sleepycat.je.Transaction; 035 import com.sleepycat.je.DatabaseException; 036 import com.sleepycat.je.DatabaseEntry; 037 import com.sleepycat.je.dbi.MemoryBudget; 038 import static org.opends.server.loggers.ErrorLogger.logError; 039 import org.opends.messages.Message; 040 import static org.opends.messages.JebMessages.*; 041 import java.util.*; 042 import java.util.concurrent.locks.ReentrantLock; 043 044 045 /** 046 * Manages a shared cache among worker threads that caches substring 047 * key/value pairs to avoid DB cache access. Once the cache is above it's 048 * memory usage limit, it will start slowly flushing keys (similar to the 049 * JEB eviction process) until it is under the limit. 050 */ 051 052 public class BufferManager { 053 054 //Memory usage counter. 055 private long memoryUsage=0; 056 057 //Memory limit. 058 private long memoryLimit; 059 060 //Next element in the cache to start flushing at during next flushAll cycle. 061 private KeyHashElement nextElem; 062 063 //Extra bytes to flushAll. 064 private final int extraBytes = 1024 * 1024; 065 066 //Counters for statistics, total is number of accesses, hit is number of 067 //keys found in cache. 068 private long total=0, hit=0; 069 070 //Actual map used to buffer keys. 071 private TreeMap<KeyHashElement, KeyHashElement> elementMap = 072 new TreeMap<KeyHashElement, KeyHashElement>(); 073 074 //The current backup map being used. 075 private int currentMap = 1; 076 077 //Reference to use when the maps are switched. 078 private TreeMap<KeyHashElement, KeyHashElement> backupMap; 079 080 //The two backup maps to insert into if the main element map is being used. 081 private TreeMap<KeyHashElement, KeyHashElement> backupMap2 = 082 new TreeMap<KeyHashElement, KeyHashElement>(); 083 private TreeMap<KeyHashElement, KeyHashElement> backupMap1 = 084 new TreeMap<KeyHashElement, KeyHashElement>(); 085 086 //Overhead values determined from using JHAT. They appear to be the same 087 //for both 32 and 64 bit machines. Close enough. 088 private final static int TREEMAP_ENTRY_OVERHEAD = 29; 089 private final static int KEY_ELEMENT_OVERHEAD = 32; 090 091 //Lock used to get main element map. 092 private ReentrantLock lock = new ReentrantLock(); 093 094 //Object to synchronize on if backup maps are being written. 095 private Object backupSynchObj = new Object(); 096 097 /** 098 * Create buffer manager instance. 099 * 100 * @param memoryLimit The memory limit. 101 * @param importThreadCount The count of import worker threads. 102 */ 103 public BufferManager(long memoryLimit, int importThreadCount) { 104 this.memoryLimit = memoryLimit; 105 this.nextElem = null; 106 this.backupMap = backupMap1; 107 } 108 109 /** 110 * Insert an entry ID into the buffer using the both the specified index and 111 * entry to build a key set. Will flush the buffer if over the memory limit. 112 * 113 * @param index The index to use. 114 * @param entry The entry used to build the key set. 115 * @param entryID The entry ID to insert into the key set. 116 * @param txn A transaction. 117 * @param keySet Keyset hash to store the keys in. 118 * @throws DatabaseException If a problem happened during a flushAll cycle. 119 */ 120 121 void insert(Index index, Entry entry, 122 EntryID entryID, Transaction txn, Set<byte[]> keySet) 123 throws DatabaseException { 124 125 keySet.clear(); 126 index.indexer.indexEntry(entry, keySet); 127 if(!lock.tryLock()) { 128 insertBackupMap(keySet, index, entryID); 129 return; 130 } 131 insertKeySet(keySet, index, entryID, elementMap, true); 132 if(!backupMap.isEmpty()) { 133 mergeMap(); 134 } 135 //If over the memory limit, flush some keys from the cache to make room. 136 if(memoryUsage > memoryLimit) { 137 flushUntilUnderLimit(); 138 } 139 lock.unlock(); 140 } 141 142 /** 143 * Insert an entry ID into buffer using specified id2children and id2subtree 144 * indexes. 145 * 146 * @param id2children The id2children index to use. 147 * @param id2subtree The id2subtree index to use. 148 * @param entry The entry used to build the key set. 149 * @param entryID The entry ID to insert into the key set. 150 * @param txn A transaction. 151 * @param childKeySet id2children key set hash to use. 152 * @param subKeySet subtree key set hash to use. 153 * @throws DatabaseException If a problem occurs during processing. 154 */ 155 void insert(Index id2children, Index id2subtree, Entry entry, 156 EntryID entryID, Transaction txn, Set<byte[]> childKeySet, 157 Set<byte[]> subKeySet) throws DatabaseException { 158 childKeySet.clear(); 159 id2children.indexer.indexEntry(entry, childKeySet); 160 subKeySet.clear(); 161 id2subtree.indexer.indexEntry(entry, subKeySet); 162 if(!lock.tryLock()) { 163 insertBackupMap(childKeySet, id2children, subKeySet, id2subtree, entryID); 164 return; 165 } 166 insertKeySet(childKeySet, id2children, entryID, elementMap, true); 167 insertKeySet(subKeySet, id2subtree, entryID, elementMap, true); 168 lock.unlock(); 169 } 170 171 /** 172 * Insert into a backup tree if can't get a lock on the main table. 173 * @param childrenKeySet The id2children keyset to use. 174 * @param id2children The id2children index to use. 175 * @param subtreeKeySet The subtree keyset to use. 176 * @param id2subtree The id2subtree index to use. 177 * @param entryID The entry ID to insert into the key set. 178 */ 179 void insertBackupMap(Set<byte[]> childrenKeySet, Index id2children, 180 Set<byte[]> subtreeKeySet, 181 Index id2subtree, EntryID entryID) { 182 synchronized(backupSynchObj) { 183 insertKeySet(childrenKeySet, id2children, entryID, backupMap, false); 184 insertKeySet(subtreeKeySet, id2subtree, entryID, backupMap, false); 185 } 186 } 187 188 189 /** 190 * Insert specified keyset, index and entry ID into the backup map. 191 * 192 * @param keySet The keyset to use. 193 * @param index The index to use. 194 * @param entryID The entry ID to use. 195 */ 196 void insertBackupMap(Set<byte[]> keySet, Index index, EntryID entryID) { 197 synchronized(backupSynchObj) { 198 insertKeySet(keySet, index, entryID, backupMap, false); 199 } 200 } 201 202 203 /** 204 * Merge the backup map with the element map after switching the backup 205 * map reference to an empty map. 206 */ 207 void mergeMap() { 208 TreeMap<KeyHashElement, KeyHashElement> tmpMap; 209 synchronized(backupSynchObj) { 210 tmpMap = backupMap; 211 if(currentMap == 1) { 212 backupMap = backupMap2; 213 tmpMap = backupMap1; 214 currentMap = 2; 215 } else { 216 backupMap = backupMap1; 217 tmpMap = backupMap2; 218 currentMap = 1; 219 } 220 } 221 TreeSet<KeyHashElement> tSet = 222 new TreeSet<KeyHashElement>(tmpMap.keySet()); 223 for (KeyHashElement elem : tSet) { 224 total++; 225 if(!elementMap.containsKey(elem)) { 226 elementMap.put(elem, elem); 227 memoryUsage += TREEMAP_ENTRY_OVERHEAD + elem.getMemorySize(); 228 } else { 229 KeyHashElement curElem = elementMap.get(elem); 230 if(curElem.isDefined() || curElem.getIndex().getMaintainCount()) { 231 int oldSize = curElem.getMemorySize(); 232 curElem.merge(elem); 233 memoryUsage += (curElem.getMemorySize() - oldSize); 234 hit++; 235 } 236 } 237 } 238 tmpMap.clear(); 239 } 240 241 /** 242 * Insert a keySet into the element map using the provided index and entry ID. 243 * @param keySet The key set to add to the map. 244 * @param index The index that eventually will contain the entry IDs. 245 * @param entryID The entry ID to add to the entry ID set. 246 * @param map The map to add the keys to 247 * @param trackStats <CODE>True</CODE> if memory and usage should be tracked. 248 */ 249 private void insertKeySet(Set<byte[]> keySet, Index index, EntryID entryID, 250 TreeMap<KeyHashElement, KeyHashElement> map, 251 boolean trackStats) { 252 KeyHashElement elem = new KeyHashElement(); 253 int entryLimit = index.getIndexEntryLimit(); 254 for(byte[] key : keySet) { 255 elem.reset(key, index); 256 if(trackStats) { 257 total++; 258 } 259 if(!map.containsKey(elem)) { 260 KeyHashElement newElem = new KeyHashElement(key, index, entryID); 261 map.put(newElem, newElem); 262 if(trackStats) { 263 memoryUsage += TREEMAP_ENTRY_OVERHEAD + newElem.getMemorySize(); 264 } 265 } else { 266 KeyHashElement curElem = map.get(elem); 267 if(curElem.isDefined() || index.getMaintainCount()) { 268 int oldSize = curElem.getMemorySize(); 269 curElem.addEntryID(entryID, entryLimit); 270 if(trackStats) { 271 memoryUsage += (curElem.getMemorySize() - oldSize); 272 hit++; 273 } 274 } 275 } 276 } 277 } 278 279 /** 280 * Flush the buffer to DB until the buffer is under the memory limit. 281 * 282 * @throws DatabaseException If a problem happens during an index insert. 283 */ 284 private void flushUntilUnderLimit() throws DatabaseException { 285 Iterator<KeyHashElement> iter; 286 if(nextElem == null) { 287 iter = elementMap.keySet().iterator(); 288 } else { 289 iter = elementMap.tailMap(nextElem).keySet().iterator(); 290 } 291 DatabaseEntry dbEntry = new DatabaseEntry(); 292 DatabaseEntry entry = new DatabaseEntry(); 293 while((memoryUsage + extraBytes) > memoryLimit) { 294 if(iter.hasNext()) { 295 KeyHashElement curElem = iter.next(); 296 //Never flush undefined elements. 297 if(curElem.isDefined()) { 298 int oldSize = curElem.getMemorySize(); 299 Index index = curElem.getIndex(); 300 dbEntry.setData(curElem.getKey()); 301 index.insert(null, dbEntry, curElem.getIDSet(), entry); 302 if(curElem.isDefined()) { 303 memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize(); 304 iter.remove(); 305 } else { 306 //Went undefined don't remove the element, just substract the 307 //memory size difference. 308 memoryUsage -= (oldSize - curElem.getMemorySize()); 309 } 310 } 311 } else { 312 //Wrapped around, start at the first element. 313 nextElem = elementMap.firstKey(); 314 iter = elementMap.keySet().iterator(); 315 } 316 } 317 //Start at this element next flushAll cycle. 318 nextElem = iter.next(); 319 } 320 321 /** 322 * Called from main thread to prepare for final buffer flush at end of 323 * ldif load. 324 */ 325 void prepareFlush() { 326 Message msg = 327 NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit); 328 logError(msg); 329 } 330 331 /** 332 * Writes all of the buffer elements to DB. The specific id is used to 333 * share the buffer among the worker threads so this function can be 334 * multi-threaded. 335 * 336 * @throws DatabaseException If an error occurred during the insert. 337 */ 338 void flushAll() throws DatabaseException { 339 mergeMap(); 340 TreeSet<KeyHashElement> tSet = 341 new TreeSet<KeyHashElement>(elementMap.keySet()); 342 DatabaseEntry dbEntry = new DatabaseEntry(); 343 DatabaseEntry entry = new DatabaseEntry(); 344 for (KeyHashElement curElem : tSet) { 345 Index index = curElem.getIndex(); 346 dbEntry.setData(curElem.getKey()); 347 index.insert(null, dbEntry, curElem.getIDSet(), entry); 348 } 349 } 350 351 /** 352 * Class used to represent an element in the buffer. 353 */ 354 class KeyHashElement implements Comparable { 355 356 //Bytes representing the key. 357 private byte[] key; 358 359 //Hash code returned from the System.identityHashCode method on the index 360 //object. 361 private int indexHashCode; 362 363 //Index related to the element. 364 private Index index; 365 366 //The set of IDs related to the key. 367 private ImportIDSet importIDSet; 368 369 //Used to speed up lookup. 370 private int keyHashCode; 371 372 /** 373 * Empty constructor for use when the element is being reused. 374 */ 375 public KeyHashElement() {} 376 377 /** 378 * Reset the element. Used when the element is being reused. 379 * 380 * @param key The new key to reset to. 381 * @param index The new index to reset to. 382 */ 383 public void reset(byte[] key, Index index) { 384 this.key = key; 385 this.index = index; 386 this.indexHashCode = System.identityHashCode(index); 387 this.keyHashCode = Arrays.hashCode(key); 388 if(this.importIDSet != null) { 389 this.importIDSet.reset(); 390 } 391 } 392 393 /** 394 * Create instance of an element for the specified key and index, the add 395 * the specified entry ID to the ID set. 396 * 397 * @param key The key. 398 * @param index The index. 399 * @param entryID The entry ID to start off with. 400 */ 401 public KeyHashElement(byte[] key, Index index, EntryID entryID) { 402 this.key = key; 403 this.index = index; 404 //Use the integer set for right now. This is good up to 2G number of 405 //entries. There is also a LongImportSet, but it currently isn't used. 406 this.importIDSet = new IntegerImportIDSet(entryID); 407 //Used if there when there are conflicts if two or more indexes have 408 //the same key. 409 this.indexHashCode = System.identityHashCode(index); 410 this.keyHashCode = Arrays.hashCode(key); 411 } 412 413 /** 414 * Add an entry ID to the set. 415 * 416 * @param entryID The entry ID to add. 417 * @param entryLimit The entry limit 418 */ 419 void addEntryID(EntryID entryID, int entryLimit) { 420 importIDSet.addEntryID(entryID, entryLimit, index.getMaintainCount()); 421 } 422 423 /** 424 * Return the index. 425 * 426 * @return The index. 427 */ 428 Index getIndex(){ 429 return index; 430 } 431 432 /** 433 * Return the key. 434 * 435 * @return The key. 436 */ 437 byte[] getKey() { 438 return key; 439 } 440 441 /** 442 * Return value of the key hash code. 443 * 444 * @return The key hash code value. 445 */ 446 int getKeyHashCode() { 447 return keyHashCode; 448 } 449 450 /** 451 * Return the ID set. 452 * @return The import ID set. 453 */ 454 ImportIDSet getIDSet() { 455 return importIDSet; 456 } 457 458 /** 459 * Return if the ID set is defined or not. 460 * 461 * @return <CODE>True</CODE> if the ID set is defined. 462 */ 463 boolean isDefined() { 464 return importIDSet.isDefined(); 465 } 466 467 /** 468 * Compare the bytes of two keys. The is slow, only use if the hashcode 469 * had a collision. 470 * 471 * @param a Key a. 472 * @param b Key b. 473 * @return 0 if the keys are equal, -1 if key a is less than key b, 1 if 474 * key a is greater than key b. 475 */ 476 private int compare(byte[] a, byte[] b) { 477 int i; 478 for (i = 0; i < a.length && i < b.length; i++) { 479 if (a[i] > b[i]) { 480 return 1; 481 } 482 else if (a[i] < b[i]) { 483 return -1; 484 } 485 } 486 if (a.length == b.length) { 487 return 0; 488 } 489 if (a.length > b.length){ 490 return 1; 491 } 492 else { 493 return -1; 494 } 495 } 496 497 /** 498 * Compare two element keys. First check the precomputed hashCode. If 499 * the hashCodes are equal, do a second byte per byte comparision in case 500 * there was a collision. 501 * 502 * @param elem The element to compare. 503 * @return 0 if the keys are equal, -1 if key a is less than key b, 1 if 504 * key a is greater than key b. 505 */ 506 private int compare(KeyHashElement elem) { 507 if(keyHashCode == elem.getKeyHashCode()) { 508 return compare(key, elem.key); 509 } else { 510 if(keyHashCode < elem.getKeyHashCode()) { 511 return -1; 512 } else { 513 return 1; 514 } 515 } 516 } 517 518 /** 519 * Compare the specified object to the current object. If the keys are 520 * equal, then the indexHashCode value is used as a tie-breaker. 521 * 522 * @param o The object representing a KeyHashElement. 523 * @return 0 if the objects are equal, -1 if the current object is less 524 * than the specified object, 1 otherwise. 525 */ 526 public int compareTo(Object o) { 527 if (o == null) { 528 throw new NullPointerException(); 529 } 530 KeyHashElement inElem = (KeyHashElement) o; 531 int keyCompare = compare(inElem); 532 if(keyCompare == 0) { 533 if(indexHashCode == inElem.indexHashCode) { 534 return 0; 535 } else if(indexHashCode < inElem.indexHashCode) { 536 return -1; 537 } else { 538 return 1; 539 } 540 } else { 541 return keyCompare; 542 } 543 } 544 545 /** 546 * Return the current total memory size of the element. 547 * @return The memory size estimate of a KeyHashElement. 548 */ 549 int getMemorySize() { 550 return KEY_ELEMENT_OVERHEAD + 551 MemoryBudget.byteArraySize(key.length) + 552 importIDSet.getMemorySize(); 553 } 554 555 /** 556 * Merge the specified element with this element. 557 * @param e The element to merge. 558 */ 559 public void merge(KeyHashElement e) { 560 importIDSet.merge(e.importIDSet, e.getIndex().getIndexEntryLimit(), 561 e.getIndex().getMaintainCount()); 562 } 563 } 564 }