001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2008 Sun Microsystems, Inc. 026 */ 027 028 package org.opends.server.backends.jeb.importLDIF; 029 030 import static org.opends.server.loggers.debug.DebugLogger.*; 031 import org.opends.server.loggers.debug.DebugTracer; 032 import org.opends.server.types.*; 033 import org.opends.server.api.DirectoryThread; 034 import org.opends.server.backends.jeb.*; 035 import org.opends.messages.Message; 036 import static org.opends.messages.JebMessages.*; 037 import java.util.concurrent.BlockingQueue; 038 import java.util.concurrent.TimeUnit; 039 import java.util.*; 040 import com.sleepycat.je.DatabaseException; 041 import com.sleepycat.je.Transaction; 042 import com.sleepycat.je.LockMode; 043 import com.sleepycat.je.DatabaseEntry; 044 045 /** 046 * A thread to process import entries from a queue. Multiple instances of 047 * this class process entries from a single shared queue. 048 */ 049 public class WorkThread extends DirectoryThread { 050 051 /** 052 * The tracer object for the debug logger. 053 */ 054 private static final DebugTracer TRACER = getTracer(); 055 056 /* 057 * Work queue of work items. 058 */ 059 private BlockingQueue<WorkElement> workQueue; 060 061 062 /** 063 * The number of entries imported by this thread. 064 */ 065 private int importedCount = 0; 066 067 //Root container. 068 private RootContainer rootContainer; 069 070 /** 071 * A flag that is set when the thread has been told to stop processing. 072 */ 073 private boolean stopRequested = false; 074 075 //The thread number related to a thread. 076 private int threadNumber; 077 078 //The substring buffer manager to use. 079 private BufferManager bufferMgr; 080 081 //These are used to try and keep memory usage down. 082 private Set<byte[]> insertKeySet = new HashSet<byte[]>(); 083 private Set<byte[]> childKeySet = new HashSet<byte[]>(); 084 private Set<byte[]> subtreeKeySet = new HashSet<byte[]>(); 085 private Set<byte[]> delKeySet = new HashSet<byte[]>(); 086 private DatabaseEntry keyData = new DatabaseEntry(); 087 private DatabaseEntry data = new DatabaseEntry(); 088 ImportIDSet importIDSet = new IntegerImportIDSet(); 089 090 /** 091 * Create a work thread instance using the specified parameters. 092 * 093 * @param workQueue The work queue to pull work off of. 094 * @param threadNumber The thread number. 095 * @param bufferMgr The buffer manager to use. 096 * @param rootContainer The root container. 097 */ 098 public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber, 099 BufferManager bufferMgr, 100 RootContainer rootContainer) { 101 super("Import Worker Thread " + threadNumber); 102 this.threadNumber = threadNumber; 103 this.workQueue = workQueue; 104 this.bufferMgr = bufferMgr; 105 this.rootContainer = rootContainer; 106 } 107 108 /** 109 * Get the number of entries imported by this thread. 110 * @return The number of entries imported by this thread. 111 */ 112 int getImportedCount() { 113 return importedCount; 114 } 115 116 /** 117 * Tells the thread to stop processing. 118 */ 119 void stopProcessing() { 120 stopRequested = true; 121 } 122 123 /** 124 * Run the thread. Read from item from queue and give it to the 125 * buffer manage, unless told to stop. Once stopped, ask buffer manager 126 * to flush and exit. 127 * 128 */ 129 public void run() 130 { 131 try { 132 do { 133 try { 134 WorkElement element = workQueue.poll(1000, TimeUnit.MILLISECONDS); 135 if(element != null) { 136 process(element); 137 } 138 } 139 catch (InterruptedException e) { 140 if (debugEnabled()) { 141 TRACER.debugCaught(DebugLogLevel.ERROR, e); 142 } 143 } 144 } while (!stopRequested); 145 } catch (Exception e) { 146 if (debugEnabled()) { 147 TRACER.debugCaught(DebugLogLevel.ERROR, e); 148 } 149 throw new RuntimeException(e); 150 } 151 } 152 153 /** 154 * Process a work element. 155 * 156 * @param element The work elemenet to process. 157 * 158 * @throws DatabaseException If a database error occurs. 159 * @throws DirectoryException If a directory error occurs. 160 * @throws JebException If a JEB error occurs. 161 */ 162 private void process(WorkElement element) 163 throws DatabaseException, DirectoryException, JebException { 164 Transaction txn = null; 165 EntryID entryID; 166 if((entryID = processDN2ID(element, txn)) == null) 167 return; 168 if(!processID2Entry(element, entryID, txn)) 169 return; 170 procesID2SCEntry(element, entryID, txn); 171 processIndexesEntry(element, entryID, txn); 172 } 173 174 /** 175 * Delete all indexes related to the specified entry ID using the specified 176 * entry to generate the keys. 177 * 178 * @param element The work element. 179 * @param existingEntry The existing entry to replace. 180 * @param entryID The entry ID to remove from the keys. 181 * @param txn A transaction. 182 * @throws DatabaseException If a database error occurs. 183 */ 184 private void 185 processIndexesEntryDelete(WorkElement element, Entry existingEntry, 186 EntryID entryID, Transaction txn) 187 throws DatabaseException { 188 DNContext context = element.getContext(); 189 Map<AttributeType, AttributeIndex> attrIndexMap = 190 context.getAttrIndexMap(); 191 for(Map.Entry<AttributeType, AttributeIndex> mapEntry : 192 attrIndexMap.entrySet()) { 193 AttributeType attrType = mapEntry.getKey(); 194 if(existingEntry.hasAttribute(attrType)) { 195 AttributeIndex attributeIndex = mapEntry.getValue(); 196 Index index; 197 if((index=attributeIndex.getEqualityIndex()) != null) { 198 delete(index, existingEntry, entryID, txn); 199 } 200 if((index=attributeIndex.getPresenceIndex()) != null) { 201 delete(index, existingEntry, entryID, txn); 202 } 203 if((index=attributeIndex.getSubstringIndex()) != null) { 204 delete(index, existingEntry, entryID, txn); 205 } 206 if((index=attributeIndex.getOrderingIndex()) != null) { 207 delete(index, existingEntry, entryID, txn); 208 } 209 if((index=attributeIndex.getApproximateIndex()) != null) { 210 delete(index, existingEntry, entryID, txn); 211 } 212 } 213 } 214 } 215 216 /** 217 * Process all indexes using the specified entry ID. 218 * 219 * @param element The work element. 220 * @param entryID The entry ID to process. 221 * @param txn A transaction. 222 * @throws DatabaseException If an database error occurs. 223 */ 224 private void 225 processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn) 226 throws DatabaseException { 227 Entry entry = element.getEntry(); 228 DNContext context = element.getContext(); 229 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); 230 if (ldifImportConfig.appendToExistingData() && 231 ldifImportConfig.replaceExistingEntries()) { 232 Entry existingEntry = element.getExistingEntry(); 233 if(existingEntry != null) { 234 processIndexesEntryDelete(element, existingEntry, entryID, txn); 235 } 236 } 237 Map<AttributeType, AttributeIndex> attrIndexMap = 238 context.getAttrIndexMap(); 239 for(Map.Entry<AttributeType, AttributeIndex> mapEntry : 240 attrIndexMap.entrySet()) { 241 AttributeType attrType = mapEntry.getKey(); 242 if(entry.hasAttribute(attrType)) { 243 AttributeIndex attributeIndex = mapEntry.getValue(); 244 Index index; 245 if((index=attributeIndex.getEqualityIndex()) != null) { 246 insert(index, entry, entryID, txn); 247 } 248 if((index=attributeIndex.getPresenceIndex()) != null) { 249 insert(index, entry, entryID, txn); 250 } 251 if((index=attributeIndex.getSubstringIndex()) != null) { 252 bufferMgr.insert(index,entry, entryID, txn, insertKeySet); 253 } 254 if((index=attributeIndex.getOrderingIndex()) != null) { 255 insert(index, entry, entryID, txn); 256 } 257 if((index=attributeIndex.getApproximateIndex()) != null) { 258 insert(index, entry, entryID, txn); 259 } 260 } 261 } 262 } 263 264 /** 265 * Process id2children/id2subtree indexes for the specified entry ID. 266 * 267 * @param element The work element. 268 * @param entryID The entry ID to process. 269 * @param txn A transaction. 270 * @throws DatabaseException If an database error occurs. 271 */ 272 private void 273 procesID2SCEntry(WorkElement element, EntryID entryID, 274 Transaction txn) throws DatabaseException { 275 Entry entry = element.getEntry(); 276 DNContext context = element.getContext(); 277 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); 278 if (ldifImportConfig.appendToExistingData() && 279 ldifImportConfig.replaceExistingEntries()) { 280 return; 281 } 282 Index id2children = context.getEntryContainer().getID2Children(); 283 Index id2subtree = context.getEntryContainer().getID2Subtree(); 284 bufferMgr.insert(id2children, id2subtree, entry, entryID, txn, 285 childKeySet, subtreeKeySet); 286 } 287 288 /** 289 * Insert specified entry ID into the specified index using the entry to 290 * generate the keys. 291 * 292 * @param index The index to insert into. 293 * @param entry The entry to generate the keys from. 294 * @param entryID The entry ID to insert. 295 * @param txn A transaction. 296 * @return <CODE>True</CODE> if insert succeeded. 297 * @throws DatabaseException If a database error occurs. 298 */ 299 private boolean 300 insert(Index index, Entry entry, EntryID entryID, 301 Transaction txn) throws DatabaseException { 302 insertKeySet.clear(); 303 index.indexer.indexEntry(entry, insertKeySet); 304 importIDSet.setEntryID(entryID); 305 return index.insert(txn, importIDSet, insertKeySet, keyData, data); 306 } 307 308 /** 309 * Delete specified entry ID into the specified index using the entry to 310 * generate the keys. 311 * 312 * @param index The index to insert into. 313 * @param entry The entry to generate the keys from. 314 * @param entryID The entry ID to insert. 315 * @param txn A transaction. 316 * @throws DatabaseException If a database error occurs. 317 */ 318 private void 319 delete(Index index, Entry entry, EntryID entryID, 320 Transaction txn) throws DatabaseException { 321 delKeySet.clear(); 322 index.indexer.indexEntry(entry, delKeySet); 323 index.delete(txn, delKeySet, entryID); 324 } 325 326 /** 327 * Insert entry from work element into id2entry DB. 328 * 329 * @param element The work element containing the entry. 330 * @param entryID The entry ID to use as the key. 331 * @param txn A transaction. 332 * @return <CODE>True</CODE> If the insert succeeded. 333 * @throws DatabaseException If a database error occurs. 334 * @throws DirectoryException If a directory error occurs. 335 */ 336 private boolean 337 processID2Entry(WorkElement element, EntryID entryID, Transaction txn) 338 throws DatabaseException, DirectoryException { 339 boolean ret; 340 Entry entry = element.getEntry(); 341 DNContext context = element.getContext(); 342 ID2Entry id2entry = context.getEntryContainer().getID2Entry(); 343 DN2URI dn2uri = context.getEntryContainer().getDN2URI(); 344 ret=id2entry.put(txn, entryID, entry); 345 if(ret) { 346 importedCount++; 347 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); 348 if (ldifImportConfig.appendToExistingData() && 349 ldifImportConfig.replaceExistingEntries()) { 350 Entry existingEntry = element.getExistingEntry(); 351 if(existingEntry != null) { 352 dn2uri.replaceEntry(txn, existingEntry, entry); 353 } 354 } else { 355 ret= dn2uri.addEntry(txn, entry); 356 } 357 } 358 return ret; 359 } 360 361 /** 362 * Process entry from work element checking if it's parent exists. 363 * 364 * @param element The work element containing the entry. 365 * @param txn A transaction. 366 * @return <CODE>True</CODE> If the insert succeeded. 367 * @throws DatabaseException If a database error occurs. 368 */ 369 private boolean 370 processParent(WorkElement element, Transaction txn) 371 throws DatabaseException { 372 Entry entry = element.getEntry(); 373 DNContext context = element.getContext(); 374 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); 375 if (ldifImportConfig.appendToExistingData() && 376 ldifImportConfig.replaceExistingEntries()) { 377 return true; 378 } 379 EntryID parentID = null; 380 DN entryDN = entry.getDN(); 381 DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN); 382 DN2ID dn2id = context.getEntryContainer().getDN2ID(); 383 if (parentDN != null) { 384 parentID = context.getParentID(parentDN, dn2id, txn); 385 if (parentID == null) { 386 dn2id.remove(txn, entryDN); 387 Message msg = 388 ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString()); 389 context.getLDIFReader().rejectLastEntry(msg); 390 return false; 391 } 392 } 393 EntryID entryID = rootContainer.getNextEntryID(); 394 ArrayList<EntryID> IDs; 395 if (parentDN != null && context.getParentDN() != null && 396 parentDN.equals(context.getParentDN())) { 397 IDs = new ArrayList<EntryID>(context.getIDs()); 398 IDs.set(0, entryID); 399 } else { 400 EntryID nodeID; 401 IDs = new ArrayList<EntryID>(entryDN.getNumComponents()); 402 IDs.add(entryID); 403 if (parentID != null) 404 { 405 IDs.add(parentID); 406 EntryContainer ec = context.getEntryContainer(); 407 for (DN dn = ec.getParentWithinBase(parentDN); dn != null; 408 dn = ec.getParentWithinBase(dn)) { 409 if((nodeID = getAncestorID(dn2id, dn, txn)) == null) { 410 return false; 411 } else { 412 IDs.add(nodeID); 413 } 414 } 415 } 416 } 417 context.setParentDN(parentDN); 418 context.setIDs(IDs); 419 entry.setAttachment(IDs); 420 return true; 421 } 422 423 private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn) 424 throws DatabaseException { 425 int i=0; 426 EntryID nodeID = dn2id.get(txn, dn, LockMode.DEFAULT); 427 if(nodeID == null) { 428 while((nodeID = dn2id.get(txn, dn, LockMode.DEFAULT)) == null) { 429 try { 430 Thread.sleep(50); 431 if(i == 3) { 432 return null; 433 } 434 i++; 435 } catch (Exception e) { 436 return null; 437 } 438 } 439 } 440 return nodeID; 441 } 442 443 /** 444 * Process the a entry from the work element into the dn2id DB. 445 * 446 * @param element The work element containing the entry. 447 * @param txn A transaction. 448 * @return An entry ID. 449 * @throws DatabaseException If a database error occurs. 450 * @throws JebException If a JEB error occurs. 451 */ 452 private EntryID 453 processDN2ID(WorkElement element, Transaction txn) 454 throws DatabaseException, JebException { 455 Entry entry = element.getEntry(); 456 DNContext context = element.getContext(); 457 DN2ID dn2id = context.getEntryContainer().getDN2ID(); 458 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); 459 DN entryDN = entry.getDN(); 460 EntryID entryID = dn2id.get(txn, entryDN, LockMode.DEFAULT); 461 if (entryID != null) { 462 if (ldifImportConfig.appendToExistingData() && 463 ldifImportConfig.replaceExistingEntries()) { 464 ID2Entry id2entry = context.getEntryContainer().getID2Entry(); 465 Entry existingEntry = id2entry.get(txn, entryID, LockMode.DEFAULT); 466 element.setExistingEntry(existingEntry); 467 } else { 468 Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); 469 context.getLDIFReader().rejectLastEntry(msg); 470 entryID = null; 471 } 472 } else { 473 if(!processParent(element, txn)) 474 return null; 475 if (ldifImportConfig.appendToExistingData() && 476 ldifImportConfig.replaceExistingEntries()) { 477 entryID = rootContainer.getNextEntryID(); 478 } else { 479 ArrayList IDs = (ArrayList)entry.getAttachment(); 480 entryID = (EntryID)IDs.get(0); 481 } 482 dn2id.insert(txn, entryDN, entryID); 483 } 484 context.removePending(entryDN); 485 return entryID; 486 } 487 }