001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import static org.opends.messages.ReplicationMessages.*;
029    import static org.opends.server.loggers.ErrorLogger.logError;
030    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
031    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
032    import static org.opends.server.util.ServerConstants.EOL;
033    import static org.opends.server.util.StaticUtils.getFileForPath;
034    
035    import java.io.File;
036    import java.io.IOException;
037    import java.io.StringReader;
038    import java.net.InetAddress;
039    import java.net.InetSocketAddress;
040    import java.net.ServerSocket;
041    import java.net.Socket;
042    import java.net.UnknownHostException;
043    import java.util.ArrayList;
044    import java.util.Collection;
045    import java.util.concurrent.ConcurrentHashMap;
046    import java.util.Iterator;
047    import java.util.LinkedHashSet;
048    import java.util.List;
049    import java.util.Set;
050    
051    import org.opends.messages.Message;
052    import org.opends.messages.MessageBuilder;
053    import org.opends.server.admin.server.ConfigurationChangeListener;
054    import org.opends.server.admin.std.server.MonitorProviderCfg;
055    import org.opends.server.admin.std.server.ReplicationServerCfg;
056    import org.opends.server.api.Backend;
057    import org.opends.server.api.BackupTaskListener;
058    import org.opends.server.api.ExportTaskListener;
059    import org.opends.server.api.ImportTaskListener;
060    import org.opends.server.api.MonitorProvider;
061    import org.opends.server.api.RestoreTaskListener;
062    import org.opends.server.config.ConfigException;
063    import org.opends.server.core.DirectoryServer;
064    import org.opends.server.loggers.LogLevel;
065    import org.opends.server.loggers.debug.DebugTracer;
066    import org.opends.server.replication.protocol.ProtocolSession;
067    import org.opends.server.replication.protocol.ReplSessionSecurity;
068    import org.opends.server.types.Attribute;
069    import org.opends.server.types.AttributeType;
070    import org.opends.server.types.AttributeValue;
071    import org.opends.server.types.BackupConfig;
072    import org.opends.server.types.ConfigChangeResult;
073    import org.opends.server.types.DN;
074    import org.opends.server.types.Entry;
075    import org.opends.server.types.LDIFExportConfig;
076    import org.opends.server.types.LDIFImportConfig;
077    import org.opends.server.types.RestoreConfig;
078    import org.opends.server.types.ResultCode;
079    import org.opends.server.util.LDIFReader;
080    
081    import com.sleepycat.je.DatabaseException;
082    
083    /**
084     * ReplicationServer Listener.
085     *
086     * This singleton is the main object of the replication server
087     * It waits for the incoming connections and create listener
088     * and publisher objects for
089     * connection with LDAP servers and with replication servers
090     *
091     * It is responsible for creating the replication server replicationServerDomain
092     * and managing it
093     */
094    public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
095      implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
096                 BackupTaskListener, RestoreTaskListener, ImportTaskListener,
097                 ExportTaskListener
098    {
099      private short serverId;
100      private String serverURL;
101    
102      private ServerSocket listenSocket;
103      private Thread listenThread;
104      private Thread connectThread;
105    
106      /* The list of replication servers configured by the administrator */
107      private Collection<String> replicationServers;
108    
109      /* This table is used to store the list of dn for which we are currently
110       * handling servers.
111       */
112      private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs =
113              new ConcurrentHashMap<DN, ReplicationServerDomain>();
114    
115      private String localURL = "null";
116      private boolean shutdown = false;
117      private short replicationServerId;
118      private ReplicationDbEnv dbEnv;
119      private int rcvWindow;
120      private int queueSize;
121      private String dbDirname = null;
122    
123      // The delay (in sec) after which the  changes must
124      // be deleted from the persistent storage.
125      private long purgeDelay;
126    
127      private int replicationPort;
128      private boolean stopListen = false;
129      private ReplSessionSecurity replSessionSecurity;
130    
131      // For the backend associated to this replication server,
132      // DN of the config entry of the backend
133      private DN backendConfigEntryDN;
134      // ID of the backend
135      private static final String backendId = "replicationChanges";
136    
137      // At startup, the listen thread wait on this flag for the connet
138      // thread to look for other servers in the topology.
139      private boolean connectedInTopology = false;
140      private final Object connectedInTopologyLock = new Object();
141    
142      /**
143       * The tracer object for the debug logger.
144       */
145      private static final DebugTracer TRACER = getTracer();
146    
147      /**
148       * Creates a new Replication server using the provided configuration entry.
149       *
150       * @param configuration The configuration of this replication server.
151       * @throws ConfigException When Configuration is invalid.
152       */
153      public ReplicationServer(ReplicationServerCfg configuration)
154        throws ConfigException
155      {
156        super("Replication Server" + configuration.getReplicationPort());
157    
158        replicationPort = configuration.getReplicationPort();
159        replicationServerId = (short) configuration.getReplicationServerId();
160        replicationServers = configuration.getReplicationServer();
161        if (replicationServers == null)
162          replicationServers = new ArrayList<String>();
163        queueSize = configuration.getQueueSize();
164        purgeDelay = configuration.getReplicationPurgeDelay();
165        dbDirname = configuration.getReplicationDBDirectory();
166        rcvWindow = configuration.getWindowSize();
167        if (dbDirname == null)
168        {
169          dbDirname = "changelogDb";
170        }
171        // Check that this path exists or create it.
172        File f = getFileForPath(dbDirname);
173        try
174        {
175          if (!f.exists())
176          {
177            f.mkdir();
178          }
179        }
180        catch (Exception e)
181        {
182    
183          MessageBuilder mb = new MessageBuilder();
184          mb.append(e.getLocalizedMessage());
185          mb.append(" ");
186          mb.append(String.valueOf(getFileForPath(dbDirname)));
187          Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
188          throw new ConfigException(msg, e);
189        }
190    
191        replSessionSecurity = new ReplSessionSecurity(configuration);
192        initialize(replicationServerId, replicationPort);
193        configuration.addChangeListener(this);
194        DirectoryServer.registerMonitorProvider(this);
195    
196        try
197        {
198          backendConfigEntryDN = DN.decode(
199          "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
200        } catch (Exception e) {}
201    
202        // Creates the backend associated to this ReplicationServer
203        // if it does not exist.
204        createBackend();
205    
206        DirectoryServer.registerBackupTaskListener(this);
207        DirectoryServer.registerRestoreTaskListener(this);
208        DirectoryServer.registerExportTaskListener(this);
209        DirectoryServer.registerImportTaskListener(this);
210      }
211    
212    
213      /**
214       * The run method for the Listen thread.
215       * This thread accept incoming connections on the replication server
216       * ports from other replication servers or from LDAP servers
217       * and spawn further thread responsible for handling those connections
218       */
219    
220      void runListen()
221      {
222        Socket newSocket;
223    
224        // wait for the connect thread to find other replication
225        // servers in the topology before starting to accept connections
226        // from the ldap servers.
227        synchronized (connectedInTopologyLock)
228        {
229          if (connectedInTopology == false)
230          {
231            try
232            {
233              connectedInTopologyLock.wait(1000);
234            } catch (InterruptedException e)
235            {
236            }
237          }
238        }
239    
240        while ((shutdown == false) && (stopListen  == false))
241        {
242          // Wait on the replicationServer port.
243          // Read incoming messages and create LDAP or ReplicationServer listener
244          // and Publisher.
245    
246          try
247          {
248            newSocket =  listenSocket.accept();
249            newSocket.setReceiveBufferSize(1000000);
250            newSocket.setTcpNoDelay(true);
251            newSocket.setKeepAlive(true);
252            ProtocolSession session =
253                 replSessionSecurity.createServerSession(newSocket);
254            if (session == null) // Error, go back to accept
255              continue;
256            ServerHandler handler = new ServerHandler(session, queueSize);
257            handler.start(null, serverId, serverURL, rcvWindow,
258                          false, this);
259          }
260          catch (Exception e)
261          {
262            // The socket has probably been closed as part of the
263            // shutdown or changing the port number process.
264            // just log debug information and loop.
265            Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
266            logError(message);
267          }
268        }
269      }
270    
271      /**
272       * This method manages the connection with the other replication servers.
273       * It periodically checks that this replication server is indeed connected
274       * to all the other replication servers and if not attempts to
275       * make the connection.
276       */
277      void runConnect()
278      {
279        while (shutdown == false)
280        {
281          /*
282           * periodically check that we are connected to all other
283           * replication servers and if not establish the connection
284           */
285          for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
286          {
287            Set<String> connectedReplServers =
288                    replicationServerDomain.getChangelogs();
289            /*
290             * check that all replication server in the config are in the connected
291             * Set. If not create the connection
292             */
293            for (String serverURL : replicationServers)
294            {
295              int separator = serverURL.lastIndexOf(':');
296              String port = serverURL.substring(separator + 1);
297              String hostname = serverURL.substring(0, separator);
298    
299              try
300              {
301                InetAddress inetAddress = InetAddress.getByName(hostname);
302                String serverAddress = inetAddress.getHostAddress() + ":" + port;
303    
304                if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0)
305                    && (serverAddress.compareTo(this.localURL) != 0)
306                    && (!connectedReplServers.contains(serverAddress)))
307                {
308                  this.connect(serverURL, replicationServerDomain.getBaseDn());
309                }
310              }
311              catch (IOException e)
312              {
313                Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
314                logError(message);
315              }
316            }
317          }
318          synchronized (connectedInTopologyLock)
319          {
320            // wake up the listen thread if necessary.
321            if (connectedInTopology == false)
322            {
323              connectedInTopologyLock.notify();
324              connectedInTopology = true;
325            }
326          }
327          try
328          {
329            synchronized (this)
330            {
331              /* check if we are connected every second */
332              int randomizer = (int) Math.random()*100;
333              wait(1000 + randomizer);
334            }
335          } catch (InterruptedException e)
336          {
337            // ignore error, will try to connect again or shutdown
338          }
339        }
340      }
341    
342      /**
343       * Establish a connection to the server with the address and port.
344       *
345       * @param serverURL  The address and port for the server, separated by a
346       *                    colon.
347       * @param baseDn     The baseDn of the connection
348       */
349      private void connect(String serverURL, DN baseDn)
350      {
351        int separator = serverURL.lastIndexOf(':');
352        String port = serverURL.substring(separator + 1);
353        String hostname = serverURL.substring(0, separator);
354        boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
355    
356        if (debugEnabled())
357          TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
358                   " connects to " + serverURL);
359    
360        try
361        {
362          InetSocketAddress ServerAddr = new InetSocketAddress(
363                         InetAddress.getByName(hostname), Integer.parseInt(port));
364          Socket socket = new Socket();
365          socket.setReceiveBufferSize(1000000);
366          socket.setTcpNoDelay(true);
367          socket.connect(ServerAddr, 500);
368    
369          ServerHandler handler = new ServerHandler(
370               replSessionSecurity.createClientSession(serverURL, socket),
371               queueSize);
372          handler.start(baseDn, serverId, this.serverURL, rcvWindow,
373                        sslEncryption, this);
374        }
375        catch (Exception e)
376        {
377          // ignore
378        }
379    
380      }
381    
382      /**
383       * initialization function for the replicationServer.
384       *
385       * @param  changelogId       The unique identifier for this replicationServer.
386       * @param  changelogPort     The port on which the replicationServer should
387       *                           listen.
388       *
389       */
390      private void initialize(short changelogId, int changelogPort)
391      {
392        shutdown = false;
393    
394        try
395        {
396          /*
397           * Initialize the replicationServer database.
398           */
399          dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
400              this);
401    
402          /*
403           * create replicationServer replicationServerDomain
404           */
405          serverId = changelogId;
406    
407          /*
408           * Open replicationServer socket
409           */
410          String localhostname = InetAddress.getLocalHost().getHostName();
411          String localAdddress = InetAddress.getLocalHost().getHostAddress();
412          serverURL = localhostname + ":" + String.valueOf(changelogPort);
413          localURL = localAdddress + ":" + String.valueOf(changelogPort);
414          listenSocket = new ServerSocket();
415          listenSocket.setReceiveBufferSize(1000000);
416          listenSocket.bind(new InetSocketAddress(changelogPort));
417    
418          /*
419           * creates working threads
420           * We must first connect, then start to listen.
421           */
422          if (debugEnabled())
423            TRACER.debugInfo("RS " +getMonitorInstanceName()+
424                " creates connect threads");
425          connectThread =
426            new ReplicationServerConnectThread("Replication Server Connect", this);
427          connectThread.start();
428    
429          // FIXME : Is it better to have the time to receive the ReplServerInfo
430          // from all the other replication servers since this info is necessary
431          // to route an early received total update request.
432          try { Thread.sleep(300);} catch(Exception e) {}
433          if (debugEnabled())
434            TRACER.debugInfo("RS " +getMonitorInstanceName()+
435                " creates listen threads");
436    
437          listenThread =
438            new ReplicationServerListenThread("Replication Server Listener", this);
439          listenThread.start();
440    
441          if (debugEnabled())
442            TRACER.debugInfo("RS " +getMonitorInstanceName()+
443                " successfully initialized");
444    
445        } catch (DatabaseException e)
446        {
447          Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
448            getFileForPath(dbDirname).getAbsolutePath());
449          logError(message);
450        } catch (ReplicationDBException e)
451        {
452          Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
453              e.getLocalizedMessage());
454          logError(message);
455        } catch (UnknownHostException e)
456        {
457          Message message = ERR_UNKNOWN_HOSTNAME.get();
458          logError(message);
459        } catch (IOException e)
460        {
461          Message message =
462              ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
463          logError(message);
464        }
465      }
466    
467      /**
468       * Get the ReplicationServerDomain associated to the base DN given in
469       * parameter.
470       *
471       * @param baseDn The base Dn for which the ReplicationServerDomain must be
472       * returned.
473       * @param create Specifies whether to create the ReplicationServerDomain if
474       *        it does not already exist.
475       * @return The ReplicationServerDomain associated to the base DN given in
476       *         parameter.
477       */
478      public ReplicationServerDomain getReplicationServerDomain(DN baseDn,
479              boolean create)
480      {
481        ReplicationServerDomain replicationServerDomain;
482    
483        synchronized (baseDNs)
484        {
485          replicationServerDomain = baseDNs.get(baseDn);
486          if ((replicationServerDomain == null) && (create))
487          {
488            replicationServerDomain = new ReplicationServerDomain(baseDn, this);
489            baseDNs.put(baseDn, replicationServerDomain);
490          }
491        }
492    
493        return replicationServerDomain;
494      }
495    
496      /**
497       * Shutdown the Replication Server service and all its connections.
498       */
499      public void shutdown()
500      {
501        if (shutdown)
502          return;
503    
504        shutdown = true;
505    
506        // shutdown the connect thread
507        if (connectThread != null)
508        {
509          connectThread.interrupt();
510        }
511    
512        // shutdown the listener thread
513        try
514        {
515          if (listenSocket != null)
516          {
517            listenSocket.close();
518          }
519        } catch (IOException e)
520        {
521          // replication Server service is closing anyway.
522        }
523    
524        // shutdown the listen thread
525        if (listenThread != null)
526        {
527          listenThread.interrupt();
528        }
529    
530        // shutdown all the ChangelogCaches
531        for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
532        {
533          replicationServerDomain.shutdown();
534        }
535    
536        if (dbEnv != null)
537        {
538          dbEnv.shutdown();
539        }
540        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
541    }
542    
543    
544      /**
545       * Creates a new DB handler for this ReplicationServer and the serverId and
546       * DN given in parameter.
547       *
548       * @param id The serverId for which the dbHandler must be created.
549       * @param baseDn The DN for which the dbHandler muste be created.
550       * @return The new DB handler for this ReplicationServer and the serverId and
551       *         DN given in parameter.
552       * @throws DatabaseException in case of underlying database problem.
553       */
554      public DbHandler newDbHandler(short id, DN baseDn)
555      throws DatabaseException
556      {
557        return new DbHandler(id, baseDn, this, dbEnv);
558      }
559    
560      /**
561       * Clears the generationId for the replicationServerDomain related to the
562       * provided baseDn.
563       * @param  baseDn The baseDn for which to delete the generationId.
564       * @throws DatabaseException When it occurs.
565       */
566      public void clearGenerationId(DN baseDn)
567      throws DatabaseException
568      {
569        try
570        {
571          dbEnv.clearGenerationId(baseDn);
572        }
573        catch(Exception e)
574        {
575          TRACER.debugCaught(LogLevel.ALL, e);
576        }
577      }
578    
579      /**
580       * Retrieves the time after which changes must be deleted from the
581       * persistent storage (in milliseconds).
582       *
583       * @return  The time after which changes must be deleted from the
584       *          persistent storage (in milliseconds).
585       */
586      long getTrimage()
587      {
588        return purgeDelay * 1000;
589      }
590    
591      /**
592       * Check if the provided configuration is acceptable for add.
593       *
594       * @param configuration The configuration to check.
595       * @param unacceptableReasons When the configuration is not acceptable, this
596       *                            table is use to return the reasons why this
597       *                            configuration is not acceptbale.
598       *
599       * @return true if the configuration is acceptable, false other wise.
600       */
601      public static boolean isConfigurationAcceptable(
602          ReplicationServerCfg configuration, List<Message> unacceptableReasons)
603      {
604        int port = configuration.getReplicationPort();
605    
606        try
607        {
608          ServerSocket tmpSocket = new ServerSocket();
609          tmpSocket.bind(new InetSocketAddress(port));
610          tmpSocket.close();
611        }
612        catch (Exception e)
613        {
614          Message message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage());
615          unacceptableReasons.add(message);
616          return false;
617        }
618    
619        return true;
620      }
621    
622      /**
623       * {@inheritDoc}
624       */
625      public ConfigChangeResult applyConfigurationChange(
626          ReplicationServerCfg configuration)
627      {
628        // Changing those properties don't need specific code.
629        // They will be applied for next connections.
630        replicationServers = configuration.getReplicationServer();
631        if (replicationServers == null)
632          replicationServers = new ArrayList<String>();
633        queueSize = configuration.getQueueSize();
634        long newPurgeDelay = configuration.getReplicationPurgeDelay();
635        if (newPurgeDelay != purgeDelay)
636        {
637          purgeDelay = newPurgeDelay;
638          // propagate
639          for (ReplicationServerDomain domain : baseDNs.values())
640          {
641            domain.setPurgeDelay(purgeDelay);
642          }
643        }
644    
645        rcvWindow = configuration.getWindowSize();
646    
647        // changing the listen port requires to stop the listen thread
648        // and restart it.
649        int newPort = configuration.getReplicationPort();
650        if (newPort != replicationPort)
651        {
652          stopListen = true;
653          try
654          {
655            listenSocket.close();
656            listenThread.join();
657            stopListen = false;
658    
659            replicationPort = newPort;
660            String localhostname = InetAddress.getLocalHost().getHostName();
661            String localAdddress = InetAddress.getLocalHost().getHostAddress();
662            serverURL = localhostname + ":" + String.valueOf(replicationPort);
663            localURL = localAdddress + ":" + String.valueOf(replicationPort);
664            listenSocket = new ServerSocket();
665            listenSocket.setReceiveBufferSize(1000000);
666            listenSocket.bind(new InetSocketAddress(replicationPort));
667    
668            listenThread =
669              new ReplicationServerListenThread(
670                  "Replication Server Listener", this);
671            listenThread.start();
672          }
673          catch (IOException e)
674          {
675            Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString());
676            logError(message);
677            new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(),
678                                   false);
679          }
680          catch (InterruptedException e)
681          {
682            Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString());
683            logError(message);
684            new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(),
685                                   false);
686          }
687        }
688    
689        if ((configuration.getReplicationDBDirectory() != null) &&
690            (!dbDirname.equals(configuration.getReplicationDBDirectory())))
691        {
692          return new ConfigChangeResult(ResultCode.SUCCESS, true);
693        }
694    
695        return new ConfigChangeResult(ResultCode.SUCCESS, false);
696      }
697    
698      /**
699       * {@inheritDoc}
700       */
701      public boolean isConfigurationChangeAcceptable(
702          ReplicationServerCfg configuration, List<Message> unacceptableReasons)
703      {
704        return true;
705      }
706    
707      /**
708       * {@inheritDoc}
709       */
710      @Override
711      public void initializeMonitorProvider(MonitorProviderCfg configuraiton)
712      {
713        // Nothing to do for now
714      }
715    
716      /**
717       * {@inheritDoc}
718       */
719      @Override
720      public String getMonitorInstanceName()
721      {
722        return "Replication Server " + this.replicationPort + " "
723               + replicationServerId;
724      }
725    
726      /**
727       * {@inheritDoc}
728       */
729      @Override
730      public long getUpdateInterval()
731      {
732        /* we don't wont to do polling on this monitor */
733        return 0;
734      }
735    
736      /**
737       * {@inheritDoc}
738       */
739      @Override
740      public void updateMonitorData()
741      {
742        // As long as getUpdateInterval() returns 0, this will never get called
743    
744      }
745    
746      /**
747       * {@inheritDoc}
748       */
749      @Override
750      public ArrayList<Attribute> getMonitorData()
751      {
752        /*
753         * publish the server id and the port number.
754         */
755        ArrayList<Attribute> attributes = new ArrayList<Attribute>();
756        attributes.add(new Attribute("replication server id",
757            String.valueOf(serverId)));
758        attributes.add(new Attribute("replication server port",
759            String.valueOf(replicationPort)));
760    
761        /*
762         * Add all the base DNs that are known by this replication server.
763         */
764        AttributeType baseType=
765          DirectoryServer.getAttributeType("base-dn", true);
766        LinkedHashSet<AttributeValue> baseValues =
767          new LinkedHashSet<AttributeValue>();
768        for (DN base : baseDNs.keySet())
769        {
770          baseValues.add(new AttributeValue(baseType, base. toString()));
771        }
772    
773        Attribute bases = new Attribute(baseType, "base-dn", baseValues);
774        attributes.add(bases);
775    
776        // Publish to monitor the generation ID by replicationServerDomain
777        AttributeType generationIdType=
778          DirectoryServer.getAttributeType("base-dn-generation-id", true);
779        LinkedHashSet<AttributeValue> generationIdValues =
780          new LinkedHashSet<AttributeValue>();
781        for (DN base : baseDNs.keySet())
782        {
783          long generationId=-1;
784          ReplicationServerDomain replicationServerDomain =
785                  getReplicationServerDomain(base, false);
786          if (replicationServerDomain != null)
787            generationId = replicationServerDomain.getGenerationId();
788          generationIdValues.add(new AttributeValue(generationIdType,
789              base.toString() + " " + generationId));
790        }
791        Attribute generationIds = new Attribute(generationIdType, "generation-id",
792            generationIdValues);
793        attributes.add(generationIds);
794    
795        return attributes;
796      }
797    
798      /**
799       * Get the value of generationId for the replication replicationServerDomain
800       * associated with the provided baseDN.
801       *
802       * @param baseDN The baseDN of the replicationServerDomain.
803       * @return The value of the generationID.
804       */
805      public long getGenerationId(DN baseDN)
806      {
807        ReplicationServerDomain rsd =
808                this.getReplicationServerDomain(baseDN, false);
809        if (rsd!=null)
810          return rsd.getGenerationId();
811        return -1;
812      }
813    
814      /**
815       * Get the serverId for this replication server.
816       *
817       * @return The value of the serverId.
818       *
819       */
820      public short getServerId()
821      {
822        return serverId;
823      }
824    
825      /**
826       * Creates the backend associated to this replication server.
827       * @throws ConfigException
828       */
829      private void createBackend()
830      throws ConfigException
831      {
832        try
833        {
834          String ldif = makeLdif(
835              "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config",
836              "objectClass: top",
837              "objectClass: ds-cfg-backend",
838              "ds-cfg-base-dn: dc="+backendId,
839              "ds-cfg-enabled: true",
840              "ds-cfg-writability-mode: enabled",
841              "ds-cfg-java-class: " +
842                "org.opends.server.replication.server.ReplicationBackend",
843              "ds-cfg-backend-id: " + backendId);
844    
845          LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
846              new StringReader(ldif));
847          LDIFReader reader = new LDIFReader(ldifImportConfig);
848          Entry backendConfigEntry = reader.readEntry();
849          if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
850          {
851            // Add the replication backend
852            DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
853          }
854          ldifImportConfig.close();
855        }
856        catch(Exception e)
857        {
858          MessageBuilder mb = new MessageBuilder();
859          mb.append(e.getLocalizedMessage());
860          Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString());
861          throw new ConfigException(msg, e);
862    
863        }
864      }
865    
866      private static String makeLdif(String... lines)
867      {
868        StringBuilder buffer = new StringBuilder();
869        for (String line : lines) {
870          buffer.append(line).append(EOL);
871        }
872        // Append an extra line so we can append LDIF Strings.
873        buffer.append(EOL);
874        return buffer.toString();
875      }
876    
877      /**
878       * Do what needed when the config object related to this replication server
879       * is deleted from the server configuration.
880       */
881      public void remove()
882      {
883        if (debugEnabled())
884          TRACER.debugInfo("RS " +getMonitorInstanceName()+
885              " starts removing");
886    
887        shutdown();
888        removeBackend();
889    
890        DirectoryServer.deregisterBackupTaskListener(this);
891        DirectoryServer.deregisterRestoreTaskListener(this);
892        DirectoryServer.deregisterExportTaskListener(this);
893        DirectoryServer.deregisterImportTaskListener(this);
894      }
895    
896      /**
897       * Removes the backend associated to this Replication Server that has been
898       * created when this replication server was created.
899       */
900      protected void removeBackend()
901      {
902        try
903        {
904          if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
905          {
906            // Delete the replication backend
907            DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
908                null);
909          }
910        }
911        catch(Exception e)
912        {
913          MessageBuilder mb = new MessageBuilder();
914          mb.append(e.getLocalizedMessage());
915          Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString());
916          logError(msg);
917        }
918      }
919      /**
920       * {@inheritDoc}
921       */
922      public void processBackupBegin(Backend backend, BackupConfig config)
923      {
924        // Nothing is needed at the moment
925      }
926    
927      /**
928       * {@inheritDoc}
929       */
930      public void processBackupEnd(Backend backend, BackupConfig config,
931                                   boolean successful)
932      {
933        // Nothing is needed at the moment
934      }
935    
936      /**
937       * {@inheritDoc}
938       */
939      public void processRestoreBegin(Backend backend, RestoreConfig config)
940      {
941        if (backend.getBackendID().equals(backendId))
942          shutdown();
943      }
944    
945      /**
946       * {@inheritDoc}
947       */
948      public void processRestoreEnd(Backend backend, RestoreConfig config,
949                                    boolean successful)
950      {
951        if (backend.getBackendID().equals(backendId))
952          initialize(this.replicationServerId, this.replicationPort);
953      }
954    
955      /**
956       * {@inheritDoc}
957       */
958      public void processImportBegin(Backend backend, LDIFImportConfig config)
959      {
960        // Nothing is needed at the moment
961      }
962    
963      /**
964       * {@inheritDoc}
965       */
966      public void processImportEnd(Backend backend, LDIFImportConfig config,
967                                   boolean successful)
968      {
969        // Nothing is needed at the moment
970      }
971    
972      /**
973       * {@inheritDoc}
974       */
975      public void processExportBegin(Backend backend, LDIFExportConfig config)
976      {
977        if (debugEnabled())
978          TRACER.debugInfo("RS " +getMonitorInstanceName()+
979              " Export starts");
980        if (backend.getBackendID().equals(backendId))
981        {
982          // Retrieves the backend related to this replicationServerDomain
983          // backend =
984          ReplicationBackend b =
985          (ReplicationBackend)DirectoryServer.getBackend(backendId);
986          b.setServer(this);
987        }
988      }
989    
990      /**
991       * {@inheritDoc}
992       */
993      public void processExportEnd(Backend backend, LDIFExportConfig config,
994                                   boolean successful)
995      {
996        // Nothing is needed at the moment
997      }
998    
999      /**
1000       * Returns an iterator on the list of replicationServerDomain.
1001       * Returns null if none.
1002       * @return the iterator.
1003       */
1004      public Iterator<ReplicationServerDomain> getCacheIterator()
1005      {
1006        if (!baseDNs.isEmpty())
1007          return baseDNs.values().iterator();
1008        else
1009          return null;
1010      }
1011    
1012      /**
1013       * Clears the Db associated with that server.
1014       */
1015      public void clearDb()
1016      {
1017        Iterator<ReplicationServerDomain> rcachei = getCacheIterator();
1018        if (rcachei != null)
1019        {
1020          while (rcachei.hasNext())
1021          {
1022            ReplicationServerDomain rsd = rcachei.next();
1023            rsd.clearDbs();
1024          }
1025        }
1026      }
1027    }