001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.Message;
029    import org.opends.messages.MessageBuilder;
030    
031    import static org.opends.server.loggers.debug.DebugLogger.*;
032    
033    import org.opends.server.loggers.debug.DebugTracer;
034    import static org.opends.server.loggers.ErrorLogger.logError;
035    import static org.opends.messages.ReplicationMessages.*;
036    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037    
038    import java.io.IOException;
039    import java.util.ArrayList;
040    import java.util.LinkedHashSet;
041    import java.util.List;
042    import java.util.Map;
043    import java.util.Set;
044    import java.util.concurrent.ConcurrentHashMap;
045    import java.util.concurrent.Semaphore;
046    import java.util.concurrent.TimeUnit;
047    import java.util.Iterator;
048    
049    import org.opends.server.replication.common.ChangeNumber;
050    import org.opends.server.replication.common.ServerState;
051    import org.opends.server.replication.protocol.AckMessage;
052    import org.opends.server.replication.protocol.ErrorMessage;
053    import org.opends.server.replication.protocol.RoutableMessage;
054    import org.opends.server.replication.protocol.UpdateMessage;
055    import org.opends.server.replication.protocol.ReplServerInfoMessage;
056    import org.opends.server.replication.protocol.MonitorMessage;
057    import org.opends.server.replication.protocol.MonitorRequestMessage;
058    import org.opends.server.replication.protocol.ResetGenerationId;
059    import org.opends.server.types.DN;
060    import org.opends.server.types.DirectoryException;
061    import org.opends.server.types.ResultCode;
062    import org.opends.server.util.TimeThread;
063    import com.sleepycat.je.DatabaseException;
064    
065    /**
066     * This class defines an in-memory cache that is used to store
067     * the messages that have been received from an LDAP server or
068     * from another replication server and that should be forwarded to
069     * other servers.
070     *
071     * The size of the cache is set by configuration.
072     * If the cache becomes bigger than the configured size, the older messages
073     * are removed and, should they be needed again, must be read from the
074     * backing file.
075     *
076     * It runs a thread that is responsible for saving the received messages
077     * to the disk and for trimming them.
078     * The decision to trim can be based on disk space or on the age of the
079     * message.
080     */
081    public class ReplicationServerDomain
082    {
083      private final Object flowControlLock = new Object();
084      private final DN baseDn;
085    
086      /*
087       * Handlers of the LDAP servers currently connected to this replication
088       * server for this domain, indexed by their server ID.
089       *
090       * A handler is added by startServer() and removed by stopServer();
091       * put() iterates over this map to push the updates to the LDAP servers.
092       *
093       * It is a ConcurrentHashMap: several methods of this class access it
094       * without synchronizing on it.
095       */
096      private final Map<Short, ServerHandler> connectedServers =
097        new ConcurrentHashMap<Short, ServerHandler>();
098    
099      /*
100       * Handlers of the replication servers with which we are connected
101       * (so normally all the replication servers), indexed by their
102       * server ID.
103       *
104       * A handler is added by startReplicationServer() and removed by
105       * stopServer(); put() pushes to them the updates that were not
106       * received from a replication server.
107       */
108    
109      private final Map<Short, ServerHandler> replicationServers =
110        new ConcurrentHashMap<Short, ServerHandler>();
111    
112      /*
113       * This map contains one DbHandler per server that generated changes,
114       * indexed by the server ID found in the change numbers (see put()).
115       */
116      private final Map<Short, DbHandler> sourceDbHandlers =
117        new ConcurrentHashMap<Short, DbHandler>();
118      private ReplicationServer replicationServer;
119    
120      /* GenerationId management: a generationId of -1 means "not set". */
121      private long generationId = -1;
122      private boolean generationIdSavedStatus = false;
123    
124      /**
125       * The tracer object for the debug logger.
126       */
127      private static final DebugTracer TRACER = getTracer();
128    
129      /* Monitor data management */
130    
131      // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
132      private long monitorDataLifeTime = 500;
133    
134      /* Search op on monitor data is processed by a worker thread.
135       * Requests are sent to the other RS, and responses are received by the
136       * listener threads.
137       * The worker thread is woken up on this semaphore, or on timeout.
138       */
139      Semaphore remoteMonitorResponsesSemaphore;
140    
141      /**
142       * The monitor data consolidated over the topology.
143       */
144      private  MonitorData monitorData = new MonitorData();
145      private  MonitorData wrkMonitorData;
146    
147      /**
148       * Builds the ReplicationServerDomain that handles the provided base DN.
149       *
150       * @param baseDn            The base DN handled by this domain.
151       * @param replicationServer The ReplicationServer that creates this
152       *                          domain.
153       */
154      public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
155      {
156        this.replicationServer = replicationServer;
157        this.baseDn = baseDn;
158      }
159    
160      /**
161       * Processes an update received from a server: hands it to the DbHandler
162       * of the server that generated the change, then adds it to the handlers
163       * of the other servers connected to this domain.
164       *
165       * When the DbHandler cannot be created because of a database error, the
166       * replication server is shut down and the update is not forwarded.
167       *
168       * @param update        The update that has been received.
169       * @param sourceHandler The ServerHandler of the server from which the
170       *                      update was received.
171       * @throws IOException When an IO exception happens during the update
172       *                     processing.
173       */
174      public void put(UpdateMessage update, ServerHandler sourceHandler)
175          throws IOException
176      {
177        /*
178         * TODO : when the source server is a LDAP server, this method
179         * should check that the change did get pushed to at least one
180         * other replication server before pushing it to the LDAP servers
181         */
182        final short originId = update.getChangeNumber().getServerId();
183        sourceHandler.updateServerState(update);
184        sourceHandler.incrementInCount();
185    
186        // Assured update: ack it right away when at most one server is
187        // connected, otherwise register it as waiting for acks.
188        if (update.isAssured())
189        {
190          final int serverCount = NumServers();
191          if (serverCount <= 1)
192          {
193            sourceHandler.sendAck(update.getChangeNumber());
194          }
195          else if (sourceHandler.isReplicationServer())
196          {
197            ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
198                                        this, serverCount - 1);
199          }
200          else
201          {
202            sourceHandler.addWaitingAck(update, serverCount - 1);
203          }
204        }
205    
206        // Adopt the generationId of the sender when none is set yet.
207        if (generationId < 0)
208        {
209          generationId = sourceHandler.getGenerationId();
210        }
211    
212        // Look for the dbHandler that is responsible for the server which
213        // generated the change, creating it when it does not exist yet.
214        DbHandler changeDb;
215        synchronized (sourceDbHandlers)
216        {
217          changeDb = sourceDbHandlers.get(originId);
218          if (changeDb == null)
219          {
220            try
221            {
222              changeDb = replicationServer.newDbHandler(originId, baseDn);
223              generationIdSavedStatus = true;
224            }
225            catch (DatabaseException e)
226            {
227              /*
228               * Because of a database problem we can't save any more changes
229               * from at least one LDAP server.
230               * This replicationServer therefore can't do its job properly
231               * anymore: log the error and shut it down.
232               */
233              MessageBuilder errorText = new MessageBuilder();
234              errorText.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
235              errorText.append(stackTraceToSingleLineString(e));
236              logError(errorText.toMessage());
237              replicationServer.shutdown();
238              return;
239            }
240            sourceDbHandlers.put(originId, changeDb);
241          }
242        }
243    
244        // Publish the message to the dbHandler of its originating server.
245        changeDb.add(update);
246    
247        // Push the message to the replication servers, unless it was itself
248        // received from a replication server.
249        if (!sourceHandler.isReplicationServer())
250        {
251          for (ServerHandler peer : replicationServers.values())
252          {
253            peer.add(update, sourceHandler);
254          }
255        }
256    
257        // Push the message to the LDAP servers, except to the one that
258        // just sent it.
259        for (ServerHandler ldapServer : connectedServers.values())
260        {
261          if (ldapServer != sourceHandler)
262          {
263            ldapServer.add(update, sourceHandler);
264          }
265        }
266      }
267    
268      /**
269       * Waits 100 ms, once, if the serverId is still a connected LDAP server.
270       * @param serverId the serverId to be checked.
271       */
272      public void waitDisconnection(short serverId)
273      {
274        if (connectedServers.containsKey(serverId))
275        {
276          try
277          {
278            Thread.sleep(100);
279          } catch (InterruptedException e)
280          {
281            // Do not swallow the interruption: restore the interrupt status.
282            Thread.currentThread().interrupt();
283          }
284        }
285      }
286    
287      /**
288       * Registers the handler of an LDAP server that connects to this domain
289       * and sends the new list of connected LDAP servers to the remote
290       * replication servers.
291       *
292       * @param handler handler for the server that must be started
293       * @throws Exception when method has failed
294       * @return true when the handler has been registered, false when a
295       *         server with the same serverId is already registered.
296       */
297      public boolean startServer(ServerHandler handler) throws Exception
298      {
299        synchronized (connectedServers)
300        {
301          // A single lookup both detects the duplicate and provides the
302          // handler reported in the error message (never null there).
303          ServerHandler oldHandler = connectedServers.get(handler.getServerId());
304          if (oldHandler != null)
305          {
306            // looks like two LDAP servers have the same serverId
307            // log an error message and drop this connection.
308            Message message = ERR_DUPLICATE_SERVER_ID.get(
309                oldHandler.toString(), handler.toString(), handler.getServerId());
310            logError(message);
311            return false;
312          }
313          connectedServers.put(handler.getServerId(), handler);
314    
315          // It can be that the server that connects here is the
316          // first server connected for a domain.
317          // In that case, we will establish the appropriate connections
318          // to the other repl servers for this domain and receive
319          // their ReplServerInfo messages.
320          // FIXME: Is it necessary to end this above processing BEFORE listening
321          //        to incoming messages for that domain ? But the replica
322          //        would raise Read Timeout for replica that connects.
323    
324          // Update the remote replication servers with our list
325          // of connected LDAP servers
326          sendReplServerInfo();
327    
328          return true;
329        }
330      }
331    
332      /**
333       * Stops operations with a given server, LDAP server or replication
334       * server: when its handler is registered in this domain, the handler is
335       * unregistered and stopped, and the remote replication servers are
336       * informed. Nothing is done for a handler that is not registered.
337       *
338       * @param handler the server for which we want to stop operations
339       */
340      public void stopServer(ServerHandler handler)
341      {
342        if (debugEnabled())
343        {
344          TRACER.debugInfo(
345            "In RS " + this.replicationServer.getMonitorInstanceName() +
346            " for " + baseDn + " " +
347            " stopServer " + handler.getMonitorInstanceName());
348        }
349    
350        // Replication servers and LDAP servers are processed the same way,
351        // only the map in which their handler is registered differs.
352        final Map<Short, ServerHandler> registry;
353        if (handler.isReplicationServer())
354        {
355          registry = replicationServers;
356        }
357        else
358        {
359          registry = connectedServers;
360        }
361    
362        if (registry.containsValue(handler))
363        {
364          registry.remove(handler.getServerId());
365          handler.stopHandler();
366          // Update the remote replication servers with our list
367          // of connected LDAP servers
368          sendReplServerInfo();
369        }
370      }
371    
372      /**
373       * Resets the generationId for this domain if no LDAP server is currently
374       * connected in the topology and if the generationId has never been saved.
375       */
376      protected void mayResetGenerationId()
377      {
378        if (debugEnabled())
379        {
380          TRACER.debugInfo(
381            "In RS " + this.replicationServer.getMonitorInstanceName() +
382            " for " + baseDn + " " +
383            " mayResetGenerationId generationIdSavedStatus=" +
384            generationIdSavedStatus);
385        }
386    
387        // Look at the remote LDAP servers only when none is connected to us.
388        boolean ldapInTopology = !connectedServers.isEmpty();
389        if (ldapInTopology)
390        {
391          if (debugEnabled())
392          {
393            TRACER.debugInfo(
394              "In RS " + this.replicationServer.getMonitorInstanceName() +
395              " for " + baseDn + " " +
396              " has servers connected to it - will not reset generationId");
397          }
398        }
399        else
400        {
401          for (ServerHandler rsh : replicationServers.values())
402          {
403            if (generationId == rsh.getGenerationId())
404            {
405              if (rsh.hasRemoteLDAPServers())
406              {
407                ldapInTopology = true;
408                if (debugEnabled())
409                {
410                  TRACER.debugInfo(
411                    "In RS " + this.replicationServer.getMonitorInstanceName() +
412                    " for " + baseDn + " " +
413                    " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
414                    " has servers connected to it - will not reset generationId");
415                }
416              }
417            }
418            else if (debugEnabled())
419            {
420              TRACER.debugInfo(
421                "In RS " + this.replicationServer.getMonitorInstanceName() +
422                " for " + baseDn + " " +
423                " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
424                " thas different genId");
425            }
426          }
427        }
428    
429        // Reset only a generationId that is set and has never been saved.
430        if (!(ldapInTopology || generationIdSavedStatus || generationId == -1))
431        {
432          setGenerationId(-1, false);
433        }
434      }
435    
436      /**
437       * Create initialize context necessary for finding the changes
438       * that must be sent to a given replication server.
439       *
440       * @param handler the server ID to which we want to forward changes
441       * @throws Exception in case of errors
442       * @return A boolean indicating if the start was successfull.
443       */
444      public boolean startReplicationServer(ServerHandler handler) throws Exception
445      {
446        /*
447         * create the balanced tree that will be used to forward changes
448         */
449        synchronized (replicationServers)
450        {
451          ServerHandler oldHandler = replicationServers.get(handler.getServerId());
452          if ((oldHandler != null))
453          {
454            if (oldHandler.getServerAddressURL().equals(
455                handler.getServerAddressURL()))
456            {
457              // this is the same server, this means that our ServerStart messages
458              // have been sent at about the same time and 2 connections
459              // have been established.
460              // Silently drop this connection.
461            }
462            else
463            {
464              // looks like two replication servers have the same serverId
465              // log an error message and drop this connection.
466              Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.
467                  get(oldHandler.getServerAddressURL(),
468                      handler.getServerAddressURL(), handler.getServerId());
469              logError(message);
470            }
471            return false;
472          }
473          replicationServers.put(handler.getServerId(), handler);
474    
475          // Update this server with the list of LDAP servers
476          // already connected
477          handler.sendInfo(
478              new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
479    
480          return true;
481        }
482      }
483    
484    /**
485     * Get the next update that needs to be sent to a given LDAP server.
486     * This call is blocking when no update is available or when dependencies
487     * do not allow to send the next available change
488     *
489     * @param  handler  The server handler for the target directory server.
490     *
491     * @return the update that must be forwarded
492     */
493      public UpdateMessage take(ServerHandler handler)
494      {
495        /*
496         * The next change to send is provided by the handler itself.
497         *
498         *  TODO : dependency  :
499         *  before forwarding change, we should check that the dependency
500         *  that is indicated in this change is OK (change already in the RUV)
501         */
502        final UpdateMessage next = handler.take();
503    
504        // restartAfterSaturation() tells whether the threads waiting on the
505        // flow control lock must be woken up.
506        synchronized (flowControlLock)
507        {
508          if (handler.restartAfterSaturation(null))
509          {
510            flowControlLock.notifyAll();
511          }
512        }
513    
514        return next;
515      }
516    
517      /**
518       * Returns the addresses of the replication servers connected to this
519       * server for this domain.
520       * @return the set of the server address URLs of the connected
521       *         replication servers
522       */
523      public Set<String> getChangelogs()
524      {
525        final Set<String> serverURLs = new LinkedHashSet<String>();
526    
527        for (ServerHandler rsHandler : replicationServers.values())
528        {
529          serverURLs.add(rsHandler.getServerAddressURL());
530        }
531        return serverURLs;
532      }
533    
534    
535      /**
536       * Returns the IDs of the servers for which this domain has a DbHandler.
537       * @return the key set of the DbHandler map (a view, not a copy).
538       */
539      public Set<Short> getServers()
540      {
541        return this.sourceDbHandlers.keySet();
542      }
543    
544      /**
545       * Returns the list of the LDAP servers connected to us.
546       * Each string is the serverID of a connected LDAP server.
547       *
548       * @return The list of connected LDAP servers, empty when there is none.
549       */
550      public List<String> getConnectedLDAPservers()
551      {
552        // Size the list for the servers currently connected.
553        List<String> serverIds = new ArrayList<String>(connectedServers.size());
554        for (ServerHandler handler : connectedServers.values())
555        {
556          serverIds.add(String.valueOf(handler.getServerId()));
557        }
558        return serverIds;
559      }
560    
561      /**
562       * Creates and returns an iterator.
563       * When the iterator is not used anymore, the caller MUST call the
564       * ReplicationIterator.releaseCursor() method to free the resources
565       * and locks used by the ReplicationIterator.
566       * @param serverId Identifier of the server for which the iterator is created.
567       * @param changeNumber Starting point for the iterator.
568       * @return the created ReplicationIterator. Null when no DB is available
569       * for the provided server Id or when the iterator could not be created.
570       */
571      public ReplicationIterator getChangelogIterator(short serverId,
572                        ChangeNumber changeNumber)
573      {
574        DbHandler handler = sourceDbHandlers.get(serverId);
575        if (handler == null)
576          return null;
577        try
578        {
579          return handler.generateIterator(changeNumber);
580        } catch (Exception e)
581        {
582          // Report the failure in the debug log instead of hiding it.
583          if (debugEnabled())
584            TRACER.debugInfo(stackTraceToSingleLineString(e));
585          return null;
586        }
587      }
588    
589      /**
590       * Returns the change count for that ReplicationServerDomain, that is
591       * the sum of the change counts of all its DbHandlers.
592       * @return the change count.
593       */
594      public long getChangesCount()
595      {
596        long total = 0;
597        for (DbHandler changeDb : sourceDbHandlers.values())
598        {
599          total += changeDb.getChangesCount();
600        }
601        return total;
602      }
603    
604      /**
605       * Returns the base DN handled by this domain.
606       * @return the baseDn provided at creation time.
607       */
608      public DN getBaseDn()
609      {
610        return this.baseDn;
611      }
612    
613      /**
614       * Associates the provided DbHandler with the provided serverId,
615       * replacing the DbHandler previously associated with it, if any.
616       * @param serverId  the serverId for the server to which is
617       *                  associated the Dbhandler.
618       * @param dbHandler the dbHandler associated to the serverId.
619       * @throws DatabaseException If a database error happened.
620       */
621      public void setDbHandler(short serverId, DbHandler dbHandler)
622          throws DatabaseException
623      {
624        final Short key = Short.valueOf(serverId);
625        synchronized (sourceDbHandlers)
626        {
627          sourceDbHandlers.put(key, dbHandler);
628        }
629      }
630    
631      /**
632       * Get the number of currently connected servers.
633       *
634       * @return the number of currently connected servers.
635       */
636      private int NumServers()
637      {
638        return replicationServers.size() + connectedServers.size();
639      }
640    
641    
642      /**
643       * Add an ack to the list of ack received for a given change.
644       *
645       * @param message The ack message received.
646       * @param fromServerId The identifier of the server that sent the ack.
647       */
648      public void ack(AckMessage message, short fromServerId)
649      {
650        /*
651         * there are 2 possible cases here :
652         *  - the message that was acked comes from a server to which
653         *    we are directly connected.
654         *    In this case, we can find the handler from the connectedServers map
655         *  - the message that was acked comes from a server to which we are not
656         *    connected.
657         *    In this case we need to find the replication server that forwarded
658         *    the change and send back the ack to this server.
659         */
660        ServerHandler handler = connectedServers.get(
661                                           message.getChangeNumber().getServerId());
662        if (handler != null)
663          handler.ack(message, fromServerId);
664        else
665        {
666          ServerHandler.ackChangelog(message, fromServerId);
667        }
668      }
669    
670      /**
671       * Retrieves the destination handlers for a routable message.
672       *
673       * @param msg The message to route.
674       * @param senderHandler The handler of the server that published this message.
675       * @return The list of destination handlers.
676       */
677      protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
678          ServerHandler senderHandler)
679      {
680        List<ServerHandler> servers =
681          new ArrayList<ServerHandler>();
682    
683        if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
684        {
685          // TODO Import from the "closest server" to be implemented
686        }
687        else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
688        {
689          if (!senderHandler.isReplicationServer())
690          {
691            // Send to all replication servers with a least one remote
692            // server connected
693            for (ServerHandler rsh : replicationServers.values())
694            {
695              if (rsh.hasRemoteLDAPServers())
696              {
697                servers.add(rsh);
698              }
699            }
700          }
701    
702          // Sends to all connected LDAP servers
703          for (ServerHandler destinationHandler : connectedServers.values())
704          {
705            // Don't loop on the sender
706            if (destinationHandler == senderHandler)
707              continue;
708            servers.add(destinationHandler);
709          }
710        }
711        else
712        {
713          // Destination is one server
714          ServerHandler destinationHandler =
715            connectedServers.get(msg.getDestination());
716          if (destinationHandler != null)
717          {
718            servers.add(destinationHandler);
719          }
720          else
721          {
722            // the targeted server is NOT connected
723            // Let's search for THE changelog server that MAY
724            // have the targeted server connected.
725            if (senderHandler.isLDAPserver())
726            {
727              for (ServerHandler h : replicationServers.values())
728              {
729                // Send to all replication servers with a least one remote
730                // server connected
731                if (h.isRemoteLDAPServer(msg.getDestination()))
732                {
733                  servers.add(h);
734                }
735              }
736            }
737          }
738        }
739        return servers;
740      }
741    
742      /**
743       * Processes a message coming from one server in the topology
744       * and potentially forwards it to one or all other servers.
745       *
746       * @param msg The message received and to be processed.
747       * @param senderHandler The server handler of the server that emitted
748       * the message.
749       */
750      public void process(RoutableMessage msg, ServerHandler senderHandler)
751      {
752    
753        // Test the message for which a ReplicationServer is expected
754        // to be the destination
755        if (msg.getDestination() == this.replicationServer.getServerId())
756        {
757          if (msg instanceof ErrorMessage)
758          {
759            ErrorMessage errorMsg = (ErrorMessage)msg;
760            logError(ERR_ERROR_MSG_RECEIVED.get(
761                errorMsg.getDetails()));
762          }
763          else if (msg instanceof MonitorRequestMessage)
764          {
765            MonitorRequestMessage replServerMonitorRequestMsg =
766              (MonitorRequestMessage) msg;
767    
768            MonitorMessage monitorMsg =
769              new MonitorMessage(
770                  replServerMonitorRequestMsg.getDestination(),
771                  replServerMonitorRequestMsg.getsenderID());
772    
773            // Populate for each connected LDAP Server
774            // from the states stored in the serverHandler.
775            // - the server state
776            // - the older missing change
777            for (ServerHandler lsh : this.connectedServers.values())
778            {
779              monitorMsg.setServerState(
780                  lsh.getServerId(),
781                  lsh.getServerState(),
782                  lsh.getApproxFirstMissingDate(),
783                  true);
784            }
785    
786            // Same for the connected RS
787            for (ServerHandler rsh : this.replicationServers.values())
788            {
789              monitorMsg.setServerState(
790                  rsh.getServerId(),
791                  rsh.getServerState(),
792                  rsh.getApproxFirstMissingDate(),
793                  false);
794            }
795    
796            // Populate the RS state in the msg from the DbState
797            monitorMsg.setReplServerDbState(this.getDbServerState());
798    
799    
800            try
801            {
802              senderHandler.send(monitorMsg);
803            }
804            catch(Exception e)
805            {
806              // We log the error. The requestor will detect a timeout or
807              // any other failure on the connection.
808              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
809                  Short.toString((msg.getDestination()))));
810            }
811          }
812          else if (msg instanceof MonitorMessage)
813          {
814            MonitorMessage monitorMsg =
815              (MonitorMessage) msg;
816    
817            receivesMonitorDataResponse(monitorMsg);
818          }
819          else
820          {
821            logError(NOTE_ERR_ROUTING_TO_SERVER.get(
822                msg.getClass().getCanonicalName()));
823          }
824          return;
825        }
826    
827        List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
828    
829        if (servers.isEmpty())
830        {
831          MessageBuilder mb = new MessageBuilder();
832          mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
833          mb.append(" In Replication Server=" + this.replicationServer.
834              getMonitorInstanceName());
835          mb.append(" domain =" + this.baseDn);
836          mb.append(" unroutable message =" + msg.toString());
837          mb.append(" routing table is empty");
838          ErrorMessage errMsg = new ErrorMessage(
839              this.replicationServer.getServerId(),
840              msg.getsenderID(),
841              mb.toMessage());
842          logError(mb.toMessage());
843          try
844          {
845            senderHandler.send(errMsg);
846          }
847          catch(IOException ioe)
848          {
849            // TODO Handle error properly (sender timeout in addition)
850            /*
851             * An error happened trying to send an error msg to this server.
852             * Log an error and close the connection to this server.
853             */
854            MessageBuilder mb2 = new MessageBuilder();
855            mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
856            mb2.append(stackTraceToSingleLineString(ioe));
857            logError(mb2.toMessage());
858            senderHandler.shutdown();
859          }
860        }
861        else
862        {
863          for (ServerHandler targetHandler : servers)
864          {
865            try
866            {
867              targetHandler.send(msg);
868            }
869            catch(IOException ioe)
870            {
871              /*
           * An error happened trying to send a routable message
873               * to its destination server.
874               * Send back an error to the originator of the message.
875               */
876              MessageBuilder mb = new MessageBuilder();
877              mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
878              mb.append(stackTraceToSingleLineString(ioe));
879              mb.append(" ");
880              mb.append(msg.getClass().getCanonicalName());
881              logError(mb.toMessage());
882    
883              MessageBuilder mb1 = new MessageBuilder();
884              mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
885              mb1.append("serverID:" + msg.getDestination());
886              ErrorMessage errMsg = new ErrorMessage(
887                  msg.getsenderID(), mb1.toMessage());
888              try
889              {
890                senderHandler.send(errMsg);
891              }
892              catch(IOException ioe1)
893              {
894                // an error happened on the sender session trying to recover
895                // from an error on the receiver session.
896                // We don't have much solution left beside closing the sessions.
897                senderHandler.shutdown();
898                targetHandler.shutdown();
899              }
900              // TODO Handle error properly (sender timeout in addition)
901            }
902          }
903        }
904    
905      }
906    
907        /**
908         * Send back an ack to the server that sent the change.
909         *
910         * @param changeNumber The ChangeNumber of the change that must be acked.
911         * @param isLDAPserver This boolean indicates if the server that sent the
912         *                     change was an LDAP server or a ReplicationServer.
913         */
914        public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
915        {
916          short serverId = changeNumber.getServerId();
917          sendAck(changeNumber, isLDAPserver, serverId);
918        }
919    
920        /**
921         *
922         * Send back an ack to a server that sent the change.
923         *
924         * @param changeNumber The ChangeNumber of the change that must be acked.
925         * @param isLDAPserver This boolean indicates if the server that sent the
926         *                     change was an LDAP server or a ReplicationServer.
927         * @param serverId     The identifier of the server from which we
928         *                     received the change..
929         */
930        public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
931            short serverId)
932        {
933          ServerHandler handler;
934          if (isLDAPserver)
935            handler = connectedServers.get(serverId);
936          else
937            handler = replicationServers.get(serverId);
938    
939          // TODO : check for null handler and log error
940          try
941          {
942            handler.sendAck(changeNumber);
943          } catch (IOException e)
944          {
945            /*
946             * An error happened trying the send back an ack to this server.
947             * Log an error and close the connection to this server.
948             */
949            MessageBuilder mb = new MessageBuilder();
950            mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
951            mb.append(stackTraceToSingleLineString(e));
952            logError(mb.toMessage());
953            handler.shutdown();
954          }
955        }
956    
957        /**
958         * Shutdown this ReplicationServerDomain.
959         */
960        public void shutdown()
961        {
962          // Close session with other changelogs
963          for (ServerHandler serverHandler : replicationServers.values())
964          {
965            serverHandler.shutdown();
966          }
967    
968          // Close session with other LDAP servers
969          for (ServerHandler serverHandler : connectedServers.values())
970          {
971            serverHandler.shutdown();
972          }
973    
974          // Shutdown the dbHandlers
975          synchronized (sourceDbHandlers)
976          {
977            for (DbHandler dbHandler : sourceDbHandlers.values())
978            {
979              dbHandler.shutdown();
980            }
981            sourceDbHandlers.clear();
982          }
983        }
984    
985        /**
986         * Returns the ServerState describing the last change from this replica.
987         *
988         * @return The ServerState describing the last change from this replica.
989         */
990        public ServerState getDbServerState()
991        {
992          ServerState serverState = new ServerState();
993          for (DbHandler db : sourceDbHandlers.values())
994          {
995            serverState.update(db.getLastChange());
996          }
997          return serverState;
998        }
999    
1000        /**
1001         * {@inheritDoc}
1002         */
1003        @Override
1004        public String toString()
1005        {
1006          return "ReplicationServerDomain " + baseDn;
1007        }
1008    
1009        /**
1010         * Check if some server Handler should be removed from flow control state.
1011         * @throws IOException If an error happened.
1012         */
1013        public void checkAllSaturation() throws IOException
1014        {
1015          for (ServerHandler handler : replicationServers.values())
1016          {
1017            handler.checkWindow();
1018          }
1019    
1020          for (ServerHandler handler : connectedServers.values())
1021          {
1022            handler.checkWindow();
1023          }
1024        }
1025    
1026        /**
1027         * Check if a server that was in flow control can now restart
1028         * sending updates.
1029         * @param sourceHandler The server that must be checked.
1030         * @return true if the server can restart sending changes.
1031         *         false if the server can't restart sending changes.
1032         */
1033        public boolean restartAfterSaturation(ServerHandler sourceHandler)
1034        {
1035          for (ServerHandler handler : replicationServers.values())
1036          {
1037            if (!handler.restartAfterSaturation(sourceHandler))
1038              return false;
1039          }
1040    
1041          for (ServerHandler handler : connectedServers.values())
1042          {
1043            if (!handler.restartAfterSaturation(sourceHandler))
1044              return false;
1045          }
1046          return true;
1047        }
1048    
1049        /**
1050         * Send a ReplServerInfoMessage to all the connected replication servers
1051         * in order to let them know our connected LDAP servers.
1052         */
1053        private void sendReplServerInfo()
1054        {
1055          ReplServerInfoMessage info =
1056            new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
1057          for (ServerHandler handler : replicationServers.values())
1058          {
1059            try
1060            {
1061              handler.sendInfo(info);
1062            }
1063            catch (IOException e)
1064            {
1065              /*
1066               * An error happened trying the send back an ack to this server.
1067               * Log an error and close the connection to this server.
1068               */
1069              MessageBuilder mb = new MessageBuilder();
1070              mb.append(ERR_CHANGELOG_ERROR_SENDING_INFO.get(this.toString()));
1071              mb.append(stackTraceToSingleLineString(e));
1072              logError(mb.toMessage());
1073              handler.shutdown();
1074            }
1075          }
1076        }
1077    
1078        /**
1079         * Get the generationId associated to this domain.
1080         *
1081         * @return The generationId
1082         */
1083        public long getGenerationId()
1084        {
1085          return generationId;
1086        }
1087    
1088        /**
1089         * Get the generationId saved status.
1090         *
1091         * @return The generationId saved status.
1092         */
1093        public boolean getGenerationIdSavedStatus()
1094        {
1095          return generationIdSavedStatus;
1096        }
1097    
1098        /**
1099         * Sets the provided value as the new in memory generationId.
1100         *
1101         * @param generationId The new value of generationId.
1102         * @param savedStatus  The saved status of the generationId.
1103         */
1104        synchronized public void setGenerationId(long generationId,
1105            boolean savedStatus)
1106        {
1107          if (debugEnabled())
1108            TRACER.debugInfo(
1109              "In " + this.replicationServer.getMonitorInstanceName() +
1110              " baseDN=" + baseDn +
1111              " RCache.set GenerationId=" + generationId);
1112    
1113          if (this.generationId != generationId)
1114          {
1115            // we are changing of genId
1116            clearDbs();
1117    
1118            this.generationId = generationId;
1119            this.generationIdSavedStatus = savedStatus;
1120    
1121            // they have a generationId different from the reference one
1122            for (ServerHandler handler : connectedServers.values())
1123            {
1124              if (generationId != handler.getGenerationId())
1125              {
1126                // Notify our remote LS that from now on have a different genID
1127                handler.warnBadGenerationId();
1128              }
1129            }
1130          }
1131        }
1132    
1133        /**
1134         * Resets the generationID.
1135         *
1136         * @param senderHandler The handler associated to the server
1137         *        that requested to reset the generationId.
1138         * @param genIdMsg The reset generation ID msg received.
1139         */
1140        public void resetGenerationId(ServerHandler senderHandler,
1141            ResetGenerationId genIdMsg)
1142        {
1143          long newGenId = genIdMsg.getGenerationId();
1144    
1145          if (newGenId != this.generationId)
1146          {
1147            this.setGenerationId(newGenId, false);
1148          }
1149    
1150          // If we are the first replication server warned,
1151          // then forwards the reset message to the remote replication servers
1152          for (ServerHandler rsHandler : replicationServers.values())
1153          {
1154            try
1155            {
1156              // After we'll have send the mssage , the remote RS will adopt
1157              // the new genId
1158              rsHandler.setGenerationId(newGenId);
1159              if (senderHandler.isLDAPserver())
1160              {
1161                rsHandler.forwardGenerationIdToRS(genIdMsg);
1162              }
1163            }
1164            catch (IOException e)
1165            {
1166              logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
1167                  get(rsHandler.getMonitorInstanceName()));
1168            }
1169          }
1170        }
1171    
1172        /**
1173         * Clears the Db associated with that cache.
1174         */
1175        public void clearDbs()
1176        {
1177          // Reset the localchange and state db for the current domain
1178          synchronized (sourceDbHandlers)
1179          {
1180            for (DbHandler dbHandler : sourceDbHandlers.values())
1181            {
1182              try
1183              {
1184                dbHandler.clear();
1185              }
1186              catch (Exception e)
1187              {
1188                // TODO: i18n
1189                MessageBuilder mb = new MessageBuilder();
1190                mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
1191                    e.getMessage() + " " +
1192                    stackTraceToSingleLineString(e)));
1193                logError(mb.toMessage());
1194              }
1195            }
1196            sourceDbHandlers.clear();
1197    
1198            if (debugEnabled())
1199              TRACER.debugInfo(
1200                  "In " + this.replicationServer.getMonitorInstanceName() +
1201                  " baseDN=" + baseDn +
1202              " The source db handler has been cleared");
1203          }
1204          try
1205          {
1206            replicationServer.clearGenerationId(baseDn);
1207          }
1208          catch (Exception e)
1209          {
1210            // TODO: i18n
1211            logError(Message.raw(
1212                "Exception caught while clearing generationId:" +
1213                e.getLocalizedMessage()));
1214          }
1215        }
1216    
1217        /**
1218         * Returns whether the provided server is in degraded
1219         * state due to the fact that the peer server has an invalid
1220         * generationId for this domain.
1221         *
1222         * @param serverId The serverId for which we want to know the
1223         *                 the state.
1224         * @return Whether it is degraded or not.
1225         */
1226    
1227        public boolean isDegradedDueToGenerationId(short serverId)
1228        {
1229          if (debugEnabled())
1230            TRACER.debugInfo(
1231                "In " + this.replicationServer.getMonitorInstanceName() +
1232                " baseDN=" + baseDn +
1233                " isDegraded serverId=" + serverId +
1234                " given local generation Id=" + this.generationId);
1235    
1236          ServerHandler handler = replicationServers.get(serverId);
1237          if (handler == null)
1238          {
1239            handler = connectedServers.get(serverId);
1240            if (handler == null)
1241            {
1242              return false;
1243            }
1244          }
1245    
1246          if (debugEnabled())
1247            TRACER.debugInfo(
1248                "In " + this.replicationServer.getMonitorInstanceName() +
1249                " baseDN=" + baseDn +
1250                " Compute degradation of serverId=" + serverId +
1251                " LS server generation Id=" + handler.getGenerationId());
1252          return (handler.getGenerationId() != this.generationId);
1253        }
1254    
1255        /**
1256         * Return the associated replication server.
1257         * @return The replication server.
1258         */
1259        public ReplicationServer getReplicationServer()
1260        {
1261          return replicationServer;
1262        }
1263    
1264        /**
1265         * Process reception of a ReplServerInfoMessage.
1266         *
1267         * @param infoMsg The received message.
1268         * @param handler The handler that received the message.
1269         * @throws IOException when raised by the underlying session.
1270         */
1271        public void receiveReplServerInfo(
1272            ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
1273        {
1274          if (debugEnabled())
1275          {
1276            if (handler.isReplicationServer())
1277              TRACER.debugInfo(
1278               "In RS " + getReplicationServer().getServerId() +
1279               " Receiving replServerInfo from " + handler.getServerId() +
1280               " baseDn=" + baseDn +
1281               " genId=" + infoMsg.getGenerationId());
1282          }
1283    
1284          mayResetGenerationId();
1285          if (generationId < 0)
1286            generationId = handler.getGenerationId();
1287          if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
1288          {
1289            Message message = NOTE_BAD_GENERATION_ID.get(
1290                baseDn.toNormalizedString(),
1291                Short.toString(handler.getServerId()),
1292                Long.toString(infoMsg.getGenerationId()),
1293                Long.toString(generationId));
1294    
1295            ErrorMessage errorMsg = new ErrorMessage(
1296                getReplicationServer().getServerId(),
1297                handler.getServerId(),
1298                message);
1299            handler.sendError(errorMsg);
1300          }
1301        }
1302    
1303        /* =======================
1304         * Monitor Data generation
1305         * =======================
1306         */
1307    
1308        /**
1309         * Retrieves the global monitor data.
1310         * @return The monitor data.
1311         * @throws DirectoryException When an error occurs.
1312         */
1313        synchronized protected MonitorData getMonitorData()
1314          throws DirectoryException
1315        {
1316          if (monitorData.getBuildDate() + monitorDataLifeTime
1317              > TimeThread.getTime())
1318          {
1319            if (debugEnabled())
1320              TRACER.debugInfo(
1321              "In " + this.replicationServer.getMonitorInstanceName() +
1322              " baseDn=" + baseDn + " getRemoteMonitorData in cache");
1323            // The current data are still valid. No need to renew them.
1324            return monitorData;
1325          }
1326    
1327          wrkMonitorData = new MonitorData();
1328          synchronized(wrkMonitorData)
1329          {
1330            if (debugEnabled())
1331              TRACER.debugInfo(
1332              "In " + this.replicationServer.getMonitorInstanceName() +
1333              " baseDn=" + baseDn + " Computing monitor data ");
1334    
1335            // Let's process our directly connected LSes
1336            // - in the ServerHandler for a given LS1, the stored state contains :
1337            //   - the max CN produced by LS1
1338            //   - the last CN consumed by LS1 from LS2..n
1339            // - in the RSdomain/dbHandler, the built-in state contains :
1340            //   - the max CN produced by each server
1341            // So for a given LS connected we can take the state and the max from
1342            // the LS/state.
1343    
1344            for (ServerHandler directlsh : connectedServers.values())
1345            {
1346              short serverID = directlsh.getServerId();
1347    
1348              // the state comes from the state stored in the SH
1349              ServerState directlshState = directlsh.getServerState().duplicate();
1350    
1351              // the max CN sent by that LS also comes from the SH
1352              ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
1353              if (maxcn == null)
1354              {
1355                // This directly connected LS has never produced any change
1356                maxcn = new ChangeNumber(0, 0 , serverID);
1357              }
1358              wrkMonitorData.setMaxCN(serverID, maxcn);
1359              wrkMonitorData.setLDAPServerState(serverID, directlshState);
1360              wrkMonitorData.setFirstMissingDate(serverID, directlsh.
1361                                                 getApproxFirstMissingDate());
1362            }
1363    
1364            // Then initialize the max CN for the LS that produced something
1365            // - from our own local db state
1366            // - whatever they are directly or undirectly connected
1367            ServerState dbServerState = getDbServerState();
1368            Iterator<Short> it = dbServerState.iterator();
1369            while (it.hasNext())
1370            {
1371              short sid = it.next();
1372              ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
1373              wrkMonitorData.setMaxCN(sid, storedCN);
1374            }
1375    
1376            // Now we have used all available local informations
1377            // and we need the remote ones.
1378            if (debugEnabled())
1379              TRACER.debugInfo(
1380                "In " + this.replicationServer.getMonitorInstanceName() +
1381                " baseDn=" + baseDn + " Local monitor data: " +
1382                wrkMonitorData.toString());
1383          }
1384    
1385          // Send Request to the other Replication Servers
1386          if (remoteMonitorResponsesSemaphore == null)
1387          {
1388            remoteMonitorResponsesSemaphore = new Semaphore(0);
1389            short requestCnt = sendMonitorDataRequest();
1390            // Wait reponses from them or timeout
1391            waitMonitorDataResponses(requestCnt);
1392          }
1393          else
1394          {
1395            // The processing of renewing the monitor cache is already running
1396            // We'll make it sleeping until the end
1397            // TODO: unit test for this case.
1398            while (remoteMonitorResponsesSemaphore!=null)
1399            {
1400              waitMonitorDataResponses(1);
1401            }
1402          }
1403    
1404          wrkMonitorData.completeComputing();
1405    
1406          // Store the new computed data as the reference
1407          synchronized(monitorData)
1408          {
1409            // Now we have the expected answers or an error occured
1410            monitorData = wrkMonitorData;
1411            wrkMonitorData = null;
1412            if (debugEnabled())
1413              TRACER.debugInfo(
1414              "In " + this.replicationServer.getMonitorInstanceName() +
1415              " baseDn=" + baseDn + " *** Computed MonitorData: " +
1416              monitorData.toString());
1417          }
1418          return monitorData;
1419        }
1420    
1421    
1422        /**
1423         * Sends a MonitorRequest message to all connected RS.
1424         * @return the number of requests sent.
1425         * @throws DirectoryException when a problem occurs.
1426         */
1427        protected short sendMonitorDataRequest()
1428          throws DirectoryException
1429        {
1430          short sent=0;
1431          try
1432          {
1433            for (ServerHandler rs : replicationServers.values())
1434            {
1435              MonitorRequestMessage msg = new
1436                MonitorRequestMessage(this.replicationServer.getServerId(),
1437                  rs.getServerId());
1438              rs.send(msg);
1439              sent++;
1440            }
1441          }
1442          catch(Exception e)
1443          {
1444            Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
1445            logError(message);
1446            throw new DirectoryException(ResultCode.OTHER,
1447                message, e);
1448          }
1449          return sent;
1450        }
1451    
1452        /**
1453         * Wait for the expected count of received MonitorMessage.
1454         * @param expectedResponses The number of expected answers.
1455         * @throws DirectoryException When an error occurs.
1456         */
1457        protected void waitMonitorDataResponses(int expectedResponses)
1458          throws DirectoryException
1459        {
1460          try
1461          {
1462            if (debugEnabled())
1463              TRACER.debugInfo(
1464              "In " + this.replicationServer.getMonitorInstanceName() +
1465              " baseDn=" + baseDn +
1466              " waiting for " + expectedResponses
1467              + " expected monitor messages");
1468    
1469            boolean allPermitsAcquired =
1470              remoteMonitorResponsesSemaphore.tryAcquire(
1471                  expectedResponses,
1472                  (long) 5000, TimeUnit.MILLISECONDS);
1473    
1474            if (!allPermitsAcquired)
1475            {
1476              logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
1477              // let's go on in best effort even with limited data received.
1478            }
1479            else
1480            {
1481              if (debugEnabled())
1482                TRACER.debugInfo(
1483                "In " + this.replicationServer.getMonitorInstanceName() +
1484                " baseDn=" + baseDn +
1485                " Successfully received all " + expectedResponses
1486                + " expected monitor messages");
1487            }
1488          }
1489          catch(Exception e)
1490          {
1491            logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
1492          }
1493          finally
1494          {
1495            remoteMonitorResponsesSemaphore = null;
1496          }
1497        }
1498    
1499        /**
1500         * Processes a Monitor message receives from a remote Replication Server
1501         * and stores the data received.
1502         *
1503         * @param msg The message to be processed.
1504         */
1505        public void receivesMonitorDataResponse(MonitorMessage msg)
1506        {
1507          if (debugEnabled())
1508            TRACER.debugInfo(
1509            "In " + this.replicationServer.getMonitorInstanceName() +
1510            "Receiving " + msg + " from " + msg.getsenderID() +
1511            remoteMonitorResponsesSemaphore);
1512    
1513          if (remoteMonitorResponsesSemaphore == null)
1514          {
1515            // Let's ignore the remote monitor data just received
1516            // since the computing processing has been ended.
1517            // An error - probably a timemout - occured that was already logged
1518            logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
1519                Short.toString(msg.getsenderID())));
1520            return;
1521          }
1522    
1523          try
1524          {
1525            synchronized(wrkMonitorData)
1526            {
1527              // Here is the RS state : list <serverID, lastChangeNumber>
1528              // For each LDAP Server, we keep the max CN accross the RSes
1529              ServerState replServerState = msg.getReplServerDbState();
1530              wrkMonitorData.setMaxCNs(replServerState);
1531    
1532              // Store the remote LDAP servers states
1533              Iterator<Short> lsidIterator = msg.ldapIterator();
1534              while (lsidIterator.hasNext())
1535              {
1536                short sid = lsidIterator.next();
1537                wrkMonitorData.setLDAPServerState(sid,
1538                    msg.getLDAPServerState(sid).duplicate());
1539                wrkMonitorData.setFirstMissingDate(sid,
1540                    msg.getLDAPApproxFirstMissingDate(sid));
1541              }
1542    
1543              // Process the latency reported by the remote RSi on its connections
1544              // to the other RSes
1545              Iterator<Short> rsidIterator = msg.rsIterator();
1546              while (rsidIterator.hasNext())
1547              {
1548                short rsid = rsidIterator.next();
1549                if (rsid == replicationServer.getServerId())
1550                {
1551                  // this is the latency of the remote RSi regarding the current RS
1552                  // let's update the fmd of my connected LS
1553                  for (ServerHandler connectedlsh : connectedServers.values())
1554                  {
1555                    short connectedlsid = connectedlsh.getServerId();
1556                    Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
1557                    wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
1558                  }
1559                }
1560                else
1561                {
1562                  // this is the latency of the remote RSi regarding another RSj
1563                  // let's update the latency of the LSes connected to RSj
1564                  ServerHandler rsjHdr = replicationServers.get(rsid);
1565                  if (rsjHdr != null)
1566                  {
1567                    for(short remotelsid : rsjHdr.getConnectedServerIds())
1568                    {
1569                      Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
1570                      wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
1571                    }
1572                  }
1573                }
1574              }
1575              if (debugEnabled())
1576              {
1577                if (debugEnabled())
1578                  TRACER.debugInfo(
1579                  "In " + this.replicationServer.getMonitorInstanceName() +
1580                  " baseDn=" + baseDn +
1581                  " Processed msg from " + msg.getsenderID() +
1582                  " New monitor data: " + wrkMonitorData.toString());
1583              }
1584            }
1585    
1586            // Decreases the number of expected responses and potentially
1587            // wakes up the waiting requestor thread.
1588            remoteMonitorResponsesSemaphore.release();
1589    
1590          }
1591          catch (Exception e)
1592          {
1593            logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
1594                stackTraceToSingleLineString(e)));
1595    
1596            // If an exception occurs while processing one of the expected message,
1597            // the processing is aborted and the waiting thread is awoke.
1598            remoteMonitorResponsesSemaphore.notifyAll();
1599          }
1600        }
1601    
1602        /**
1603         * Set the purge delay on all the db Handlers for this Domain
1604         * of Replicaiton.
1605         *
1606         * @param delay The new purge delay to use.
1607         */
1608        void setPurgeDelay(long delay)
1609        {
1610          for (DbHandler handler : sourceDbHandlers.values())
1611          {
1612            handler.setPurgeDelay(delay);
1613          }
1614        }
1615    }