001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    
028    package org.opends.server.backends.jeb.importLDIF;
029    
030    import static org.opends.server.loggers.debug.DebugLogger.*;
031    import org.opends.server.loggers.debug.DebugTracer;
032    import org.opends.server.types.*;
033    import org.opends.server.api.DirectoryThread;
034    import org.opends.server.backends.jeb.*;
035    import org.opends.messages.Message;
036    import static org.opends.messages.JebMessages.*;
037    import java.util.concurrent.BlockingQueue;
038    import java.util.concurrent.TimeUnit;
039    import java.util.*;
040    import com.sleepycat.je.DatabaseException;
041    import com.sleepycat.je.Transaction;
042    import com.sleepycat.je.LockMode;
043    import com.sleepycat.je.DatabaseEntry;
044    
045    /**
046     * A thread to process import entries from a queue.  Multiple instances of
047     * this class process entries from a single shared queue.
048     */
049    public class WorkThread extends DirectoryThread {
050    
051      /**
052       * The tracer object for the debug logger.
053       */
054      private static final DebugTracer TRACER = getTracer();
055    
056      /*
057       * Work queue of work items.
058       */
059      private BlockingQueue<WorkElement> workQueue;
060    
061    
062      /**
063       * The number of entries imported by this thread.
064       */
065      private int importedCount = 0;
066    
067      //Root container.
068      private RootContainer rootContainer;
069    
070      /**
071       * A flag that is set when the thread has been told to stop processing.
072       */
073      private boolean stopRequested = false;
074    
075      //The thread number related to a thread.
076      private int threadNumber;
077    
078      //The substring buffer manager to use.
079      private BufferManager bufferMgr;
080    
081      //These are used to try and keep memory usage down.
082      private Set<byte[]> insertKeySet = new HashSet<byte[]>();
083      private Set<byte[]> childKeySet = new HashSet<byte[]>();
084      private Set<byte[]> subtreeKeySet = new HashSet<byte[]>();
085      private Set<byte[]> delKeySet = new HashSet<byte[]>();
086      private DatabaseEntry keyData = new DatabaseEntry();
087      private DatabaseEntry data = new DatabaseEntry();
088      ImportIDSet importIDSet = new IntegerImportIDSet();
089    
090      /**
091       * Create a work thread instance using the specified parameters.
092       *
093       * @param workQueue  The work queue to pull work off of.
094       * @param threadNumber The thread number.
095       * @param bufferMgr  The buffer manager to use.
096       * @param rootContainer The root container.
097       */
098      public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber,
099                                    BufferManager bufferMgr,
100                                    RootContainer rootContainer) {
101        super("Import Worker Thread " + threadNumber);
102        this.threadNumber = threadNumber;
103        this.workQueue = workQueue;
104        this.bufferMgr = bufferMgr;
105        this.rootContainer = rootContainer;
106      }
107    
108      /**
109       * Get the number of entries imported by this thread.
110       * @return The number of entries imported by this thread.
111       */
112       int getImportedCount() {
113        return importedCount;
114      }
115    
116      /**
117       * Tells the thread to stop processing.
118       */
119       void stopProcessing() {
120        stopRequested = true;
121      }
122    
123      /**
124       * Run the thread. Read from item from queue and give it to the
125       * buffer manage, unless told to stop. Once stopped, ask buffer manager
126       * to flush and exit.
127       *
128       */
129      public void run()
130      {
131        try {
132          do {
133            try {
134              WorkElement element = workQueue.poll(1000, TimeUnit.MILLISECONDS);
135              if(element != null) {
136               process(element);
137              }
138            }
139            catch (InterruptedException e) {
140              if (debugEnabled()) {
141                TRACER.debugCaught(DebugLogLevel.ERROR, e);
142              }
143            }
144          } while (!stopRequested);
145        } catch (Exception e) {
146          if (debugEnabled()) {
147            TRACER.debugCaught(DebugLogLevel.ERROR, e);
148          }
149          throw new RuntimeException(e);
150        }
151      }
152    
153      /**
154       * Process a work element.
155       *
156       * @param element The work elemenet to process.
157       *
158       * @throws DatabaseException If a database error occurs.
159       * @throws DirectoryException If a directory error occurs.
160       * @throws JebException If a JEB error occurs.
161       */
162      private void process(WorkElement element)
163      throws DatabaseException, DirectoryException, JebException {
164        Transaction txn = null;
165        EntryID entryID;
166        if((entryID = processDN2ID(element, txn)) == null)
167          return;
168        if(!processID2Entry(element, entryID, txn))
169          return;
170        procesID2SCEntry(element, entryID, txn);
171        processIndexesEntry(element, entryID, txn);
172      }
173    
174      /**
175       * Delete all indexes related to the specified entry ID using the specified
176       * entry to generate the keys.
177       *
178       * @param element The work element.
179       * @param existingEntry The existing entry to replace.
180       * @param entryID The entry ID to remove from the keys.
181       * @param txn A transaction.
182       * @throws DatabaseException If a database error occurs.
183       */
184      private void
185      processIndexesEntryDelete(WorkElement element, Entry existingEntry,
186                                EntryID entryID, Transaction txn)
187              throws DatabaseException {
188        DNContext context = element.getContext();
189        Map<AttributeType, AttributeIndex> attrIndexMap =
190                context.getAttrIndexMap();
191        for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
192                attrIndexMap.entrySet()) {
193          AttributeType attrType = mapEntry.getKey();
194          if(existingEntry.hasAttribute(attrType)) {
195            AttributeIndex attributeIndex = mapEntry.getValue();
196            Index index;
197            if((index=attributeIndex.getEqualityIndex()) != null) {
198              delete(index, existingEntry, entryID, txn);
199            }
200            if((index=attributeIndex.getPresenceIndex()) != null) {
201              delete(index, existingEntry, entryID, txn);
202            }
203            if((index=attributeIndex.getSubstringIndex()) != null) {
204              delete(index, existingEntry, entryID, txn);
205            }
206            if((index=attributeIndex.getOrderingIndex()) != null) {
207              delete(index, existingEntry, entryID, txn);
208            }
209            if((index=attributeIndex.getApproximateIndex()) != null) {
210              delete(index, existingEntry, entryID, txn);
211            }
212          }
213        }
214      }
215    
216      /**
217       * Process all indexes using the specified entry ID.
218       *
219       * @param element The work element.
220       * @param entryID The entry ID to process.
221       * @param txn A transaction.
222       * @throws DatabaseException If an database error occurs.
223       */
224      private void
225      processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn)
226              throws DatabaseException {
227        Entry entry = element.getEntry();
228        DNContext context = element.getContext();
229        LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
230        if (ldifImportConfig.appendToExistingData() &&
231                ldifImportConfig.replaceExistingEntries()) {
232          Entry existingEntry = element.getExistingEntry();
233          if(existingEntry != null) {
234              processIndexesEntryDelete(element, existingEntry, entryID, txn);
235          }
236        }
237        Map<AttributeType, AttributeIndex> attrIndexMap =
238                context.getAttrIndexMap();
239        for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
240                attrIndexMap.entrySet()) {
241          AttributeType attrType = mapEntry.getKey();
242          if(entry.hasAttribute(attrType)) {
243            AttributeIndex attributeIndex = mapEntry.getValue();
244            Index index;
245            if((index=attributeIndex.getEqualityIndex()) != null) {
246              insert(index, entry, entryID, txn);
247            }
248            if((index=attributeIndex.getPresenceIndex()) != null) {
249              insert(index, entry, entryID, txn);
250            }
251            if((index=attributeIndex.getSubstringIndex()) != null) {
252              bufferMgr.insert(index,entry, entryID, txn, insertKeySet);
253            }
254            if((index=attributeIndex.getOrderingIndex()) != null) {
255              insert(index, entry, entryID, txn);
256            }
257            if((index=attributeIndex.getApproximateIndex()) != null) {
258              insert(index, entry, entryID, txn);
259            }
260          }
261        }
262      }
263    
264      /**
265       * Process id2children/id2subtree indexes for the specified entry ID.
266       *
267       * @param element The work element.
268       * @param entryID The entry ID to process.
269       * @param txn A transaction.
270       * @throws DatabaseException If an database error occurs.
271       */
272      private  void
273      procesID2SCEntry(WorkElement element, EntryID entryID,
274                       Transaction txn) throws DatabaseException {
275        Entry entry = element.getEntry();
276        DNContext context = element.getContext();
277        LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
278        if (ldifImportConfig.appendToExistingData() &&
279                ldifImportConfig.replaceExistingEntries()) {
280          return;
281        }
282        Index id2children = context.getEntryContainer().getID2Children();
283        Index id2subtree = context.getEntryContainer().getID2Subtree();
284        bufferMgr.insert(id2children, id2subtree, entry, entryID, txn,
285                        childKeySet, subtreeKeySet);
286      }
287    
288      /**
289       * Insert specified entry ID into the specified index using the entry to
290       * generate the keys.
291       *
292       * @param index  The index to insert into.
293       * @param entry The entry to generate the keys from.
294       * @param entryID The entry ID to insert.
295       * @param txn A transaction.
296       * @return <CODE>True</CODE> if insert succeeded.
297       * @throws DatabaseException If a database error occurs.
298       */
299      private boolean
300      insert(Index index, Entry entry, EntryID entryID,
301             Transaction txn) throws DatabaseException {
302        insertKeySet.clear();
303        index.indexer.indexEntry(entry, insertKeySet);
304        importIDSet.setEntryID(entryID);
305        return index.insert(txn, importIDSet, insertKeySet, keyData, data);
306      }
307    
308      /**
309       * Delete specified entry ID into the specified index using the entry to
310       * generate the keys.
311       *
312       * @param index  The index to insert into.
313       * @param entry The entry to generate the keys from.
314       * @param entryID The entry ID to insert.
315       * @param txn A transaction.
316       * @throws DatabaseException If a database error occurs.
317       */
318      private void
319      delete(Index index, Entry entry, EntryID entryID,
320             Transaction txn) throws DatabaseException {
321        delKeySet.clear();
322        index.indexer.indexEntry(entry, delKeySet);
323        index.delete(txn, delKeySet,  entryID);
324      }
325    
326      /**
327       * Insert entry from work element into id2entry DB.
328       *
329       * @param element The work element containing the entry.
330       * @param entryID The entry ID to use as the key.
331       * @param txn A transaction.
332       * @return <CODE>True</CODE> If the insert succeeded.
333       * @throws DatabaseException If a database error occurs.
334       * @throws DirectoryException  If a directory error occurs.
335       */
336      private boolean
337      processID2Entry(WorkElement element, EntryID entryID, Transaction txn)
338              throws DatabaseException, DirectoryException {
339        boolean ret;
340        Entry entry = element.getEntry();
341        DNContext context = element.getContext();
342        ID2Entry id2entry = context.getEntryContainer().getID2Entry();
343        DN2URI dn2uri = context.getEntryContainer().getDN2URI();
344        ret=id2entry.put(txn, entryID, entry);
345        if(ret) {
346          importedCount++;
347          LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
348          if (ldifImportConfig.appendToExistingData() &&
349                  ldifImportConfig.replaceExistingEntries()) {
350            Entry existingEntry = element.getExistingEntry();
351            if(existingEntry != null) {
352              dn2uri.replaceEntry(txn, existingEntry, entry);
353            }
354          } else {
355            ret= dn2uri.addEntry(txn, entry);
356          }
357        }
358        return ret;
359      }
360    
361      /**
362       * Process entry from work element checking if it's parent exists.
363       *
364       * @param element The work element containing the entry.
365       * @param txn A transaction.
366       * @return <CODE>True</CODE> If the insert succeeded.
367       * @throws DatabaseException If a database error occurs.
368       */
369      private boolean
370      processParent(WorkElement element, Transaction txn)
371              throws DatabaseException {
372        Entry entry = element.getEntry();
373        DNContext context = element.getContext();
374        LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
375        if (ldifImportConfig.appendToExistingData() &&
376                ldifImportConfig.replaceExistingEntries()) {
377          return true;
378        }
379        EntryID parentID = null;
380        DN entryDN = entry.getDN();
381        DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN);
382        DN2ID dn2id = context.getEntryContainer().getDN2ID();
383        if (parentDN != null) {
384          parentID = context.getParentID(parentDN, dn2id, txn);
385          if (parentID == null) {
386            dn2id.remove(txn, entryDN);
387            Message msg =
388                    ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
389            context.getLDIFReader().rejectLastEntry(msg);
390            return false;
391          }
392        }
393        EntryID entryID = rootContainer.getNextEntryID();
394        ArrayList<EntryID> IDs;
395        if (parentDN != null && context.getParentDN() != null &&
396                parentDN.equals(context.getParentDN())) {
397          IDs = new ArrayList<EntryID>(context.getIDs());
398          IDs.set(0, entryID);
399        } else {
400          EntryID nodeID;
401          IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
402          IDs.add(entryID);
403          if (parentID != null)
404          {
405            IDs.add(parentID);
406            EntryContainer ec = context.getEntryContainer();
407            for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
408                 dn = ec.getParentWithinBase(dn)) {
409              if((nodeID =  getAncestorID(dn2id, dn, txn)) == null) {
410                return false;
411              } else {
412                IDs.add(nodeID);
413              }
414            }
415          }
416        }
417        context.setParentDN(parentDN);
418        context.setIDs(IDs);
419        entry.setAttachment(IDs);
420        return true;
421      }
422    
423      private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
424              throws DatabaseException {
425        int i=0;
426        EntryID nodeID = dn2id.get(txn, dn, LockMode.DEFAULT);
427        if(nodeID == null) {
428          while((nodeID = dn2id.get(txn, dn, LockMode.DEFAULT)) == null) {
429            try {
430              Thread.sleep(50);
431              if(i == 3) {
432                return null;
433              }
434              i++;
435            } catch (Exception e) {
436              return null;
437            }
438          }
439        }
440        return nodeID;
441      }
442    
443      /**
444       * Process the a entry from the work element into the dn2id DB.
445       *
446       * @param element The work element containing the entry.
447       * @param txn A transaction.
448       * @return An entry ID.
449       * @throws DatabaseException If a database error occurs.
450       * @throws JebException If a JEB error occurs.
451       */
452      private EntryID
453      processDN2ID(WorkElement element, Transaction txn)
454              throws DatabaseException, JebException {
455        Entry entry = element.getEntry();
456        DNContext context = element.getContext();
457        DN2ID dn2id = context.getEntryContainer().getDN2ID();
458        LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
459        DN entryDN = entry.getDN();
460        EntryID entryID = dn2id.get(txn, entryDN, LockMode.DEFAULT);
461        if (entryID != null) {
462          if (ldifImportConfig.appendToExistingData() &&
463                  ldifImportConfig.replaceExistingEntries()) {
464            ID2Entry id2entry = context.getEntryContainer().getID2Entry();
465            Entry existingEntry = id2entry.get(txn, entryID, LockMode.DEFAULT);
466            element.setExistingEntry(existingEntry);
467          } else {
468            Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
469            context.getLDIFReader().rejectLastEntry(msg);
470            entryID = null;
471          }
472        } else {
473          if(!processParent(element, txn))
474            return null;
475          if (ldifImportConfig.appendToExistingData() &&
476                  ldifImportConfig.replaceExistingEntries()) {
477            entryID = rootContainer.getNextEntryID();
478          } else {
479            ArrayList IDs = (ArrayList)entry.getAttachment();
480            entryID = (EntryID)IDs.get(0);
481          }
482          dn2id.insert(txn, entryDN, entryID);
483        }
484        context.removePending(entryDN);
485        return entryID;
486      }
487    }