001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    
029    import org.opends.messages.*;
030    
031    import static org.opends.server.loggers.ErrorLogger.logError;
032    import static org.opends.server.loggers.debug.DebugLogger.*;
033    
034    import org.opends.server.loggers.debug.DebugTracer;
035    import static org.opends.messages.ReplicationMessages.*;
036    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037    
038    import java.io.IOException;
039    import java.util.Date;
040    import java.util.List;
041    import java.util.ArrayList;
042    import java.util.HashMap;
043    import java.util.LinkedHashSet;
044    import java.util.Map;
045    import java.util.Set;
046    import java.util.SortedSet;
047    import java.util.TreeSet;
048    import java.util.concurrent.ConcurrentHashMap;
049    import java.util.concurrent.Semaphore;
050    import java.util.concurrent.TimeUnit;
051    
052    import org.opends.server.admin.std.server.MonitorProviderCfg;
053    import org.opends.server.api.MonitorProvider;
054    import org.opends.server.config.ConfigException;
055    import org.opends.server.core.DirectoryServer;
056    import org.opends.server.replication.common.ChangeNumber;
057    import org.opends.server.replication.common.ServerState;
058    import org.opends.server.replication.protocol.*;
059    import org.opends.server.types.Attribute;
060    import org.opends.server.types.AttributeType;
061    import org.opends.server.types.AttributeValue;
062    import org.opends.server.types.DN;
063    import org.opends.server.types.InitializationException;
064    import org.opends.server.util.TimeThread;
065    
066    /**
067     * This class defines a server handler, which handles all interaction with a
068     * replication server.
069     */
070    public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
071    {
072      /**
073       * The tracer object for the debug logger.
074       */
075      private static final DebugTracer TRACER = getTracer();
076    
077      /**
078       * Time during which the server will wait for existing thread to stop
079       * during the shutdown.
080       */
081      private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
082    
083      private short serverId;
084      private ProtocolSession session;
085      private final MsgQueue msgQueue = new MsgQueue();
086      private MsgQueue lateQueue = new MsgQueue();
087      private final Map<ChangeNumber, AckMessageList> waitingAcks  =
088              new HashMap<ChangeNumber, AckMessageList>();
089      private ReplicationServerDomain replicationServerDomain = null;
090      private String serverURL;
091      private int outCount = 0; // number of update sent to the server
092      private int inCount = 0;  // number of updates received from the server
093      private int inAckCount = 0;
094      private int outAckCount = 0;
095      private int maxReceiveQueue = 0;
096      private int maxSendQueue = 0;
097      private int maxReceiveDelay = 0;
098      private int maxSendDelay = 0;
099      private int maxQueueSize = 10000;
100      private int restartReceiveQueue;
101      private int restartSendQueue;
102      private int restartReceiveDelay;
103      private int restartSendDelay;
104      private boolean serverIsLDAPserver;
105      private boolean following = false;
106      private ServerState serverState;
107      private boolean active = true;
108      private ServerWriter writer = null;
109      private DN baseDn = null;
110      private String serverAddressURL;
111      private int rcvWindow;
112      private int rcvWindowSizeHalf;
113      private int maxRcvWindow;
114      private ServerReader reader;
115      private Semaphore sendWindow;
116      private int sendWindowSize;
117      private boolean flowControl = false; // indicate that the server is
118                                           // flow controled and should
119                                           // be stopped from sending messsages.
120      private int saturationCount = 0;
121      private short replicationServerId;
122    
123      private short protocolVersion;
124      private long generationId = -1;
125    
126    
127      /**
128       * When this Handler is related to a remote replication server
129       * this collection will contain as many elements as there are
130       * LDAP servers connected to the remote replication server.
131       */
132      private final Map<Short, LightweightServerHandler> connectedServers =
133        new ConcurrentHashMap<Short, LightweightServerHandler>();
134    
135      /**
136       * The time in milliseconds between heartbeats from the replication
137       * server.  Zero means heartbeats are off.
138       */
139      private long heartbeatInterval = 0;
140    
141      /**
142       * The thread that will send heartbeats.
143       */
144      HeartbeatThread heartbeatThread = null;
145    
146      /**
147       * Set when ServerHandler is stopping.
148       */
149      private boolean shutdown = false;
150    
151      private static final Map<ChangeNumber, ReplServerAckMessageList>
152       changelogsWaitingAcks =
153           new HashMap<ChangeNumber, ReplServerAckMessageList>();
154    
/**
 * Builds a server handler on top of an already established session.
 * The protocol version starts at the current version; it may be lowered
 * later during the handshake performed by start().
 *
 * @param session The ProtocolSession used by the ServerHandler to
 *                 communicate with the remote entity.
 * @param queueSize The maximum number of updates that will be kept
 *                  in memory by this ServerHandler.
 */
public ServerHandler(ProtocolSession session, int queueSize)
{
  super("Server Handler");
  protocolVersion = ProtocolVersion.currentVersion();
  maxQueueSize = queueSize;
  this.session = session;
}
170    
171      /**
172       * Do the exchange of start messages to know if the remote
173       * server is an LDAP or replication server and to exchange serverID.
174       * Then create the reader and writer thread.
175       *
176       * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
177       *               null if this is an incoming connection (listen).
178       * @param replicationServerId The identifier of the replicationServer that
179       *                            creates this server handler.
180       * @param replicationServerURL The URL of the replicationServer that creates
181       *                             this server handler.
182       * @param windowSize the window size that this server handler must use.
183       * @param sslEncryption For outgoing connections indicates whether encryption
184       *                      should be used after the exchange of start messages.
185       *                      Ignored for incoming connections.
186       * @param replicationServer the ReplicationServer that created this server
187       *                          handler.
188       */
189      public void start(DN baseDn, short replicationServerId,
190                        String replicationServerURL,
191                        int windowSize, boolean sslEncryption,
192                        ReplicationServer replicationServer)
193      {
194        if (debugEnabled())
195          TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
196                    " starts a new LS or RS " +
197                    ((baseDn == null)?"incoming connection":"outgoing connection"));
198    
199        this.replicationServerId = replicationServerId;
200        rcvWindowSizeHalf = windowSize/2;
201        maxRcvWindow = windowSize;
202        rcvWindow = windowSize;
203        long localGenerationId = -1;
204        boolean handshakeOnly = false;
205    
206        try
207        {
208          if (baseDn != null)
209          {
210            // This is an outgoing connection. Publish our start message.
211            this.baseDn = baseDn;
212    
213            // Get or create the ReplicationServerDomain
214            replicationServerDomain =
215                    replicationServer.getReplicationServerDomain(baseDn, true);
216            localGenerationId = replicationServerDomain.getGenerationId();
217    
218            ServerState localServerState =
219                    replicationServerDomain.getDbServerState();
220            ReplServerStartMessage msg =
221              new ReplServerStartMessage(replicationServerId, replicationServerURL,
222                                        baseDn, windowSize, localServerState,
223                                        protocolVersion, localGenerationId,
224                                        sslEncryption);
225    
226            session.publish(msg);
227          }
228    
229          // Wait and process ServerStart or ReplServerStart
230          ReplicationMessage msg = session.receive();
231          if (msg instanceof ServerStartMessage)
232          {
233            // The remote server is an LDAP Server.
234            ServerStartMessage receivedMsg = (ServerStartMessage) msg;
235    
236            generationId = receivedMsg.getGenerationId();
237            protocolVersion = ProtocolVersion.minWithCurrent(
238                receivedMsg.getVersion());
239            serverId = receivedMsg.getServerId();
240            serverURL = receivedMsg.getServerURL();
241            this.baseDn = receivedMsg.getBaseDn();
242            this.serverState = receivedMsg.getServerState();
243    
244            maxReceiveDelay = receivedMsg.getMaxReceiveDelay();
245            maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
246            maxSendDelay = receivedMsg.getMaxSendDelay();
247            maxSendQueue = receivedMsg.getMaxSendQueue();
248            heartbeatInterval = receivedMsg.getHeartbeatInterval();
249    
250            handshakeOnly = receivedMsg.isHandshakeOnly();
251    
252            // The session initiator decides whether to use SSL.
253            sslEncryption = receivedMsg.getSSLEncryption();
254    
255            if (maxReceiveQueue > 0)
256              restartReceiveQueue = (maxReceiveQueue > 1000 ?
257                                      maxReceiveQueue - 200 :
258                                      maxReceiveQueue*8/10);
259            else
260              restartReceiveQueue = 0;
261    
262            if (maxSendQueue > 0)
263              restartSendQueue = (maxSendQueue  > 1000 ? maxSendQueue - 200 :
264                maxSendQueue*8/10);
265            else
266              restartSendQueue = 0;
267    
268            if (maxReceiveDelay > 0)
269              restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 :
270                maxReceiveDelay);
271            else
272              restartReceiveDelay = 0;
273    
274            if (maxSendDelay > 0)
275              restartSendDelay = (maxSendDelay > 10 ?
276                                  maxSendDelay -1 :
277                                  maxSendDelay);
278            else
279              restartSendDelay = 0;
280    
281            if (heartbeatInterval < 0)
282            {
283              heartbeatInterval = 0;
284            }
285    
286            serverIsLDAPserver = true;
287    
288            // Get or Create the ReplicationServerDomain
289            replicationServerDomain =
290                    replicationServer.getReplicationServerDomain(this.baseDn, true);
291    
292            replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
293            replicationServerDomain.mayResetGenerationId();
294    
295            localGenerationId = replicationServerDomain.getGenerationId();
296    
297            ServerState localServerState =
298                    replicationServerDomain.getDbServerState();
299            // This an incoming connection. Publish our start message
300            ReplServerStartMessage myStartMsg =
301              new ReplServerStartMessage(replicationServerId, replicationServerURL,
302                                        this.baseDn, windowSize, localServerState,
303                                        protocolVersion, localGenerationId,
304                                        sslEncryption);
305            session.publish(myStartMsg);
306            sendWindowSize = receivedMsg.getWindowSize();
307    
308            /* Until here session is encrypted then it depends on the negociation */
309            if (!sslEncryption)
310            {
311              session.stopEncryption();
312            }
313    
314            if (debugEnabled())
315            {
316              Set<String> ss = this.serverState.toStringSet();
317              Set<String> lss =
318                      replicationServerDomain.getDbServerState().toStringSet();
319              TRACER.debugInfo("In " + replicationServerDomain.
320                       getReplicationServer().getMonitorInstanceName() +
321                       ", SH received START from LS serverId=" + serverId +
322                       " baseDN=" + this.baseDn +
323                       " generationId=" + generationId +
324                       " localGenerationId=" + localGenerationId +
325                       " state=" + ss +
326                       " and sent ReplServerStart with state=" + lss);
327            }
328    
329            /*
330             * If we have already a generationID set for the domain
331             * then
332             *   if the connecting replica has not the same
333             *   then it is degraded locally and notified by an error message
334             * else
335             *   we set the generationID from the one received
336             *   (unsaved yet on disk . will be set with the 1rst change received)
337             */
338            if (localGenerationId>0)
339            {
340              if (generationId != localGenerationId)
341              {
342                Message message = NOTE_BAD_GENERATION_ID.get(
343                    receivedMsg.getBaseDn().toNormalizedString(),
344                    Short.toString(receivedMsg.getServerId()),
345                    Long.toString(generationId),
346                    Long.toString(localGenerationId));
347    
348                ErrorMessage errorMsg =
349                  new ErrorMessage(replicationServerId, serverId, message);
350                session.publish(errorMsg);
351              }
352            }
353            else
354            {
355              // We are an empty Replicationserver
356              if ((generationId>0)&&(!serverState.isEmpty()))
357              {
358                // If the LDAP server has already sent changes
359                // it is not expected to connect to an empty RS
360                Message message = NOTE_BAD_GENERATION_ID.get(
361                    receivedMsg.getBaseDn().toNormalizedString(),
362                    Short.toString(receivedMsg.getServerId()),
363                    Long.toString(generationId),
364                    Long.toString(localGenerationId));
365    
366                ErrorMessage errorMsg =
367                  new ErrorMessage(replicationServerId, serverId, message);
368                session.publish(errorMsg);
369              }
370              else
371              {
372                replicationServerDomain.setGenerationId(generationId, false);
373              }
374            }
375          }
376          else if (msg instanceof ReplServerStartMessage)
377          {
378            // The remote server is a replication server
379            ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
380            protocolVersion = ProtocolVersion.minWithCurrent(
381                receivedMsg.getVersion());
382            generationId = receivedMsg.getGenerationId();
383            serverId = receivedMsg.getServerId();
384            serverURL = receivedMsg.getServerURL();
385            int separator = serverURL.lastIndexOf(':');
386            serverAddressURL =
387              session.getRemoteAddress() + ":" + serverURL.substring(separator + 1);
388            serverIsLDAPserver = false;
389            this.baseDn = receivedMsg.getBaseDn();
390            if (baseDn == null)
391            {
392              // Get or create the ReplicationServerDomain
393              replicationServerDomain = replicationServer.
394                      getReplicationServerDomain(this.baseDn, true);
395              localGenerationId = replicationServerDomain.getGenerationId();
396              ServerState serverState = replicationServerDomain.getDbServerState();
397    
398              // The session initiator decides whether to use SSL.
399              sslEncryption = receivedMsg.getSSLEncryption();
400    
401              // Publish our start message
402              ReplServerStartMessage outMsg =
403                new ReplServerStartMessage(replicationServerId,
404                                           replicationServerURL,
405                                           this.baseDn, windowSize, serverState,
406                                           protocolVersion,
407                                           localGenerationId,
408                                           sslEncryption);
409              session.publish(outMsg);
410            }
411            else
412            {
413              this.baseDn = baseDn;
414            }
415            this.serverState = receivedMsg.getServerState();
416            sendWindowSize = receivedMsg.getWindowSize();
417    
418            /* Until here session is encrypted then it depends on the negociation */
419            if (!sslEncryption)
420            {
421              session.stopEncryption();
422            }
423    
424            if (debugEnabled())
425            {
426              Set<String> ss = this.serverState.toStringSet();
427              Set<String> lss =
428                      replicationServerDomain.getDbServerState().toStringSet();
429              TRACER.debugInfo("In " + replicationServerDomain.
430                       getReplicationServer().getMonitorInstanceName() +
431                       ", SH received START from RS serverId=" + serverId +
432                       " baseDN=" + this.baseDn +
433                       " generationId=" + generationId +
434                       " localGenerationId=" + localGenerationId +
435                       " state=" + ss +
436                       " and sent ReplServerStart with state=" + lss);
437            }
438    
439            // if the remote RS and the local RS have the same genID
440            // then it's ok and nothing else to do
441            if (generationId == localGenerationId)
442            {
443              if (debugEnabled())
444              {
445                TRACER.debugInfo("In " +
446                        replicationServerDomain.getReplicationServer().
447                  getMonitorInstanceName() + " RS with serverID=" + serverId +
448                  " is connected with the right generation ID");
449              }
450            }
451            else
452            {
453              if (localGenerationId>0)
454              {
455                // if the local RS is initialized
456                if (generationId>0)
457                {
458                  // if the remote RS is initialized
459                  if (generationId != localGenerationId)
460                  {
461                    // if the 2 RS have different generationID
462                    if (replicationServerDomain.getGenerationIdSavedStatus())
463                    {
464                      // it the present RS has received changes regarding its
465                      //     gen ID and so won't change without a reset
466                      // then  we are just degrading the peer.
467                      Message message = NOTE_BAD_GENERATION_ID.get(
468                          this.baseDn.toNormalizedString(),
469                          Short.toString(receivedMsg.getServerId()),
470                          Long.toString(generationId),
471                          Long.toString(localGenerationId));
472    
473                      ErrorMessage errorMsg =
474                        new ErrorMessage(replicationServerId, serverId, message);
475                      session.publish(errorMsg);
476                    }
477                    else
478                    {
479                      // The present RS has never received changes regarding its
480                      // gen ID.
481                      //
482                      // Example case:
483                      // - we are in RS1
484                      // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
485                      // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
486                      // - we are in RS1 and we receive a START msg from RS2
487                      // - Each RS keeps its genID / is degraded and when LS2 will
488                      //   be populated from LS1 everything will becomes ok.
489                      //
490                      // Issue:
491                      // FIXME : Would it be a good idea in some cases to just
492                      //         set the gen ID received from the peer RS
493                      //         specially if the peer has a non nul state and
494                      //         we have a nul state ?
495                      // replicationServerDomain.
496                      // setGenerationId(generationId, false);
497                      Message message = NOTE_BAD_GENERATION_ID.get(
498                          this.baseDn.toNormalizedString(),
499                          Short.toString(receivedMsg.getServerId()),
500                          Long.toString(generationId),
501                          Long.toString(localGenerationId));
502    
503                      ErrorMessage errorMsg =
504                        new ErrorMessage(replicationServerId, serverId, message);
505                      session.publish(errorMsg);
506                    }
507                  }
508                }
509                else
510                {
511                  // The remote has no genId. We don't change anything for the
512                  // current RS.
513                }
514              }
515              else
516              {
517                // The local RS is not initialized - take the one received
518                replicationServerDomain.setGenerationId(generationId, false);
519              }
520            }
521          }
522          else
523          {
524            // TODO : log error
525            return;   // we did not recognize the message, ignore it
526          }
527    
528          // Get or create the ReplicationServerDomain
529          replicationServerDomain = replicationServer.
530                  getReplicationServerDomain(this.baseDn,true);
531    
532          if (!handshakeOnly)
533          {
534            boolean started;
535            if (serverIsLDAPserver)
536            {
537              started = replicationServerDomain.startServer(this);
538            }
539            else
540            {
541              started = replicationServerDomain.startReplicationServer(this);
542            }
543    
544            if (started)
545            {
546              // sendWindow MUST be created before starting the writer
547              sendWindow = new Semaphore(sendWindowSize);
548    
549              writer = new ServerWriter(session, serverId,
550                  this, replicationServerDomain);
551              reader = new ServerReader(session, serverId,
552                  this, replicationServerDomain);
553    
554              reader.start();
555              writer.start();
556    
557              // Create a thread to send heartbeat messages.
558              if (heartbeatInterval > 0)
559              {
560                heartbeatThread = new HeartbeatThread(
561                    "replication Heartbeat to " + serverURL +
562                    " for " + this.baseDn,
563                    session, heartbeatInterval/3);
564                heartbeatThread.start();
565              }
566    
567              DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
568              DirectoryServer.registerMonitorProvider(this);
569            }
570            else
571            {
572              // the connection is not valid, close it.
573              try
574              {
575                if (debugEnabled())
576                {
577                  TRACER.debugInfo("In " +
578                      replicationServerDomain.getReplicationServer().
579                      getMonitorInstanceName() + " RS failed to start locally " +
580                      " the connection from serverID="+serverId);
581                }
582                session.close();
583              } catch (IOException e1)
584              {
585                // ignore
586              }
587            }
588          }
589          else
590          {
591            // For a hanshakeOnly connection, let's only create a reader
592            // in order to detect the connection closure.
593            reader = new ServerReader(session, serverId,
594                this, replicationServerDomain);
595            reader.start();
596          }
597        }
598        catch (Exception e)
599        {
600          // some problem happened, reject the connection
601          MessageBuilder mb = new MessageBuilder();
602          mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
603              this.getMonitorInstanceName()));
604          mb.append(stackTraceToSingleLineString(e));
605          logError(mb.toMessage());
606          try
607          {
608            session.close();
609          } catch (IOException e1)
610          {
611            // ignore
612          }
613        }
614      }
615    
616      /**
617       * get the Server Id.
618       *
619       * @return the ID of the server to which this object is linked
620       */
621      public short getServerId()
622      {
623        return serverId;
624      }
625    
626      /**
627       * Retrieves the Address URL for this server handler.
628       *
629       * @return  The Address URL for this server handler,
630       *          in the form of an IP address and port separated by a colon.
631       */
632      public String getServerAddressURL()
633      {
634        return serverAddressURL;
635      }
636    
637      /**
638       * Retrieves the URL for this server handler.
639       *
640       * @return  The URL for this server handler, in the form of an address and
641       *          port separated by a colon.
642       */
643      public String getServerURL()
644      {
645        return serverURL;
646      }
647    
648      /**
649       * Increase the counter of updates sent to the server.
650       */
651      public void incrementOutCount()
652      {
653        outCount++;
654      }
655    
656      /**
657       * Increase the counter of update received from the server.
658       */
659      public void incrementInCount()
660      {
661        inCount++;
662      }
663    
664      /**
665       * Get the count of updates received from the server.
666       * @return the count of update received from the server.
667       */
668      public int getInCount()
669      {
670        return inCount;
671      }
672    
673      /**
674       * Get the count of updates sent to this server.
675       * @return  The count of update sent to this server.
676       */
677      public int getOutCount()
678      {
679        return outCount;
680      }
681    
682      /**
683       * Get the number of Ack received from the server managed by this handler.
684       *
685       * @return Returns the inAckCount.
686       */
687      public int getInAckCount()
688      {
689        return inAckCount;
690      }
691    
692      /**
693       * Get the number of Ack sent to the server managed by this handler.
694       *
695       * @return Returns the outAckCount.
696       */
697      public int getOutAckCount()
698      {
699        return outAckCount;
700      }
701    
702      /**
703       * Check is this server is saturated (this server has already been
704       * sent a bunch of updates and has not processed them so they are staying
705       * in the message queue for this server an the size of the queue
706       * for this server is above the configured limit.
707       *
708       * The limit can be defined in number of updates or with a maximum delay
709       *
710       * @param changeNumber The changenumber to use to make the delay calculations.
711       * @param sourceHandler The ServerHandler which is sending the update.
712       * @return true is saturated false if not saturated.
713       */
714      public boolean isSaturated(ChangeNumber changeNumber,
715                                 ServerHandler sourceHandler)
716      {
717        synchronized (msgQueue)
718        {
719          int size = msgQueue.size();
720    
721          if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
722            return true;
723    
724          if ((sourceHandler.maxSendQueue > 0) &&
725              (size >= sourceHandler.maxSendQueue))
726            return true;
727    
728          if (!msgQueue.isEmpty())
729          {
730            UpdateMessage firstUpdate = msgQueue.first();
731    
732            if (firstUpdate != null)
733            {
734              long timeDiff = changeNumber.getTimeSec() -
735              firstUpdate.getChangeNumber().getTimeSec();
736    
737              if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
738                return true;
739    
740              if ((sourceHandler.maxSendDelay > 0) &&
741                  (timeDiff >= sourceHandler.maxSendDelay))
742                return true;
743            }
744          }
745          return false;
746        }
747      }
748    
749      /**
750       * Check that the size of the Server Handler messages Queue has lowered
751       * below the limit and therefore allowing the reception of messages
752       * from other servers to restart.
753       * @param source The ServerHandler which was sending the update.
754       *        can be null.
755       * @return true if the processing can restart
756       */
757      public boolean restartAfterSaturation(ServerHandler source)
758      {
759        synchronized (msgQueue)
760        {
761          int queueSize = msgQueue.size();
762          if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
763            return false;
764          if ((source != null) && (source.maxSendQueue > 0) &&
765               (queueSize >= source.restartSendQueue))
766            return false;
767    
768          if (!msgQueue.isEmpty())
769          {
770            UpdateMessage firstUpdate = msgQueue.first();
771            UpdateMessage lastUpdate = msgQueue.last();
772    
773            if ((firstUpdate != null) && (lastUpdate != null))
774            {
775              long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
776                   firstUpdate.getChangeNumber().getTimeSec();
777              if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
778                return false;
779              if ((source != null) && (source.maxSendDelay > 0)
780                   && (timeDiff >= source.restartSendDelay))
781                return false;
782            }
783          }
784        }
785        return true;
786      }
787    
788      /**
789       * Check if the server associated to this ServerHandler is a replication
790       * server.
791       * @return true if the server associated to this ServerHandler is a
792       *         replication server.
793       */
794      public boolean isReplicationServer()
795      {
796        return (!serverIsLDAPserver);
797      }
798    
799      /**
800       * Get the number of message in the receive message queue.
801       * @return Size of the receive message queue.
802       */
803      public int getRcvMsgQueueSize()
804      {
805       synchronized (msgQueue)
806       {
807        /*
808         * When the server is up to date or close to be up to date,
809         * the number of updates to be sent is the size of the receive queue.
810         */
811         if (isFollowing())
812           return msgQueue.size();
813         else
814         {
815           /*
816            * When the server  is not able to follow, the msgQueue
817            * may become too large and therefore won't contain all the
818            * changes. Some changes may only be stored in the backing DB
819            * of the servers.
820            * The total size of teh receieve queue is calculated by doing
821            * the sum of the number of missing changes for every dbHandler.
822            */
823           int totalCount = 0;
824           ServerState dbState = replicationServerDomain.getDbServerState();
825           for (short id : dbState)
826           {
827             totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
828                 serverState.getMaxChangeNumber(id));
829           }
830           return totalCount;
831         }
832       }
833      }
834    
835      /**
836       * Get an approximation of the delay by looking at the age of the oldest
837       * message that has not been sent to this server.
838       * This is an approximation because the age is calculated using the
839       * clock of the servee where the replicationServer is currently running
840       * while it should be calculated using the clock of the server
841       * that originally processed the change.
842       *
843       * The approximation error is therefore the time difference between
844       *
845       * @return the approximate delay for the connected server.
846       */
847      public long getApproxDelay()
848      {
849        long olderUpdateTime = getOlderUpdateTime();
850        if (olderUpdateTime == 0)
851          return 0;
852    
853        long currentTime = TimeThread.getTime();
854        return ((currentTime - olderUpdateTime)/1000);
855      }
856    
857      /**
858       * Get the age of the older change that has not yet been replicated
859       * to the server handled by this ServerHandler.
860       * @return The age if the older change has not yet been replicated
861       *         to the server handled by this ServerHandler.
862       */
863      public Long getApproxFirstMissingDate()
864      {
865        Long result = (long)0;
866    
867        // Get the older CN received
868        ChangeNumber olderUpdateCN = getOlderUpdateCN();
869        if (olderUpdateCN != null)
870        {
871          // If not present in the local RS db,
872          // then approximate with the older update time
873          result=olderUpdateCN.getTime();
874        }
875        return result;
876      }
877    
878      /**
879       * Get the older update time for that server.
880       * @return The older update time.
881       */
882      public long getOlderUpdateTime()
883      {
884        ChangeNumber olderUpdateCN = getOlderUpdateCN();
885        if (olderUpdateCN == null)
886          return 0;
887        return  olderUpdateCN.getTime();
888      }
889    
890      /**
891       * Get the older Change Number for that server.
892       * Returns null when the queue is empty.
893       * @return The older change number.
894       */
895      public ChangeNumber getOlderUpdateCN()
896      {
897        ChangeNumber result = null;
898        synchronized (msgQueue)
899        {
900          if (isFollowing())
901          {
902            if (msgQueue.isEmpty())
903            {
904              result=null;
905            }
906            else
907            {
908              UpdateMessage msg = msgQueue.first();
909              result = msg.getChangeNumber();
910            }
911          }
912          else
913          {
914            if (lateQueue.isEmpty())
915            {
916              // isFollowing is false AND lateQueue is empty
917              // We may be at the very moment when the writer has emptyed the
918              // lateQueue when it sent the last update. The writer will fill again
919              // the lateQueue when it will send the next update but we are not yet
920              // there. So let's take the last change not sent directly from
921              // the db.
922    
923              ReplicationIteratorComparator comparator =
924                new ReplicationIteratorComparator();
925              SortedSet<ReplicationIterator> iteratorSortedSet =
926                new TreeSet<ReplicationIterator>(comparator);
927              try
928              {
929                // Build a list of candidates iterator (i.e. db i.e. server)
930                for (short serverId : replicationServerDomain.getServers())
931                {
932                  // get the last already sent CN from that server
933                  ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
934                  // get an iterator in this server db from that last change
935                  ReplicationIterator iterator =
936                    replicationServerDomain.getChangelogIterator(serverId, lastCsn);
937                  // if that iterator has changes, then it is a candidate
938                  // it is added in the sorted list at a position given by its
939                  // current change (see ReplicationIteratorComparator).
940                  if ((iterator != null) && (iterator.getChange() != null))
941                  {
942                    iteratorSortedSet.add(iterator);
943                  }
944                }
945                UpdateMessage msg = iteratorSortedSet.first().getChange();
946                result = msg.getChangeNumber();
947              }
948              catch(Exception e)
949              {
950                result=null;
951              }
952              finally
953              {
954                for (ReplicationIterator iterator : iteratorSortedSet)
955                {
956                  iterator.releaseCursor();
957                }
958              }
959            }
960            else
961            {
962              UpdateMessage msg = lateQueue.first();
963              result = msg.getChangeNumber();
964            }
965          }
966        }
967        return result;
968      }
969    
970      /**
971       * Check if the LDAP server can follow the speed of the other servers.
972       * @return true when the server has all the not yet sent changes
973       *         in its queue.
974       */
975      public boolean isFollowing()
976      {
977        return following;
978      }
979    
980      /**
981       * Set the following flag of this server.
982       * @param following the value that should be set.
983       */
984      public void setFollowing(boolean following)
985      {
986        this.following = following;
987      }
988    
989      /**
990       * Add an update the list of updates that must be sent to the server
991       * managed by this ServerHandler.
992       *
993       * @param update The update that must be added to the list of updates.
994       * @param sourceHandler The server that sent the update.
995       */
996      public void add(UpdateMessage update, ServerHandler sourceHandler)
997      {
998        /*
999         * Ignore updates from a server that is degraded due to
1000         * its inconsistent generationId
1001         */
1002        long referenceGenerationId = replicationServerDomain.getGenerationId();
1003        if ((referenceGenerationId>0) &&
1004            (referenceGenerationId != generationId))
1005        {
1006          logError(ERR_IGNORING_UPDATE_TO.get(
1007                   update.getDn(),
1008                   this.getMonitorInstanceName()));
1009    
1010          return;
1011        }
1012    
1013        synchronized (msgQueue)
1014        {
1015          /*
1016           * If queue was empty the writer thread was probably asleep
1017           * waiting for some changes, wake it up
1018           */
1019          if (msgQueue.isEmpty())
1020            msgQueue.notify();
1021    
1022          msgQueue.add(update);
1023    
1024          /* TODO : size should be configurable
1025           * and larger than max-receive-queue-size
1026           */
1027          while (msgQueue.size() > maxQueueSize)
1028          {
1029            setFollowing(false);
1030            msgQueue.removeFirst();
1031          }
1032        }
1033    
1034        if (isSaturated(update.getChangeNumber(), sourceHandler))
1035        {
1036          sourceHandler.setSaturated(true);
1037        }
1038    
1039      }
1040    
1041      private void setSaturated(boolean value)
1042      {
1043        flowControl = value;
1044      }
1045    
1046      /**
1047       * Select the next update that must be sent to the server managed by this
1048       * ServerHandler.
1049       *
1050       * @return the next update that must be sent to the server managed by this
1051       *         ServerHandler.
1052       */
1053      public UpdateMessage take()
1054      {
1055        boolean interrupted = true;
1056        UpdateMessage msg = getnextMessage();
1057    
1058        /*
1059         * When we remove a message from the queue we need to check if another
1060         * server is waiting in flow control because this queue was too long.
1061         * This check might cause a performance penalty an therefore it
1062         * is not done for every message removed but only every few messages.
1063         */
1064        if (++saturationCount > 10)
1065        {
1066          saturationCount = 0;
1067          try
1068          {
1069            replicationServerDomain.checkAllSaturation();
1070          }
1071          catch (IOException e)
1072          {
1073          }
1074        }
1075        boolean acquired = false;
1076        do
1077        {
1078          try
1079          {
1080            acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
1081            interrupted = false;
1082          } catch (InterruptedException e)
1083          {
1084            // loop until not interrupted
1085          }
1086        } while (((interrupted) || (!acquired )) && (!shutdown));
1087        this.incrementOutCount();
1088        return msg;
1089      }
1090    
  /**
   * Get the next update that must be sent to the server
   * from the message queue or from the database.
   * <p>
   * While this handler is not following, updates are served from the
   * lateQueue, which is refilled from the changelog iterators each time it
   * is found empty. While it is following, updates are served from the
   * msgQueue, waiting for new ones when that queue is empty.
   *
   * @return The next update that must be sent to the server, or null when
   *         this handler is no longer active or when the thread is
   *         interrupted while waiting for an update.
   */
  private UpdateMessage getnextMessage()
  {
    UpdateMessage msg;
    while (active == true)
    {
      if (following == false)
      {
        /* this server is late with regard to some other masters
         * in the topology or just joined the topology.
         * In such cases, we can't keep all changes in the queue
         * without saturating the memory, we therefore use
         * a lateQueue that is filled with a few changes from the changelogDB
         * If this server is able to close the gap, it will start using again
         * the regular msgQueue later.
         */
        if (lateQueue.isEmpty())
        {
          /*
           * Start from the server State
           * Loop until the queue high mark or until no more changes
           *   for each known LDAP master
           *      get the next CSN after this last one :
           *         - try to get next from the file
           *         - if not found in the file
           *             - try to get the next from the queue
           *   select the smallest of changes
           *   check if it is in the memory tree
           *     yes : lock memory tree.
           *           check all changes from the list, remove the ones that
           *           are already sent
           *           unlock memory tree
           *           restart as usual
           *   load this change on the delayList
           *
           */
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          // One iterator per server of the domain, positioned after the last
          // change number recorded in serverState for that server. Iterators
          // without any change are released immediately.
          for (short serverId : replicationServerDomain.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if (iterator != null)
            {
              if (iterator.getChange() != null)
              {
                iteratorSortedSet.add(iterator);
              }
              else
              {
                iterator.releaseCursor();
              }
            }
          }

          // The loop below relies on the fact that the set is sorted based
          // on the current change of each iterator to consider the next
          // change across all servers.
          // Hence it is necessary to remove and possibly add again an
          // iterator when looping in order to keep the order of the
          // iterators consistent (see ReplicationIteratorComparator).
          // The lateQueue is filled with at most 100 changes per refill.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
              iteratorSortedSet.add(iterator);
            else
              iterator.releaseCursor();
          }
          // Release the cursors of the iterators that still had changes
          // when the loop stopped.
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
          /*
           * Check if the first change in the lateQueue is also on the regular
           * queue
           */
          if (lateQueue.isEmpty())
          {
            // Nothing was loaded from the changelog: switch back to
            // following, provided the msgQueue is below its maximum size.
            synchronized (msgQueue)
            {
              if (msgQueue.size() < maxQueueSize)
              {
                setFollowing(true);
              }
            }
          }
          else
          {
            msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (msgQueue.contains(msg))
              {
                /* we finally caught up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                // Drop entries from the head of the msgQueue up to and
                // including the one carrying the same change number as msg.
                UpdateMessage msg1;
                do
                {
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                this.updateServerState(msg);
                return msg;
              }
            }
            // Otherwise fall through: the next loop iteration serves the
            // change from the now non-empty lateQueue.
          }
        }
        else
        {
          /* get the next change from the lateQueue */
          msg = lateQueue.removeFirst();
          this.updateServerState(msg);
          return msg;
        }
      }
      synchronized (msgQueue)
      {
        if (following == true)
        {
          try
          {
            // Wait for an update, re-checking every 500 ms whether this
            // handler is still active.
            while (msgQueue.isEmpty())
            {
              msgQueue.wait(500);
              if (!active)
                return null;
            }
          } catch (InterruptedException e)
          {
            // An interruption while waiting ends the call without update.
            return null;
          }
          msg = msgQueue.removeFirst();
          if (this.updateServerState(msg))
          {
            /*
             * Only push the message if it has not yet been seen
             * by the other server.
             * Otherwise just loop to select the next message.
             */
            return msg;
          }
        }
      }
      /*
       * Need to loop because following flag may have gone to false between
       * the first check at the beginning of this method
       * and the second check just above.
       */
    }
    return null;
  }
1254    
1255      /**
1256       * Update the serverState with the last message sent.
1257       *
1258       * @param msg the last update sent.
1259       * @return boolean indicating if the update was meaningfull.
1260       */
1261      public boolean  updateServerState(UpdateMessage msg)
1262      {
1263        return serverState.update(msg.getChangeNumber());
1264      }
1265    
1266      /**
1267       * Get the state of this server.
1268       *
1269       * @return ServerState the state for this server..
1270       */
1271      public ServerState getServerState()
1272      {
1273        return serverState;
1274      }
1275    
1276      /**
1277       * Stop this server handler processing.
1278       */
1279      public void stopHandler()
1280      {
1281        active = false;
1282    
1283        // Stop the remote LSHandler
1284        for (LightweightServerHandler lsh : connectedServers.values())
1285        {
1286          lsh.stopHandler();
1287        }
1288        connectedServers.clear();
1289    
1290        try
1291        {
1292          session.close();
1293        } catch (IOException e)
1294        {
1295          // ignore.
1296        }
1297    
1298        synchronized (msgQueue)
1299        {
1300          /* wake up the writer thread on an empty queue so that it disappear */
1301          msgQueue.clear();
1302          msgQueue.notify();
1303          msgQueue.notifyAll();
1304        }
1305    
1306        // Stop the heartbeat thread.
1307        if (heartbeatThread != null)
1308        {
1309          heartbeatThread.shutdown();
1310        }
1311    
1312        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
1313      }
1314    
1315      /**
1316       * Send the ack to the server that did the original modification.
1317       *
1318       * @param changeNumber The ChangeNumber of the update that is acked.
1319       * @throws IOException In case of Exception thrown sending the ack.
1320       */
1321      public void sendAck(ChangeNumber changeNumber) throws IOException
1322      {
1323        AckMessage ack = new AckMessage(changeNumber);
1324        session.publish(ack);
1325        outAckCount++;
1326      }
1327    
1328      /**
1329       * Do the work when an ack message has been received from another server.
1330       *
1331       * @param message The ack message that was received.
1332       * @param ackingServerId The  id of the server that acked the change.
1333       */
1334      public void ack(AckMessage message, short ackingServerId)
1335      {
1336        ChangeNumber changeNumber = message.getChangeNumber();
1337        AckMessageList ackList;
1338        boolean completedFlag;
1339        synchronized (waitingAcks)
1340        {
1341          ackList = waitingAcks.get(changeNumber);
1342          if (ackList == null)
1343            return;
1344          ackList.addAck(ackingServerId);
1345          completedFlag = ackList.completed();
1346          if (completedFlag)
1347          {
1348            waitingAcks.remove(changeNumber);
1349          }
1350        }
1351        if (completedFlag)
1352        {
1353          replicationServerDomain.sendAck(changeNumber, true);
1354        }
1355      }
1356    
1357      /**
1358       * Process reception of an for an update that was received from a
1359       * ReplicationServer.
1360       *
1361       * @param message the ack message that was received.
1362       * @param ackingServerId The  id of the server that acked the change.
1363       */
1364      public static void ackChangelog(AckMessage message, short ackingServerId)
1365      {
1366        ChangeNumber changeNumber = message.getChangeNumber();
1367        ReplServerAckMessageList ackList;
1368        boolean completedFlag;
1369        synchronized (changelogsWaitingAcks)
1370        {
1371          ackList = changelogsWaitingAcks.get(changeNumber);
1372          if (ackList == null)
1373            return;
1374          ackList.addAck(ackingServerId);
1375          completedFlag = ackList.completed();
1376          if (completedFlag)
1377          {
1378            changelogsWaitingAcks.remove(changeNumber);
1379          }
1380        }
1381        if (completedFlag)
1382        {
1383          ReplicationServerDomain replicationServerDomain =
1384                  ackList.getChangelogCache();
1385          replicationServerDomain.sendAck(changeNumber, false,
1386                                 ackList.getReplicationServerId());
1387        }
1388      }
1389    
1390      /**
1391       * Add an update to the list of update waiting for acks.
1392       *
1393       * @param update the update that must be added to the list
1394       * @param nbWaitedAck  The number of ack that must be received before
1395       *               the update is fully acked.
1396       */
1397      public void addWaitingAck(UpdateMessage update, int nbWaitedAck)
1398      {
1399        AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
1400                                                    nbWaitedAck);
1401        synchronized(waitingAcks)
1402        {
1403          waitingAcks.put(update.getChangeNumber(), ackList);
1404        }
1405      }
1406    
1407      /**
1408       * Add an update to the list of update received from a replicationServer and
1409       * waiting for acks.
1410       *
1411       * @param update The update that must be added to the list.
1412       * @param ChangelogServerId The identifier of the replicationServer that sent
1413       *                          the update.
1414       * @param replicationServerDomain The ReplicationServerDomain from which the
1415       *                                change was processed and to which the ack
1416       *                                must later be sent.
1417       * @param nbWaitedAck The number of ack that must be received before
1418       *                    the update is fully acked.
1419       */
1420      public static void addWaitingAck(
1421          UpdateMessage update,
1422          short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
1423          int nbWaitedAck)
1424      {
1425        ReplServerAckMessageList ackList =
1426              new ReplServerAckMessageList(update.getChangeNumber(),
1427                                          nbWaitedAck,
1428                                          ChangelogServerId,
1429                                          replicationServerDomain);
1430        synchronized(changelogsWaitingAcks)
1431        {
1432          changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
1433        }
1434      }
1435    
1436      /**
1437       * Get the size of the list of update waiting for acks.
1438       *
1439       * @return the size of the list of update waiting for acks.
1440       */
1441      public int getWaitingAckSize()
1442      {
1443        synchronized (waitingAcks)
1444        {
1445          return waitingAcks.size();
1446        }
1447      }
1448    
1449      /**
1450       * Increment the count of Acks received from this server.
1451       */
1452      public void incrementInAckCount()
1453      {
1454        inAckCount++;
1455      }
1456    
1457      /**
1458       * Check type of server handled.
1459       *
1460       * @return true if the handled server is an LDAP server.
1461       *         false if the handled server is a replicationServer
1462       */
1463      public boolean isLDAPserver()
1464      {
1465        return serverIsLDAPserver;
1466      }
1467    
1468      /**
1469       * {@inheritDoc}
1470       */
1471      @Override
1472      public void initializeMonitorProvider(MonitorProviderCfg configuration)
1473                              throws ConfigException,InitializationException
1474      {
1475        // Nothing to do for now
1476      }
1477    
1478      /**
1479       * Retrieves the name of this monitor provider.  It should be unique among all
1480       * monitor providers, including all instances of the same monitor provider.
1481       *
1482       * @return  The name of this monitor provider.
1483       */
1484      @Override
1485      public String getMonitorInstanceName()
1486      {
1487        String str = baseDn.toString() +
1488                     " " + serverURL + " " + String.valueOf(serverId);
1489    
1490        if (serverIsLDAPserver)
1491          return "Direct LDAP Server " + str;
1492        else
1493          return "Remote Repl Server " + str;
1494      }
1495    
1496      /**
1497       * Retrieves the length of time in milliseconds that should elapse between
1498       * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
1499       * return value indicates that the <CODE>updateMonitorData()</CODE> method
1500       * should not be periodically invoked.
1501       *
1502       * @return  The length of time in milliseconds that should elapse between
1503       *          calls to the <CODE>updateMonitorData()</CODE> method.
1504       */
1505      @Override
1506      public long getUpdateInterval()
1507      {
1508        /* we don't wont to do polling on this monitor */
1509        return 0;
1510      }
1511    
1512      /**
1513       * Performs any processing periodic processing that may be desired to update
1514       * the information associated with this monitor.  Note that best-effort
1515       * attempts will be made to ensure that calls to this method come
1516       * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
1517       * be made.
1518       */
1519      @Override
1520      public void updateMonitorData()
1521      {
1522        // As long as getUpdateInterval() returns 0, this will never get called
1523    
1524      }
1525    
1526      /**
1527       * Retrieves a set of attributes containing monitor data that should be
1528       * returned to the client if the corresponding monitor entry is requested.
1529       *
1530       * @return  A set of attributes containing monitor data that should be
1531       *          returned to the client if the corresponding monitor entry is
1532       *          requested.
1533       */
1534      @Override
1535      public ArrayList<Attribute> getMonitorData()
1536      {
1537        ArrayList<Attribute> attributes = new ArrayList<Attribute>();
1538        if (serverIsLDAPserver)
1539        {
1540          attributes.add(new Attribute("LDAP-Server", serverURL));
1541          attributes.add(new Attribute("connected-to", this.replicationServerDomain.
1542              getReplicationServer().getMonitorInstanceName()));
1543    
1544        }
1545        else
1546        {
1547          attributes.add(new Attribute("ReplicationServer-Server", serverURL));
1548        }
1549        attributes.add(new Attribute("server-id",
1550                                     String.valueOf(serverId)));
1551        attributes.add(new Attribute("base-dn",
1552                                     baseDn.toString()));
1553    
1554        if (serverIsLDAPserver)
1555        {
1556          MonitorData md;
1557          try
1558          {
1559            md = replicationServerDomain.getMonitorData();
1560    
1561            // Oldest missing update
1562            Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
1563            if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
1564            {
1565              Date date = new Date(approxFirstMissingDate);
1566              attributes.add(new Attribute("approx-older-change-not-synchronized",
1567                  date.toString()));
1568              attributes.add(
1569                  new Attribute("approx-older-change-not-synchronized-millis",
1570                      String.valueOf(approxFirstMissingDate)));
1571            }
1572    
1573            // Missing changes
1574            long missingChanges = md.getMissingChanges(serverId);
1575            attributes.add(new Attribute("missing-changes",
1576                String.valueOf(missingChanges)));
1577    
1578            // Replication delay
1579            long delay = md.getApproxDelay(serverId);
1580            attributes.add(new Attribute("approximate-delay",
1581                String.valueOf(delay)));
1582          }
1583          catch(Exception e)
1584          {
1585            // TODO: improve the log
1586            // We failed retrieving the remote monitor data.
1587            attributes.add(new Attribute("error",
1588                stackTraceToSingleLineString(e)));
1589          }
1590        }
1591    
1592        // Deprecated
1593        attributes.add(new Attribute("max-waiting-changes",
1594                                      String.valueOf(maxQueueSize)));
1595        attributes.add(new Attribute("update-sent",
1596                                     String.valueOf(getOutCount())));
1597        attributes.add(new Attribute("update-received",
1598                                     String.valueOf(getInCount())));
1599    
1600        // Deprecated as long as assured is not exposed
1601        attributes.add(new Attribute("update-waiting-acks",
1602            String.valueOf(getWaitingAckSize())));
1603        attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
1604        attributes.add(new Attribute("ack-received",
1605                                     String.valueOf(getInAckCount())));
1606    
1607        // Window stats
1608        attributes.add(new Attribute("max-send-window",
1609                                     String.valueOf(sendWindowSize)));
1610        attributes.add(new Attribute("current-send-window",
1611                                    String.valueOf(sendWindow.availablePermits())));
1612        attributes.add(new Attribute("max-rcv-window",
1613                                     String.valueOf(maxRcvWindow)));
1614        attributes.add(new Attribute("current-rcv-window",
1615                                     String.valueOf(rcvWindow)));
1616    
1617        /*
1618         * FIXME:PGB DEPRECATED
1619         *
1620        // Missing changes
1621        attributes.add(new Attribute("waiting-changes",
1622            String.valueOf(getRcvMsgQueueSize())));
1623        // Age of oldest missing change
1624    
1625        // Date of the oldest missing change
1626        long olderUpdateTime = getOlderUpdateTime();
1627        if (olderUpdateTime != 0)
1628        {
1629          Date date = new Date(getOlderUpdateTime());
1630          attributes.add(new Attribute("older-change-not-synchronized",
1631                                     String.valueOf(date.toString())));
1632        }
1633        */
1634    
1635        /* get the Server State */
1636        final String ATTR_SERVER_STATE = "server-state";
1637        AttributeType type =
1638          DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
1639        LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
1640        for (String str : serverState.toStringSet())
1641        {
1642          values.add(new AttributeValue(type,str));
1643        }
1644        Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
1645        attributes.add(attr);
1646    
1647        // Encryption
1648        attributes.add(new Attribute("ssl-encryption",
1649            String.valueOf(session.isEncrypted())));
1650    
1651        // Data generation
1652        attributes.add(new Attribute("generation-id",
1653            String.valueOf(generationId)));
1654    
1655        return attributes;
1656      }
1657    
1658      /**
1659       * Shutdown This ServerHandler.
1660       */
1661      public void shutdown()
1662      {
1663        shutdown  = true;
1664        try
1665        {
1666          session.close();
1667        } catch (IOException e)
1668        {
1669          // Service is closing.
1670        }
1671    
1672        stopHandler();
1673    
1674        try
1675        {
1676          if (writer != null) {
1677            writer.join(SHUTDOWN_JOIN_TIMEOUT);
1678          }
1679          if (reader != null) {
1680            reader.join(SHUTDOWN_JOIN_TIMEOUT);
1681          }
1682        } catch (InterruptedException e)
1683        {
1684          // don't try anymore to join and return.
1685        }
1686      }
1687    
1688      /**
1689       * {@inheritDoc}
1690       */
1691      @Override
1692      public String toString()
1693      {
1694        String localString;
1695        if (serverId != 0)
1696        {
1697          if (serverIsLDAPserver)
1698            localString = "Directory Server ";
1699          else
1700            localString = "Replication Server ";
1701    
1702    
1703          localString += serverId + " " + serverURL + " " + baseDn;
1704        }
1705        else
1706          localString = "Unknown server";
1707    
1708        return localString;
1709      }
1710    
1711      /**
1712       * Decrement the protocol window, then check if it is necessary
1713       * to send a WindowMessage and send it.
1714       *
1715       * @throws IOException when the session becomes unavailable.
1716       */
1717      public synchronized void decAndCheckWindow() throws IOException
1718      {
1719        rcvWindow--;
1720        checkWindow();
1721      }
1722    
1723      /**
1724       * Check the protocol window and send WindowMessage if necessary.
1725       *
1726       * @throws IOException when the session becomes unavailable.
1727       */
1728      public synchronized void checkWindow() throws IOException
1729      {
1730        if (rcvWindow < rcvWindowSizeHalf)
1731        {
1732          if (flowControl)
1733          {
1734            if (replicationServerDomain.restartAfterSaturation(this))
1735            {
1736              flowControl = false;
1737            }
1738          }
1739          if (!flowControl)
1740          {
1741            WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
1742            session.publish(msg);
1743            outAckCount++;
1744            rcvWindow += rcvWindowSizeHalf;
1745          }
1746        }
1747      }
1748    
1749      /**
1750       * Update the send window size based on the credit specified in the
1751       * given window message.
1752       *
1753       * @param windowMsg The Window Message containing the information
1754       *                  necessary for updating the window size.
1755       */
1756      public void updateWindow(WindowMessage windowMsg)
1757      {
1758        sendWindow.release(windowMsg.getNumAck());
1759      }
1760    
1761      /**
1762       * Get our heartbeat interval.
1763       * @return Our heartbeat interval.
1764       */
1765      public long getHeartbeatInterval()
1766      {
1767        return heartbeatInterval;
1768      }
1769    
1770      /**
1771       * Processes a routable message.
1772       *
1773       * @param msg The message to be processed.
1774       */
1775      public void process(RoutableMessage msg)
1776      {
1777        if (debugEnabled())
1778           TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1779                     getMonitorInstanceName() +
1780                     " SH for remote server " + this.getMonitorInstanceName() +
1781                     " processes received msg=" + msg);
1782        replicationServerDomain.process(msg, this);
1783      }
1784    
1785      /**
1786       * Sends the provided ReplServerInfoMessage.
1787       *
1788       * @param info The ReplServerInfoMessage message to be sent.
1789       * @throws IOException When it occurs while sending the message,
1790       *
1791       */
1792       public void sendInfo(ReplServerInfoMessage info)
1793       throws IOException
1794       {
1795         if (debugEnabled())
1796           TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1797               getMonitorInstanceName() +
1798               " SH for remote server " + this.getMonitorInstanceName() +
1799               " sends message=" + info);
1800    
1801         session.publish(info);
1802       }
1803    
1804       /**
1805        *
1806        * Sets the replication server from the message provided.
1807        *
1808        * @param infoMsg The information message.
1809        */
1810       public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
1811       {
1812         if (debugEnabled())
1813           TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1814               getMonitorInstanceName() +
1815               " SH for remote server " + this.getMonitorInstanceName() +
1816               " sets replServerInfo " + "<" + infoMsg + ">");
1817    
1818         List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
1819         generationId = infoMsg.getGenerationId();
1820    
1821         synchronized(connectedServers)
1822         {
1823           // Removes the existing structures
1824           for (LightweightServerHandler lsh : connectedServers.values())
1825           {
1826             lsh.stopHandler();
1827           }
1828           connectedServers.clear();
1829    
1830           // Creates the new structure according to the message received.
1831           for (String newConnectedServer : newRemoteLDAPservers)
1832           {
1833             LightweightServerHandler lsh
1834             = new LightweightServerHandler(newConnectedServer, this);
1835             lsh.startHandler();
1836             connectedServers.put(lsh.getServerId(), lsh);
1837           }
1838         }
1839       }
1840    
1841       /**
1842        * When this handler is connected to a replication server, specifies if
1843        * a wanted server is connected to this replication server.
1844        *
1845        * @param wantedServer The server we want to know if it is connected
1846        * to the replication server represented by this handler.
1847        * @return boolean True is the wanted server is connected to the server
1848        * represented by this handler.
1849        */
1850       public boolean isRemoteLDAPServer(short wantedServer)
1851       {
1852         synchronized(connectedServers)
1853         {
1854           for (LightweightServerHandler server : connectedServers.values())
1855           {
1856             if (wantedServer == server.getServerId())
1857             {
1858               return true;
1859             }
1860           }
1861           return false;
1862         }
1863       }
1864    
1865       /**
1866        * When the handler is connected to a replication server, specifies the
1867        * replication server has remote LDAP servers connected to it.
1868        *
1869        * @return boolean True is the replication server has remote LDAP servers
1870        * connected to it.
1871        */
1872       public boolean hasRemoteLDAPServers()
1873       {
1874         return !connectedServers.isEmpty();
1875       }
1876    
1877      /**
1878       * Send an InitializeRequestMessage to the server connected through this
1879       * handler.
1880       *
1881       * @param msg The message to be processed
1882       * @throws IOException when raised by the underlying session
1883       */
1884      public void send(RoutableMessage msg) throws IOException
1885      {
1886        if (debugEnabled())
1887              TRACER.debugInfo("In " +
1888                  replicationServerDomain.getReplicationServer().
1889                  getMonitorInstanceName() +
1890                  " SH for remote server " + this.getMonitorInstanceName() +
1891                  " sends message=" + msg);
1892        session.publish(msg);
1893      }
1894    
1895      /**
1896       * Send an ErrorMessage to the peer.
1897       *
1898       * @param errorMsg The message to be sent
1899       * @throws IOException when raised by the underlying session
1900       */
1901      public void sendError(ErrorMessage errorMsg) throws IOException
1902      {
1903        session.publish(errorMsg);
1904      }
1905    
1906      /**
1907       * Process the reception of a WindowProbe message.
1908       *
1909       * @param  windowProbeMsg The message to process.
1910       *
1911       * @throws IOException    When the session becomes unavailable.
1912       */
1913      public void process(WindowProbe windowProbeMsg) throws IOException
1914      {
1915        if (rcvWindow > 0)
1916        {
1917          // The LDAP server believes that its window is closed
1918          // while it is not, this means that some problem happened in the
1919          // window exchange procedure !
1920          // lets update the LDAP server with out current window size and hope
1921          // that everything will work better in the futur.
1922          // TODO also log an error message.
1923          WindowMessage msg = new WindowMessage(rcvWindow);
1924          session.publish(msg);
1925          outAckCount++;
1926        }
1927        else
1928        {
1929          // Both the LDAP server and the replication server believes that the
1930          // window is closed. Lets check the flowcontrol in case we
1931          // can now resume operations and send a windowMessage if necessary.
1932          checkWindow();
1933        }
1934      }
1935    
1936      /**
1937       * Returns the value of generationId for that handler.
1938       * @return The value of the generationId.
1939       */
1940      public long getGenerationId()
1941      {
1942        return generationId;
1943      }
1944    
1945      /**
1946       * Resets the generationId for this domain.
1947       */
1948      public void warnBadGenerationId()
1949      {
1950        // Notify the peer that it is now invalid regarding the generationId
1951        // We are now waiting a startServer message from this server with
1952        // a valid generationId.
1953        try
1954        {
1955          Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
1956          ErrorMessage errorMsg =
1957            new ErrorMessage(serverId, replicationServerId, message);
1958          session.publish(errorMsg);
1959        }
1960        catch (Exception e)
1961        {
1962          // FIXME Log exception when sending reset error message
1963        }
1964      }
1965    
1966      /**
1967       * Sends a message containing a generationId to a peer server.
1968       * The peer is expected to be a replication server.
1969       *
1970       * @param  msg         The GenerationIdMessage message to be sent.
1971       * @throws IOException When it occurs while sending the message,
1972       *
1973       */
1974      public void forwardGenerationIdToRS(ResetGenerationId msg)
1975      throws IOException
1976      {
1977        session.publish(msg);
1978      }
1979    
1980      /**
1981       * Set a new generation ID.
1982       *
1983       * @param generationId The new generation ID
1984       *
1985       */
1986      public void setGenerationId(long generationId)
1987      {
1988        this.generationId = generationId;
1989      }
1990    
1991      /**
1992       * Returns the Replication Server Domain to which belongs this server handler.
1993       *
1994       * @return The replication server domain.
1995       */
1996      public ReplicationServerDomain getDomain()
1997      {
1998        return this.replicationServerDomain;
1999      }
2000    
2001      /**
2002       * Return a Set containing the servers known by this replicationServer.
2003       * @return a set containing the servers known by this replicationServer.
2004       */
2005      public Set<Short> getConnectedServerIds()
2006      {
2007        return connectedServers.keySet();
2008      }
2009    }