001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    import java.util.concurrent.BlockingQueue;
029    import java.util.concurrent.TimeUnit;
030    import org.opends.messages.Message;
031    
032    import static org.opends.server.loggers.ErrorLogger.logError;
033    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
035    import static org.opends.messages.ReplicationMessages.*;
036    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037    
038    import org.opends.server.api.DirectoryThread;
039    import org.opends.server.loggers.debug.DebugTracer;
040    import org.opends.server.replication.protocol.UpdateMessage;
041    
042    /**
043     * Thread that is used to get message from the replication servers (stored
044     * in the updates queue) and replay them in the current server. A configurable
045     * number of this thread is created for the whole MultimasterReplication object
046     * (i.e: these threads are shared accross the ReplicationDomain objects for
047     * replaying the updates they receive)
048     */
049    public class ReplayThread extends DirectoryThread
050    {
051      /**
052       * The tracer object for the debug logger.
053       */
054      private static final DebugTracer TRACER = getTracer();
055    
056      private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
057      private boolean shutdown = false;
058      private boolean done = false;
059      private static int count = 0;
060    
061      /**
062       * Constructor for the ReplayThread.
063       *
064       * @param updateToReplayQueue The queue of update messages we have to replay
065       */
066      public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
067      {
068         super("Replication Replay thread " + count++);
069         this.updateToReplayQueue = updateToReplayQueue;
070      }
071    
072      /**
073       * Shutdown this replay thread.
074       */
075      public void shutdown()
076      {
077        shutdown = true;
078      }
079    
080      /**
081       * Run method for this class.
082       */
083      @Override
084      public void run()
085      {
086    
087        if (debugEnabled())
088        {
089          TRACER.debugInfo("Replication Replay thread starting.");
090        }
091    
092        UpdateToReplay updateToreplay = null;
093    
094        while (!shutdown)
095        {
096          try
097          {
098            // Loop getting an updateToReplayQueue from the update message queue and
099            // replaying matching changes
100            while ( (!shutdown) &&
101              ((updateToreplay = updateToReplayQueue.poll(1L,
102              TimeUnit.SECONDS)) != null))
103            {
104              // Find replication domain for that update message
105              UpdateMessage updateMsg = updateToreplay.getUpdateMessage();
106              ReplicationDomain domain = updateToreplay.getReplicationDomain();
107              domain.replay(updateMsg);
108            }
109          } catch (Exception e)
110          {
111            /*
112             * catch all exceptions happening so that the thread never dies even
113             * in case of problems.
114             */
115            Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
116                stackTraceToSingleLineString(e));
117            logError(message);
118          }
119        }
120        done = true;
121        if (debugEnabled())
122        {
123          TRACER.debugInfo("Replication Replay thread stopping.");
124        }
125      }
126    
127      /**
128       * Wait for the completion of this thread.
129       */
130      public void waitForShutdown()
131      {
132        try
133        {
134          while ((done == false) && (this.isAlive()))
135          {
136            Thread.sleep(50);
137          }
138        } catch (InterruptedException e)
139        {
140          // exit the loop if this thread is interrupted.
141        }
142      }
143    }