001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    import java.util.concurrent.LinkedBlockingQueue;
029    import java.util.concurrent.TimeUnit;
030    import org.opends.messages.Message;
031    
032    import static org.opends.server.loggers.ErrorLogger.logError;
033    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
035    import static org.opends.messages.ReplicationMessages.*;
036    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037    
038    import org.opends.server.api.DirectoryThread;
039    import org.opends.server.loggers.debug.DebugTracer;
040    import org.opends.server.replication.protocol.UpdateMessage;
041    
042    /**
043     * Thread that is used to get messages from the Replication servers
044     * and replay them in the current server.
045     */
046    public class ListenerThread extends DirectoryThread
047    {
048      /**
049       * The tracer object for the debug logger.
050       */
051      private static final DebugTracer TRACER = getTracer();
052    
053      private ReplicationDomain repDomain;
054      private boolean shutdown = false;
055      private boolean done = false;
056      private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
057    
058    
059      /**
060       * Constructor for the ListenerThread.
061       *
062       * @param repDomain the replication domain that created this thread
063       * @param updateToReplayQueue The update messages queue we must
064       * store messages in
065       */
066      public ListenerThread(ReplicationDomain repDomain,
067        LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
068      {
069         super("Replication Listener thread " +
070             "serverID=" + repDomain.serverId +
071             " domain=" + repDomain.getName());
072         this.repDomain = repDomain;
073         this.updateToReplayQueue = updateToReplayQueue;
074      }
075    
076      /**
077       * Shutdown this listener thread.
078       */
079      public void shutdown()
080      {
081        shutdown = true;
082      }
083    
084      /**
085       * Run method for this class.
086       */
087      @Override
088      public void run()
089      {
090        UpdateMessage updateMsg = null;
091    
092        if (debugEnabled())
093        {
094          TRACER.debugInfo("Replication Listener thread starting.");
095        }
096    
097        while (!shutdown)
098        {
099          try
100          {
101            // Loop receiving update messages and puting them in the update message
102            // queue
103            while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
104            {
105              // Put update message into the queue (block until some place in the
106              // queue is available)
107              UpdateToReplay updateToReplay =
108                new UpdateToReplay(updateMsg, repDomain);
109              boolean queued = false;
110              while (!queued && !shutdown)
111              {
112                // Use timedout method (offer) instead of put for being able to
113                // shutdown the thread
114                queued = updateToReplayQueue.offer(updateToReplay,
115                  1L, TimeUnit.SECONDS);
116              }
117              if (!queued)
118              {
119                // Shutdown requested but could not push message: ensure this one is
120                // not lost and put it in the queue before dying
121                updateToReplayQueue.offer(updateToReplay);
122              }
123            }
124            if (updateMsg == null)
125              shutdown = true;
126          } catch (Exception e)
127          {
128            /*
129             * catch all exceptions happening in repDomain.receive so that the
130             * thread never dies even in case of problems.
131             */
132            Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
133                stackTraceToSingleLineString(e));
134            logError(message);
135          }
136        }
137    
138        // Stop the HeartBeat thread
139        repDomain.getBroker().stopHeartBeat();
140    
141        done = true;
142    
143        if (debugEnabled())
144        {
145          TRACER.debugInfo("Replication Listener thread stopping.");
146        }
147      }
148    
149      /**
150       * Wait for the completion of this thread.
151       */
152      public void waitForShutdown()
153      {
154        try
155        {
156          while (done == false)
157          {
158            Thread.sleep(50);
159          }
160        } catch (InterruptedException e)
161        {
162          // exit the loop if this thread is interrupted.
163        }
164      }
165    }