001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.backends.jeb;
028    import org.opends.messages.Message;
029    
030    import org.opends.server.types.*;
031    
032    import java.util.ArrayList;
033    import java.util.TimerTask;
034    import java.util.Timer;
035    import java.util.concurrent.CopyOnWriteArrayList;
036    import java.util.concurrent.locks.ReentrantLock;
037    
038    import com.sleepycat.je.DatabaseException;
039    import com.sleepycat.je.StatsConfig;
040    import com.sleepycat.je.EnvironmentStats;
041    
042    import static org.opends.server.loggers.ErrorLogger.logError;
043    import static org.opends.server.loggers.debug.DebugLogger.*;
044    import org.opends.server.loggers.debug.DebugTracer;
045    import org.opends.server.core.DirectoryServer;
046    import static org.opends.messages.JebMessages.
047        ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED;
048    import static org.opends.messages.JebMessages.
049        NOTE_JEB_REBUILD_PROGRESS_REPORT;
050    import static org.opends.messages.JebMessages.
051        NOTE_JEB_REBUILD_FINAL_STATUS;
052    import static org.opends.messages.JebMessages.
053        NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT;
054    import static org.opends.messages.JebMessages.
055        ERR_JEB_REBUILD_INDEX_CONFLICT;
056    import static org.opends.messages.JebMessages.
057        NOTE_JEB_REBUILD_START;
058    import static org.opends.messages.JebMessages.
059        ERR_JEB_VLV_INDEX_NOT_CONFIGURED;
060    /**
061     * Runs a index rebuild process on the backend. Each index selected for rebuild
062     * will be done from scratch by first clearing out the database for that index.
063     * Different threads will be used to rebuild each index.
064     * The rebuild process can run concurrently with the backend online and
065     * performing write and read operations. However, during the rebuild process,
066     * other reader and writer activeThreads might notice inconsistencies in index
067     * databases being rebuilt. They can safely ignore these inconsistencies as long
068     * as a rebuild is in progress.
069     */
070    public class RebuildJob
071    {
072      /**
073       * The tracer object for the debug logger.
074       */
075      private static final DebugTracer TRACER = getTracer();
076    
077      /**
078       * The rebuild configuraiton.
079       */
080      private RebuildConfig rebuildConfig;
081    
082      /**
083       * The root container used for the verify job.
084       */
085      private RootContainer rootContainer;
086    
087      /**
088       * The number of milliseconds between job progress reports.
089       */
090      private long progressInterval = 10000;
091    
092      /**
093       * The waiting rebuild threads created to process the rebuild.
094       */
095      private CopyOnWriteArrayList<IndexRebuildThread> waitingThreads =
096          new CopyOnWriteArrayList<IndexRebuildThread>();
097    
098      /**
099       * The active rebuild threads created to process the rebuild.
100       */
101      private CopyOnWriteArrayList<IndexRebuildThread> activeThreads =
102          new CopyOnWriteArrayList<IndexRebuildThread>();
103    
104      /**
105       * The completed rebuild threads used to process the rebuild.
106       */
107      private CopyOnWriteArrayList<IndexRebuildThread> completedThreads =
108          new CopyOnWriteArrayList<IndexRebuildThread>();
109    
110      /**
111       * Rebuild jobs currently running.
112       */
113      private static CopyOnWriteArrayList<RebuildJob> rebuildJobs =
114          new CopyOnWriteArrayList<RebuildJob>();
115    
116      /**
117       * A mutex that will be used to provide threadsafe access to methods changing
118       * the set of currently running rebuild jobs.
119       */
120      private static ReentrantLock jobsMutex = new ReentrantLock();
121    
122      /**
123       * This class reports progress of the rebuild job at fixed intervals.
124       */
125      class ProgressTask extends TimerTask
126      {
127        /**
128         * The number of records that had been processed at the time of the
129         * previous progress report.
130         */
131        private long previousProcessed = 0;
132    
133        /**
134         * The time in milliseconds of the previous progress report.
135         */
136        private long previousTime;
137    
138        /**
139         * The environment statistics at the time of the previous report.
140         */
141        private EnvironmentStats prevEnvStats;
142    
143        /**
144         * The number of bytes in a megabyte.
145         * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
146         */
147        private static final int bytesPerMegabyte = 1024*1024;
148       /**
149         * Create a new verify progress task.
150         * @throws DatabaseException An error occurred while accessing the JE
151         * database.
152         */
153        public ProgressTask() throws DatabaseException
154        {
155          previousTime = System.currentTimeMillis();
156          prevEnvStats =
157              rootContainer.getEnvironmentStats(new StatsConfig());
158        }
159    
160        /**
161         * The action to be performed by this timer task.
162         */
163        public void run()
164        {
165          long latestTime = System.currentTimeMillis();
166          long deltaTime = latestTime - previousTime;
167    
168          if (deltaTime == 0)
169          {
170            return;
171          }
172    
173          long totalEntries = 0;
174          long latestProcessed = 0;
175    
176          ArrayList<IndexRebuildThread> allThreads =
177              new ArrayList<IndexRebuildThread>(waitingThreads);
178          allThreads.addAll(activeThreads);
179          allThreads.addAll(completedThreads);
180    
181          for(IndexRebuildThread thread : allThreads)
182          {
183            try
184            {
185              totalEntries += thread.getTotalEntries();
186              latestProcessed += thread.getProcessedEntries();
187    
188              if(debugEnabled())
189              {
190                TRACER.debugVerbose("Rebuild thread %s stats: total %d " +
191                    "processed %d rebuilt %d duplicated %d skipped %d",
192                             thread.getTotalEntries(), thread.getProcessedEntries(),
193                             thread.getRebuiltEntries(),
194                             thread.getDuplicatedEntries(),
195                             thread.getSkippedEntries());
196              }
197            }
198            catch(Exception e)
199            {
200              if(debugEnabled())
201              {
202                TRACER.debugCaught(DebugLogLevel.ERROR, e);
203              }
204            }
205          }
206    
207          long deltaCount = (latestProcessed - previousProcessed);
208          float rate = 1000f*deltaCount / deltaTime;
209          float completed = 0;
210          if(totalEntries > 0)
211          {
212            completed = 100f*latestProcessed / totalEntries;
213          }
214    
215          Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get(
216              completed, latestProcessed, totalEntries, rate);
217          logError(message);
218    
219          try
220          {
221            Runtime runtime = Runtime.getRuntime();
222            long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
223    
224            EnvironmentStats envStats =
225                rootContainer.getEnvironmentStats(new StatsConfig());
226            long nCacheMiss =
227                 envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
228    
229            float cacheMissRate = 0;
230            if (deltaCount > 0)
231            {
232              cacheMissRate = nCacheMiss/(float)deltaCount;
233            }
234    
235            message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(
236                freeMemory, cacheMissRate);
237            logError(message);
238    
239            prevEnvStats = envStats;
240          }
241          catch (DatabaseException e)
242          {
243            if (debugEnabled())
244            {
245              TRACER.debugCaught(DebugLogLevel.ERROR, e);
246            }
247          }
248    
249    
250          previousProcessed = latestProcessed;
251          previousTime = latestTime;
252        }
253      }
254    
255      /**
256       * Construct a new rebuild job.
257       *
258       * @param rebuildConfig The configuration to use for this rebuild job.
259       */
260      public RebuildJob(RebuildConfig rebuildConfig)
261      {
262        this.rebuildConfig = rebuildConfig;
263      }
264    
265      private static void addJob(RebuildJob job)
266          throws DatabaseException, JebException
267      {
268        //Make sure there are no running rebuild jobs
269        jobsMutex.lock();
270    
271        try
272        {
273          for(RebuildJob otherJob : rebuildJobs)
274          {
275            String conflictIndex =
276                job.rebuildConfig.checkConflicts(otherJob.rebuildConfig);
277            if(conflictIndex != null)
278            {
279              if(debugEnabled())
280              {
281                TRACER.debugError("Conflit detected. This job config: %s, " +
282                    "That job config: %s.",
283                                  job.rebuildConfig, otherJob.rebuildConfig);
284              }
285    
286              Message msg = ERR_JEB_REBUILD_INDEX_CONFLICT.get(conflictIndex);
287              throw new JebException(msg);
288            }
289          }
290    
291          //No conflicts are found. Add the job to the list of currently running
292          // jobs.
293          rebuildJobs.add(job);
294        }
295        finally
296        {
297          jobsMutex.unlock();
298        }
299      }
300    
301      private static void removeJob(RebuildJob job)
302      {
303        jobsMutex.lock();
304    
305        rebuildJobs.remove(job);
306    
307        jobsMutex.unlock();
308      }
309    
310      /**
311       * Initiate the rebuild process on a backend.
312       *
313       * @param rootContainer The root container to rebuild in.
314       * @throws DirectoryException If an error occurs during the rebuild process.
315       * @throws DatabaseException If a JE database error occurs during the rebuild
316       *                           process.
317       * @throws JebException If a JE database error occurs during the rebuild
318       *                      process.
319       */
320      public void rebuildBackend(RootContainer rootContainer)
321          throws DirectoryException, DatabaseException, JebException
322      {
323        //TODO: Add check for only performing internal indexType rebuilds when
324        // backend is offline.
325    
326        addJob(this);
327    
328        try
329        {
330          this.rootContainer = rootContainer;
331          EntryContainer entryContainer =
332              rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
333    
334          ArrayList<String> rebuildList = rebuildConfig.getRebuildList();
335    
336          if(!rebuildList.isEmpty())
337          {
338    
339            for (String index : rebuildList)
340            {
341              IndexRebuildThread rebuildThread;
342              String lowerName = index.toLowerCase();
343              if (lowerName.equals("dn2id"))
344              {
345                rebuildThread = new IndexRebuildThread(entryContainer,
346                                                IndexRebuildThread.IndexType.DN2ID);
347              }
348              else if (lowerName.equals("dn2uri"))
349              {
350                rebuildThread = new IndexRebuildThread(entryContainer,
351                                               IndexRebuildThread.IndexType.DN2URI);
352              }
353              else if (lowerName.equals("id2children"))
354              {
355                rebuildThread = new IndexRebuildThread(entryContainer,
356                                          IndexRebuildThread.IndexType.ID2CHILDREN);
357              }
358              else if (lowerName.equals("id2subtree"))
359              {
360                rebuildThread = new IndexRebuildThread(entryContainer,
361                                           IndexRebuildThread.IndexType.ID2SUBTREE);
362              }
363              else if (lowerName.startsWith("vlv."))
364              {
365                if(lowerName.length() < 5)
366                {
367                  Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
368                  throw new JebException(msg);
369                }
370    
371                VLVIndex vlvIndex =
372                    entryContainer.getVLVIndex(lowerName.substring(4));
373                if(vlvIndex == null)
374                {
375                  Message msg =
376                      ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName.substring(4));
377                  throw new JebException(msg);
378                }
379    
380                rebuildThread = new IndexRebuildThread(entryContainer, vlvIndex);
381              }
382              else
383              {
384                String[] attrIndexParts = lowerName.split("\\.");
385                if(attrIndexParts.length <= 0)
386                {
387                  Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
388                  throw new JebException(msg);
389                }
390    
391                AttributeType attrType =
392                    DirectoryServer.getAttributeType(attrIndexParts[0]);
393    
394                if (attrType == null)
395                {
396                  Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
397                  throw new JebException(msg);
398                }
399                AttributeIndex attrIndex =
400                    entryContainer.getAttributeIndex(attrType);
401                if (attrIndex == null)
402                {
403                  Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
404                  throw new JebException(msg);
405                }
406    
407                if(attrIndexParts.length > 1)
408                {
409                  Index partialAttrIndex = null;
410                  if(attrIndexParts[1].equals("presence"))
411                  {
412                    partialAttrIndex = attrIndex.presenceIndex;
413                  }
414                  else if(attrIndexParts[1].equals("equality"))
415                  {
416                    partialAttrIndex = attrIndex.equalityIndex;
417                  }
418                  else if(attrIndexParts[1].equals("substring"))
419                  {
420                    partialAttrIndex = attrIndex.substringIndex;
421                  }
422                  else if(attrIndexParts[1].equals("ordering"))
423                  {
424                    partialAttrIndex = attrIndex.orderingIndex;
425                  }
426                  else if(attrIndexParts[1].equals("approximate"))
427                  {
428                    partialAttrIndex = attrIndex.approximateIndex;
429                  }
430    
431                  if(partialAttrIndex == null)
432                  {
433                    Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
434                    throw new JebException(msg);
435                  }
436    
437                  rebuildThread =
438                      new IndexRebuildThread(entryContainer, partialAttrIndex);
439                }
440                else
441                {
442                  rebuildThread = new IndexRebuildThread(entryContainer,
443                                                         attrIndex);
444                }
445              }
446    
447              waitingThreads.add(rebuildThread);
448    
449              if(debugEnabled())
450              {
451                TRACER.debugInfo("Created rebuild thread %s",
452                                 rebuildThread.getName());
453              }
454            }
455    
456            //Log a start message.
457            long totalToProcess = 0;
458    
459            for(IndexRebuildThread thread : waitingThreads)
460            {
461              totalToProcess += thread.getTotalEntries();
462            }
463    
464            StringBuilder sb = new StringBuilder();
465            for(String index : rebuildList)
466            {
467              if(sb.length() > 0)
468              {
469                sb.append(", ");
470              }
471              sb.append(index);
472            }
473            Message message =
474                NOTE_JEB_REBUILD_START.get(sb.toString(), totalToProcess);
475            logError(message);
476    
477            // Make a note of the time we started.
478            long startTime = System.currentTimeMillis();
479    
480            // Start a timer for the progress report.
481            Timer timer = new Timer();
482            TimerTask progressTask = new ProgressTask();
483            timer.scheduleAtFixedRate(progressTask, progressInterval,
484                                      progressInterval);
485    
486            entryContainer.exclusiveLock.lock();
487            try
488            {
489              for(IndexRebuildThread thread : waitingThreads)
490              {
491                thread.clearDatabase();
492              }
493            }
494            finally
495            {
496              if(!rebuildConfig.includesSystemIndex())
497              {
498                entryContainer.exclusiveLock.unlock();
499              }
500            }
501    
502    
503            if(!rebuildConfig.includesSystemIndex())
504            {
505              entryContainer.sharedLock.lock();
506            }
507            try
508            {
509              while(!waitingThreads.isEmpty())
510              {
511                dispatchThreads();
512                joinThreads();
513              }
514            }
515            finally
516            {
517              timer.cancel();
518              if(rebuildConfig.includesSystemIndex())
519              {
520                entryContainer.exclusiveLock.unlock();
521              }
522              else
523              {
524                entryContainer.sharedLock.unlock();
525              }
526            }
527    
528            long totalProcessed = 0;
529            long totalRebuilt = 0;
530            long totalDuplicated = 0;
531            long totalSkipped = 0;
532    
533            for(IndexRebuildThread thread : completedThreads)
534            {
535              totalProcessed += thread.getProcessedEntries();
536              totalRebuilt += thread.getRebuiltEntries();
537              totalDuplicated += thread.getDuplicatedEntries();
538              totalSkipped += thread.getSkippedEntries();
539            }
540    
541            long finishTime = System.currentTimeMillis();
542            long totalTime = (finishTime - startTime);
543    
544            float rate = 0;
545            if (totalTime > 0)
546            {
547              rate = 1000f*totalProcessed / totalTime;
548            }
549    
550            message = NOTE_JEB_REBUILD_FINAL_STATUS.get(
551                totalProcessed, totalTime/1000, rate);
552            logError(message);
553    
554            if(debugEnabled())
555            {
556              TRACER.debugInfo("Detailed overall rebuild job stats: rebuilt %d, " +
557                  "duplicated %d, skipped %d",
558                        totalRebuilt, totalDuplicated, totalSkipped);
559            }
560          }
561        }
562        finally
563        {
564          removeJob(this);
565        }
566    
567      }
568    
569      /**
570       * Dispatch a set of threads based on their dependency and ordering.
571       */
572      private void dispatchThreads() throws DatabaseException
573      {
574        for(IndexRebuildThread t : waitingThreads)
575        {
576          boolean start = true;
577    
578          //Check to see if we have exceeded the max number of threads to use at
579          //one time.
580          if(rebuildConfig.getMaxRebuildThreads() > 0 &&
581              activeThreads.size() > rebuildConfig.getMaxRebuildThreads())
582          {
583            if(debugEnabled())
584            {
585              TRACER.debugInfo("Delaying the start of thread %s because " +
586                  "the max number of rebuild threads has been reached.");
587            }
588            start = false;
589          }
590    
591          /**
592           * We may need to start the threads in stages since the rebuild process
593           * of some index types (id2children, id2subtree) depends on another
594           * index being rebuilt to be completed first.
595           */
596          if(t.getIndexType() == IndexRebuildThread.IndexType.ID2CHILDREN ||
597              t.getIndexType() == IndexRebuildThread.IndexType.ID2SUBTREE)
598          {
599            //Check to see if we have any waiting threads that needs to go
600            //first
601            for(IndexRebuildThread t2 : waitingThreads)
602            {
603              if(t2.getIndexType() == IndexRebuildThread.IndexType.DN2ID ||
604                  t2.getIndexType() == IndexRebuildThread.IndexType.DN2URI)
605              {
606                //We gotta wait for these to start before running the
607                //rebuild on ID2CHILDREN or ID2SUBTREE
608    
609                if(debugEnabled())
610                {
611                  TRACER.debugInfo("Delaying the start of thread %s because " +
612                      "it depends on another index rebuilt to " +
613                      "go first.", t.getName());
614                }
615                start = false;
616                break;
617              }
618            }
619    
620            //Check to see if we have any active threads that needs to
621            //finish first
622            for(IndexRebuildThread t3 : activeThreads)
623            {
624              if(t3.getIndexType() == IndexRebuildThread.IndexType.DN2ID ||
625                  t3.getIndexType() == IndexRebuildThread.IndexType.DN2URI)
626              {
627                //We gotta wait for these to start before running the
628                //rebuild on ID2CHILDREN or ID2SUBTREE
629    
630                if(debugEnabled())
631                {
632                  TRACER.debugInfo("Delaying the start of thread %s because " +
633                      "it depends on another index being rebuilt to " +
634                      "finish.", t.getName());
635                }
636                start = false;
637                break;
638              }
639            }
640          }
641    
642          if(start)
643          {
644            if(debugEnabled())
645            {
646              TRACER.debugInfo("Starting rebuild thread %s.", t.getName());
647            }
648            waitingThreads.remove(t);
649            activeThreads.add(t);
650            t.start();
651          }
652        }
653      }
654    
655      /**
656       * Wait for all worker activeThreads to exit.
657       */
658      private void joinThreads()
659      {
660        for (IndexRebuildThread t : activeThreads)
661        {
662          try
663          {
664            t.join();
665    
666            if(debugEnabled())
667            {
668              TRACER.debugInfo("Rebuild thread %s finished.", t.getName());
669            }
670            activeThreads.remove(t);
671            completedThreads.add(t);
672          }
673          catch (InterruptedException ie)
674          {
675            // No action needed?
676          }
677        }
678      }
679    }