001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    import static org.opends.messages.ReplicationMessages.*;
029    import static org.opends.messages.ToolMessages.*;
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033    import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
034    import static org.opends.server.replication.protocol.OperationContext.*;
035    import static org.opends.server.util.ServerConstants.*;
036    import static org.opends.server.util.StaticUtils.createEntry;
037    import static org.opends.server.util.StaticUtils.getFileForPath;
038    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
039    
040    import java.io.File;
041    import java.io.IOException;
042    import java.io.OutputStream;
043    import java.net.SocketTimeoutException;
044    import java.util.ArrayList;
045    import java.util.Collection;
046    import java.util.HashSet;
047    import java.util.LinkedHashMap;
048    import java.util.LinkedHashSet;
049    import java.util.LinkedList;
050    import java.util.List;
051    import java.util.NoSuchElementException;
052    import java.util.SortedMap;
053    import java.util.TreeMap;
054    import java.util.concurrent.LinkedBlockingQueue;
055    import java.util.concurrent.atomic.AtomicInteger;
056    import java.util.zip.Adler32;
057    import java.util.zip.CheckedOutputStream;
058    import java.util.zip.DataFormatException;
059    
060    import org.opends.messages.Message;
061    import org.opends.messages.MessageBuilder;
062    import org.opends.server.admin.server.ConfigurationChangeListener;
063    import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
064    import org.opends.server.admin.std.server.ReplicationDomainCfg;
065    import org.opends.server.api.AlertGenerator;
066    import org.opends.server.api.Backend;
067    import org.opends.server.api.DirectoryThread;
068    import org.opends.server.api.SynchronizationProvider;
069    import org.opends.server.backends.jeb.BackendImpl;
070    import org.opends.server.backends.task.Task;
071    import org.opends.server.config.ConfigException;
072    import org.opends.server.core.AddOperation;
073    import org.opends.server.core.DeleteOperation;
074    import org.opends.server.core.DirectoryServer;
075    import org.opends.server.core.LockFileManager;
076    import org.opends.server.core.ModifyDNOperation;
077    import org.opends.server.core.ModifyDNOperationBasis;
078    import org.opends.server.core.ModifyOperation;
079    import org.opends.server.core.ModifyOperationBasis;
080    import org.opends.server.loggers.debug.DebugTracer;
081    import org.opends.server.protocols.asn1.ASN1Exception;
082    import org.opends.server.protocols.asn1.ASN1OctetString;
083    import org.opends.server.protocols.internal.InternalClientConnection;
084    import org.opends.server.protocols.internal.InternalSearchOperation;
085    import org.opends.server.protocols.ldap.LDAPAttribute;
086    import org.opends.server.protocols.ldap.LDAPFilter;
087    import org.opends.server.protocols.ldap.LDAPModification;
088    import org.opends.server.replication.common.ChangeNumber;
089    import org.opends.server.replication.common.ChangeNumberGenerator;
090    import org.opends.server.replication.common.ServerState;
091    import org.opends.server.replication.protocol.AckMessage;
092    import org.opends.server.replication.protocol.AddContext;
093    import org.opends.server.replication.protocol.AddMsg;
094    import org.opends.server.replication.protocol.DeleteContext;
095    import org.opends.server.replication.protocol.DoneMessage;
096    import org.opends.server.replication.protocol.EntryMessage;
097    import org.opends.server.replication.protocol.ErrorMessage;
098    import org.opends.server.replication.protocol.HeartbeatMessage;
099    import org.opends.server.replication.protocol.InitializeRequestMessage;
100    import org.opends.server.replication.protocol.InitializeTargetMessage;
101    import org.opends.server.replication.protocol.ModifyContext;
102    import org.opends.server.replication.protocol.ModifyDNMsg;
103    import org.opends.server.replication.protocol.ModifyDnContext;
104    import org.opends.server.replication.protocol.OperationContext;
105    import org.opends.server.replication.protocol.ReplSessionSecurity;
106    import org.opends.server.replication.protocol.ReplicationMessage;
107    import org.opends.server.replication.protocol.ResetGenerationId;
108    import org.opends.server.replication.protocol.RoutableMessage;
109    import org.opends.server.replication.protocol.UpdateMessage;
110    import org.opends.server.tasks.InitializeTargetTask;
111    import org.opends.server.tasks.InitializeTask;
112    import org.opends.server.tasks.TaskUtils;
113    import org.opends.server.types.ExistingFileBehavior;
114    import org.opends.server.types.AbstractOperation;
115    import org.opends.server.types.Attribute;
116    import org.opends.server.types.AttributeType;
117    import org.opends.server.types.AttributeValue;
118    import org.opends.server.types.ConfigChangeResult;
119    import org.opends.server.types.Control;
120    import org.opends.server.types.DN;
121    import org.opends.server.types.DereferencePolicy;
122    import org.opends.server.types.DirectoryException;
123    import org.opends.server.types.Entry;
124    import org.opends.server.types.LDAPException;
125    import org.opends.server.types.LDIFExportConfig;
126    import org.opends.server.types.LDIFImportConfig;
127    import org.opends.server.types.Modification;
128    import org.opends.server.types.ModificationType;
129    import org.opends.server.types.Operation;
130    import org.opends.server.types.RDN;
131    import org.opends.server.types.RawModification;
132    import org.opends.server.types.ResultCode;
133    import org.opends.server.types.SearchFilter;
134    import org.opends.server.types.SearchResultEntry;
135    import org.opends.server.types.SearchScope;
136    import org.opends.server.types.SynchronizationProviderResult;
137    import org.opends.server.types.operation.PluginOperation;
138    import org.opends.server.types.operation.PostOperationOperation;
139    import org.opends.server.types.operation.PreOperationAddOperation;
140    import org.opends.server.types.operation.PreOperationDeleteOperation;
141    import org.opends.server.types.operation.PreOperationModifyDNOperation;
142    import org.opends.server.types.operation.PreOperationModifyOperation;
143    import org.opends.server.types.operation.PreOperationOperation;
144    import org.opends.server.workflowelement.localbackend.*;
145    
146    /**
147     *  This class implements the bulk part of the.of the Directory Server side
148     *  of the replication code.
149     *  It contains the root method for publishing a change,
150     *  processing a change received from the replicationServer service,
151     *  handle conflict resolution,
152     *  handle protocol messages from the replicationServer.
153     */
154    public class ReplicationDomain extends DirectoryThread
155           implements ConfigurationChangeListener<ReplicationDomainCfg>,
156                      AlertGenerator
157    {
158      /**
159       * The fully-qualified name of this class.
160       */
161      private static final String CLASS_NAME =
162           "org.opends.server.replication.plugin.ReplicationDomain";
163    
164      /**
165       * The attribute used to mark conflicting entries.
166       * The value of this attribute should be the dn that this entry was
167       * supposed to have when it was marked as conflicting.
168       */
169      public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
170    
171      /**
172       * The tracer object for the debug logger.
173       */
174      private static final DebugTracer TRACER = getTracer();
175    
176      private ReplicationMonitor monitor;
177    
178      private ReplicationBroker broker;
179      // Thread waiting for incoming update messages for this domain and pushing
180      // them to the global incoming update message queue for later processing by
181      // replay threads.
182      private ListenerThread listenerThread;
183      // The update to replay message queue where the listener thread is going to
184      // push incoming update messages.
185      private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
186      private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
187        new TreeMap<ChangeNumber, UpdateMessage>();
188      private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
189      private AtomicInteger numSentUpdates = new AtomicInteger(0);
190      private AtomicInteger numProcessedUpdates = new AtomicInteger();
191      private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
192      private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
193      private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
194      private int debugCount = 0;
195      private PersistentServerState state;
196      private int numReplayedPostOpCalled = 0;
197    
198      private int maxReceiveQueue = 0;
199      private int maxSendQueue = 0;
200      private int maxReceiveDelay = 0;
201      private int maxSendDelay = 0;
202    
203      private long generationId = -1;
204      private boolean generationIdSavedStatus = false;
205    
206      ChangeNumberGenerator generator;
207    
208      /**
209       * This object is used to store the list of update currently being
210       * done on the local database.
211       * Is is usefull to make sure that the local operations are sent in a
212       * correct order to the replication server and that the ServerState
213       * is not updated too early.
214       */
215      private PendingChanges pendingChanges;
216    
217      /**
218       * It contain the updates that were done on other servers, transmitted
219       * by the replication server and that are currently replayed.
220       * It is usefull to make sure that dependencies between operations
221       * are correctly fullfilled and to to make sure that the ServerState is
222       * not updated too early.
223       */
224      private RemotePendingChanges remotePendingChanges;
225    
226      /**
227       * The time in milliseconds between heartbeats from the replication
228       * server.  Zero means heartbeats are off.
229       */
230      private long heartbeatInterval = 0;
231      short serverId;
232    
233      // The context related to an import or export being processed
234      // Null when none is being processed.
235      private IEContext ieContext = null;
236    
237      private Collection<String> replicationServers;
238    
239      private DN baseDN;
240    
241      private boolean shutdown = false;
242    
243      private InternalClientConnection conn =
244          InternalClientConnection.getRootConnection();
245    
246      private boolean solveConflictFlag = true;
247    
248      private boolean disabled = false;
249      private boolean stateSavingDisabled = false;
250    
251      private int window = 100;
252    
253      /**
254       * The isolation policy that this domain is going to use.
255       * This field describes the behavior of the domain when an update is
256       * attempted and the domain could not connect to any Replication Server.
257       * Possible values are accept-updates or deny-updates, but other values
258       * may be added in the futur.
259       */
260      private IsolationPolicy isolationpolicy;
261    
262      /**
263       * The DN of the configuration entry of this domain.
264       */
265      private DN configDn;
266    
267      /**
268       * A boolean indicating if the thread used to save the persistentServerState
269       * is terminated.
270       */
271      private boolean done = true;
272    
273      /**
274       * This class contain the context related to an import or export
275       * launched on the domain.
276       */
277      private class IEContext
278      {
279        // The task that initiated the operation.
280        Task initializeTask;
281        // The input stream for the import
282        ReplLDIFInputStream ldifImportInputStream = null;
283        // The target in the case of an export
284        short exportTarget = RoutableMessage.UNKNOWN_SERVER;
285        // The source in the case of an import
286        short importSource = RoutableMessage.UNKNOWN_SERVER;
287    
288        // The total entry count expected to be processed
289        long entryCount = 0;
290        // The count for the entry not yet processed
291        long entryLeftCount = 0;
292    
293        boolean checksumOutput = false;
294    
295        // The exception raised when any
296        DirectoryException exception = null;
297        long checksumOutputValue = (long)0;
298    
299        /**
300         * Initializes the import/export counters with the provider value.
301         * @param count The value with which to initialize the counters.
302         */
303        public void setCounters(long total, long left)
304          throws DirectoryException
305        {
306          entryCount = total;
307          entryLeftCount = left;
308    
309          if (initializeTask != null)
310          {
311            if (initializeTask instanceof InitializeTask)
312            {
313              ((InitializeTask)initializeTask).setTotal(entryCount);
314              ((InitializeTask)initializeTask).setLeft(entryCount);
315            }
316            else if (initializeTask instanceof InitializeTargetTask)
317            {
318              ((InitializeTargetTask)initializeTask).setTotal(entryCount);
319              ((InitializeTargetTask)initializeTask).setLeft(entryCount);
320            }
321          }
322        }
323    
324        /**
325         * Update the counters of the task for each entry processed during
326         * an import or export.
327         */
328        public void updateCounters()
329          throws DirectoryException
330        {
331          entryLeftCount--;
332    
333          if (initializeTask != null)
334          {
335            if (initializeTask instanceof InitializeTask)
336            {
337              ((InitializeTask)initializeTask).setLeft(entryLeftCount);
338            }
339            else if (initializeTask instanceof InitializeTargetTask)
340            {
341              ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
342            }
343          }
344        }
345    
346        /**
347         * {@inheritDoc}
348         */
349        public String toString()
350        {
351          return new String("[ Entry count=" + this.entryCount +
352                            ", Entry left count=" + this.entryLeftCount + "]");
353        }
354      }
355    
356      /**
357       * This thread is launched when we want to export data to another server that
358       * has requested to be initialized with the data of our backend.
359       */
360      private class ExportThread extends DirectoryThread
361      {
362        // Id of server that will receive updates
363        private short target;
364    
365        /**
366         * Constructor for the ExportThread.
367         *
368         * @param target Id of server that will receive updates
369         */
370        public ExportThread(short target)
371        {
372          super("Export thread");
373          this.target = target;
374        }
375    
376        /**
377         * Run method for this class.
378         */
379        public void run()
380        {
381          if (debugEnabled())
382          {
383            TRACER.debugInfo("Export thread starting.");
384          }
385    
386          try
387          {
388            initializeRemote(target, target, null);
389          } catch (DirectoryException de)
390          {
391          // An error message has been sent to the peer
392          // Nothing more to do locally
393          }
394          if (debugEnabled())
395          {
396            TRACER.debugInfo("Export thread stopping.");
397          }
398        }
399      }
400    
401      /**
402       * Creates a new ReplicationDomain using configuration from configEntry.
403       *
404       * @param configuration    The configuration of this ReplicationDomain.
405       * @param updateToReplayQueue The queue for update messages to replay.
406       * @throws ConfigException In case of invalid configuration.
407       */
408      public ReplicationDomain(ReplicationDomainCfg configuration,
409        LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
410        throws ConfigException
411      {
412        super("replicationDomain_" + configuration.getBaseDN());
413    
414        // Read the configuration parameters.
415        replicationServers = configuration.getReplicationServer();
416        serverId = (short) configuration.getServerId();
417        baseDN = configuration.getBaseDN();
418        window  = configuration.getWindowSize();
419        heartbeatInterval = configuration.getHeartbeatInterval();
420        isolationpolicy = configuration.getIsolationPolicy();
421        configDn = configuration.dn();
422        this.updateToReplayQueue = updateToReplayQueue;
423    
424        /*
425         * Modify conflicts are solved for all suffixes but the schema suffix
426         * because we don't want to store extra information in the schema
427         * ldif files.
428         * This has no negative impact because the changes on schema should
429         * not produce conflicts.
430         */
431        if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
432        {
433          solveConflictFlag = false;
434        }
435        else
436        {
437          solveConflictFlag = true;
438        }
439    
440        /*
441         * Create a new Persistent Server State that will be used to store
442         * the last ChangeNmber seen from all LDAP servers in the topology.
443         */
444        state = new PersistentServerState(baseDN, serverId);
445    
446        /*
447         * Create a replication monitor object responsible for publishing
448         * monitoring information below cn=monitor.
449         */
450        monitor = new ReplicationMonitor(this);
451        DirectoryServer.registerMonitorProvider(monitor);
452    
453        Backend backend = retrievesBackend(baseDN);
454        if (backend == null)
455        {
456          throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
457                                      baseDN.toNormalizedString()));
458        }
459    
460        try
461        {
462          generationId = loadGenerationId();
463        }
464        catch (DirectoryException e)
465        {
466          logError(ERR_LOADING_GENERATION_ID.get(
467              baseDN.toNormalizedString(), e.getLocalizedMessage()));
468        }
469    
470        /*
471         * create the broker object used to publish and receive changes
472         */
473        broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
474            maxReceiveDelay, maxSendQueue, maxSendDelay, window,
475            heartbeatInterval, generationId,
476            new ReplSessionSecurity(configuration));
477    
478        broker.start(replicationServers);
479    
480        /*
481         * ChangeNumberGenerator is used to create new unique ChangeNumbers
482         * for each operation done on this replication domain.
483         *
484         * The generator time is adjusted to the time of the last CN received from
485         * remote other servers.
486         */
487        generator =
488          new ChangeNumberGenerator(serverId, state);
489    
490        pendingChanges =
491          new PendingChanges(generator,
492                             broker, state);
493    
494        remotePendingChanges = new RemotePendingChanges(generator, state);
495    
496        // listen for changes on the configuration
497        configuration.addChangeListener(this);
498    
499        // register as an AltertGenerator
500        DirectoryServer.registerAlertGenerator(this);
501      }
502    
503    
504      /**
505       * Returns the base DN of this ReplicationDomain.
506       *
507       * @return The base DN of this ReplicationDomain
508       */
509      public DN getBaseDN()
510      {
511        return baseDN;
512      }
513    
514      /**
515       * Implement the  handleConflictResolution phase of the deleteOperation.
516       *
517       * @param deleteOperation The deleteOperation.
518       * @return A SynchronizationProviderResult indicating if the operation
519       *         can continue.
520       */
521      public SynchronizationProviderResult handleConflictResolution(
522             PreOperationDeleteOperation deleteOperation)
523      {
524        if ((!deleteOperation.isSynchronizationOperation())
525            && (!brokerIsConnected(deleteOperation)))
526        {
527          Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
528          return new SynchronizationProviderResult.StopProcessing(
529              ResultCode.UNWILLING_TO_PERFORM, msg);
530        }
531    
532        DeleteContext ctx =
533          (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
534        Entry deletedEntry = deleteOperation.getEntryToDelete();
535    
536        if (ctx != null)
537        {
538          /*
539           * This is a replication operation
540           * Check that the modified entry has the same entryuuid
541           * has was in the original message.
542           */
543          String operationEntryUUID = ctx.getEntryUid();
544          String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
545          if (!operationEntryUUID.equals(modifiedEntryUUID))
546          {
547            /*
548             * The changes entry is not the same entry as the one on
549             * the original change was performed.
550             * Probably the original entry was renamed and replaced with
551             * another entry.
552             * We must not let the change proceed, return a negative
553             * result and set the result code to NO_SUCH_OBJET.
554             * When the operation will return, the thread that started the
555             * operation will try to find the correct entry and restart a new
556             * operation.
557             */
558            return new SynchronizationProviderResult.StopProcessing(
559                ResultCode.NO_SUCH_OBJECT, null);
560          }
561        }
562        else
563        {
564          // There is no replication context attached to the operation
565          // so this is not a replication operation.
566          ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
567          String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
568          ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
569          deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
570        }
571        return new SynchronizationProviderResult.ContinueProcessing();
572      }
573    
574      /**
575       * Implement the  handleConflictResolution phase of the addOperation.
576       *
577       * @param addOperation The AddOperation.
578       * @return A SynchronizationProviderResult indicating if the operation
579       *         can continue.
580       */
581      public SynchronizationProviderResult handleConflictResolution(
582          PreOperationAddOperation addOperation)
583      {
584        if ((!addOperation.isSynchronizationOperation())
585            && (!brokerIsConnected(addOperation)))
586        {
587          Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
588          return new SynchronizationProviderResult.StopProcessing(
589              ResultCode.UNWILLING_TO_PERFORM, msg);
590        }
591    
592        if (addOperation.isSynchronizationOperation())
593        {
594          AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
595          /*
596           * If an entry with the same entry uniqueID already exist then
597           * this operation has already been replayed in the past.
598           */
599          String uuid = ctx.getEntryUid();
600          if (findEntryDN(uuid) != null)
601          {
602            return new SynchronizationProviderResult.StopProcessing(
603                ResultCode.CANCELED, null);
604          }
605    
606          /* The parent entry may have been renamed here since the change was done
607           * on the first server, and another entry have taken the former dn
608           * of the parent entry
609           */
610    
611          String parentUid = ctx.getParentUid();
612          // root entry have no parent,
613          // there is no need to check for it.
614          if (parentUid != null)
615          {
616            // There is a potential of perfs improvement here
617            // if we could avoid the following parent entry retrieval
618            DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
619    
620            if (parentDnFromCtx == null)
621            {
622              // The parent does not exist with the specified unique id
623              // stop the operation with NO_SUCH_OBJECT and let the
624              // conflict resolution or the dependency resolution solve this.
625              return new SynchronizationProviderResult.StopProcessing(
626                  ResultCode.NO_SUCH_OBJECT, null);
627            }
628            else
629            {
630              DN entryDN = addOperation.getEntryDN();
631              DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
632              if ((parentDnFromEntryDn != null)
633                  && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
634              {
635                // parentEntry has been renamed
636                // replication name conflict resolution is expected to fix that
637                // later in the flow
638                return new SynchronizationProviderResult.StopProcessing(
639                    ResultCode.NO_SUCH_OBJECT, null);
640              }
641            }
642          }
643        }
644        return new SynchronizationProviderResult.ContinueProcessing();
645      }
646    
647      /**
648       * Check that the broker associated to this ReplicationDomain has found
649       * a Replication Server and that this LDAP server is therefore able to
650       * process operations.
651       * If not set the ResultCode and the response message,
652       * interrupt the operation, and return false
653       *
654       * @param   op  The Operation that needs to be checked.
655       *
656       * @return  true when it OK to process the Operation, false otherwise.
657       *          When false is returned the resultCode and the reponse message
658       *          is also set in the Operation.
659       */
660      private boolean brokerIsConnected(PreOperationOperation op)
661      {
662        if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
663        {
664          // this policy imply that we always accept updates.
665          return true;
666        }
667        if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
668        {
669          // this isolation policy specifies that the updates are denied
670          // when the broker is not connected.
671          return broker.isConnected();
672        }
673        // we should never get there as the only possible policies are
674        // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
675        return true;
676      }
677    
678    
679      /**
680       * Implement the  handleConflictResolution phase of the ModifyDNOperation.
681       *
682       * @param modifyDNOperation The ModifyDNOperation.
683       * @return A SynchronizationProviderResult indicating if the operation
684       *         can continue.
685       */
686      public SynchronizationProviderResult handleConflictResolution(
687          PreOperationModifyDNOperation modifyDNOperation)
688      {
689        if ((!modifyDNOperation.isSynchronizationOperation())
690            && (!brokerIsConnected(modifyDNOperation)))
691        {
692          Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
693          return new SynchronizationProviderResult.StopProcessing(
694              ResultCode.UNWILLING_TO_PERFORM, msg);
695        }
696    
697        ModifyDnContext ctx =
698          (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
699        if (ctx != null)
700        {
701          /*
702           * This is a replication operation
703           * Check that the modified entry has the same entryuuid
704           * as was in the original message.
705           */
706          String modifiedEntryUUID =
707            Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
708          if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
709          {
710            /*
711             * The modified entry is not the same entry as the one on
712             * the original change was performed.
713             * Probably the original entry was renamed and replaced with
714             * another entry.
715             * We must not let the change proceed, return a negative
716             * result and set the result code to NO_SUCH_OBJET.
717             * When the operation will return, the thread that started the
718             * operation will try to find the correct entry and restart a new
719             * operation.
720             */
721            return new SynchronizationProviderResult.StopProcessing(
722                ResultCode.NO_SUCH_OBJECT, null);
723          }
724          if (modifyDNOperation.getNewSuperior() != null)
725          {
726            /*
727             * Also check that the current id of the
728             * parent is the same as when the operation was performed.
729             */
730            String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
731            if ((newParentId != null) &&
732                (!newParentId.equals(ctx.getNewParentId())))
733            {
734            return new SynchronizationProviderResult.StopProcessing(
735                ResultCode.NO_SUCH_OBJECT, null);
736            }
737          }
738        }
739        else
740        {
741          // There is no replication context attached to the operation
742          // so this is not a replication operation.
743          ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
744          String newParentId = null;
745          if (modifyDNOperation.getNewSuperior() != null)
746          {
747            newParentId = findEntryId(modifyDNOperation.getNewSuperior());
748          }
749    
750          Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
751          String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
752          ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
753          modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
754        }
755        return new SynchronizationProviderResult.ContinueProcessing();
756      }
757    
758      /**
759       * Handle the conflict resolution.
760       * Called by the core server after locking the entry and before
761       * starting the actual modification.
762       * @param modifyOperation the operation
763       * @return code indicating is operation must proceed
764       */
765      public SynchronizationProviderResult handleConflictResolution(
766             PreOperationModifyOperation modifyOperation)
767      {
768        if ((!modifyOperation.isSynchronizationOperation())
769            && (!brokerIsConnected(modifyOperation)))
770        {
771          Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
772          return new SynchronizationProviderResult.StopProcessing(
773              ResultCode.UNWILLING_TO_PERFORM, msg);
774        }
775    
776        ModifyContext ctx =
777          (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
778    
779        Entry modifiedEntry = modifyOperation.getModifiedEntry();
780        if (ctx == null)
781        {
782          // There is no replication context attached to the operation
783          // so this is not a replication operation.
784          ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
785          String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
786          if (modifiedEntryUUID == null)
787            modifiedEntryUUID = modifyOperation.getEntryDN().toString();
788          ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
789          modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
790        }
791        else
792        {
793          // This is a replayed operation, it is necessary to
794          // - check if the entry has been renamed
795          // - check for conflicts
796          String modifiedEntryUUID = ctx.getEntryUid();
797          String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
798          if ((currentEntryUUID != null) &&
799              (!currentEntryUUID.equals(modifiedEntryUUID)))
800          {
801            /*
802             * The current modified entry is not the same entry as the one on
803             * the original modification was performed.
804             * Probably the original entry was renamed and replaced with
805             * another entry.
806             * We must not let the modification proceed, return a negative
807             * result and set the result code to NO_SUCH_OBJET.
808             * When the operation will return, the thread that started the
809             * operation will try to find the correct entry and restart a new
810             * operation.
811             */
812             return new SynchronizationProviderResult.StopProcessing(
813                  ResultCode.NO_SUCH_OBJECT, null);
814          }
815    
816          /*
817           * Solve the conflicts between modify operations
818           */
819          Historical historicalInformation = Historical.load(modifiedEntry);
820          modifyOperation.setAttachment(Historical.HISTORICAL,
821                                        historicalInformation);
822    
823          if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
824          {
825            numResolvedModifyConflicts.incrementAndGet();
826          }
827    
828          if (modifyOperation.getModifications().isEmpty())
829          {
830            /*
831             * This operation becomes a no-op due to conflict resolution
832             * stop the processing and send an OK result
833             */
834            return new SynchronizationProviderResult.StopProcessing(
835                ResultCode.SUCCESS, null);
836          }
837        }
838        return new SynchronizationProviderResult.ContinueProcessing();
839      }
840    
841      /**
842       * The preOperation phase for the add Operation.
843       * Its job is to generate the replication context associated to the
844       * operation. It is necessary to do it in this phase because contrary to
845       * the other operations, the entry uid is not set when the handleConflict
846       * phase is called.
847       *
848       * @param addOperation The Add Operation.
849       */
850      public void doPreOperation(PreOperationAddOperation addOperation)
851      {
852        AddContext ctx = new AddContext(generateChangeNumber(addOperation),
853            Historical.getEntryUuid(addOperation),
854            findEntryId(addOperation.getEntryDN().getParentDNInSuffix()));
855    
856        addOperation.setAttachment(SYNCHROCONTEXT, ctx);
857      }
858    
859      /**
860       * Receives an update message from the replicationServer.
861       * also responsible for updating the list of pending changes
862       * @return the received message - null if none
863       */
864      public UpdateMessage receive()
865      {
866        UpdateMessage update = null;
867    
868        while (update == null)
869        {
870          InitializeRequestMessage initMsg = null;
871          ReplicationMessage msg;
872          try
873          {
874            msg = broker.receive();
875            if (msg == null)
876            {
877              // The server is in the shutdown process
878              return null;
879            }
880    
881            if (debugEnabled())
882              if (!(msg instanceof HeartbeatMessage))
883                TRACER.debugVerbose("Message received <" + msg + ">");
884    
885            if (msg instanceof AckMessage)
886            {
887              AckMessage ack = (AckMessage) msg;
888              receiveAck(ack);
889            }
890            else if (msg instanceof InitializeRequestMessage)
891            {
892              // Another server requests us to provide entries
893              // for a total update
894              initMsg = (InitializeRequestMessage)msg;
895            }
896            else if (msg instanceof InitializeTargetMessage)
897            {
898              // Another server is exporting its entries to us
899              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
900    
901              try
902              {
903                // This must be done while we are still holding the
904                // broker lock because we are now going to receive a
905                // bunch of entries from the remote server and we
906                // want the import thread to catch them and
907                // not the ListenerThread.
908                initialize(importMsg);
909              }
910              catch(DirectoryException de)
911              {
912                // Returns an error message to notify the sender
913                ErrorMessage errorMsg =
914                  new ErrorMessage(importMsg.getsenderID(),
915                      de.getMessageObject());
916                MessageBuilder mb = new MessageBuilder();
917                mb.append(de.getMessageObject());
918                TRACER.debugInfo(Message.toString(mb.toMessage()));
919                broker.publish(errorMsg);
920              }
921            }
922            else if (msg instanceof ErrorMessage)
923            {
924              if (ieContext != null)
925              {
926                // This is an error termination for the 2 following cases :
927                // - either during an export
928                // - or before an import really started
929                //   For example, when we publish a request and the
930                //  replicationServer did not find any import source.
931                abandonImportExport((ErrorMessage)msg);
932              }
933              else
934              {
935                /* We can receive an error message from the replication server
936                 * in the following cases :
937                 * - we connected with an incorrect generation id
938                 */
939                ErrorMessage errorMsg = (ErrorMessage)msg;
940                logError(ERR_ERROR_MSG_RECEIVED.get(
941                    errorMsg.getDetails()));
942              }
943            }
944            else if (msg instanceof UpdateMessage)
945            {
946              update = (UpdateMessage) msg;
947              receiveUpdate(update);
948            }
949          }
950          catch (SocketTimeoutException e)
951          {
952            // just retry
953          }
954          // Test if we have received and export request message and
955          // if that's the case handle it now.
956          // This must be done outside of the portion of code protected
957          // by the broker lock so that we keep receiveing update
958          // when we are doing and export and so that a possible
959          // closure of the socket happening when we are publishing the
960          // entries to the remote can be handled by the other
961          // replay thread when they call this method and therefore the
962          // broker.receive() method.
963          if (initMsg != null)
964          {
965            // Do this work in a thread to allow replay thread continue working
966            ExportThread exportThread = new ExportThread(initMsg.getsenderID());
967            exportThread.start();
968          }
969        }
970        return update;
971      }
972    
973      /**
974       * Do the necessary processing when an UpdateMessage was received.
975       *
976       * @param update The received UpdateMessage.
977       */
978      public void receiveUpdate(UpdateMessage update)
979      {
980        remotePendingChanges.putRemoteUpdate(update);
981        numRcvdUpdates.incrementAndGet();
982      }
983    
984      /**
985       * Do the necessary processing when an AckMessage is received.
986       *
987       * @param ack The AckMessage that was received.
988       */
989      public void receiveAck(AckMessage ack)
990      {
991        UpdateMessage update;
992        ChangeNumber changeNumber = ack.getChangeNumber();
993    
994        synchronized (waitingAckMsgs)
995        {
996          update = waitingAckMsgs.remove(changeNumber);
997        }
998        if (update != null)
999        {
1000          synchronized (update)
1001          {
1002            update.notify();
1003          }
1004        }
1005      }
1006    
1007      /**
1008       * Check if an operation must be synchronized.
1009       * Also update the list of pending changes and the server RUV
1010       * @param op the operation
1011       */
1012      public void synchronize(PostOperationOperation op)
1013      {
1014        ResultCode result = op.getResultCode();
1015        if ((result == ResultCode.SUCCESS) && op.isSynchronizationOperation())
1016        {
1017          numReplayedPostOpCalled++;
1018        }
1019        UpdateMessage msg = null;
1020    
1021        // Note that a failed non-replication operation might not have a change
1022        // number.
1023        ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
1024    
1025        boolean isAssured = isAssured(op);
1026    
1027        if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
1028        {
1029          // Generate a replication message for a successful non-replication
1030          // operation.
1031          msg = UpdateMessage.generateMsg(op, isAssured);
1032    
1033          if (msg == null)
1034          {
1035            /*
1036             * This is an operation type that we do not know about
1037             * It should never happen.
1038             */
1039            pendingChanges.remove(curChangeNumber);
1040            Message message =
1041                ERR_UNKNOWN_TYPE.get(op.getOperationType().toString());
1042            logError(message);
1043            return;
1044          }
1045        }
1046    
1047        if (result == ResultCode.SUCCESS)
1048        {
1049          try
1050          {
1051            if (op.isSynchronizationOperation())
1052            {
1053              remotePendingChanges.commit(curChangeNumber);
1054            }
1055            else
1056            {
1057              pendingChanges.commit(curChangeNumber, msg);
1058            }
1059          }
1060          catch  (NoSuchElementException e)
1061          {
1062            Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get(
1063                curChangeNumber.toString(), op.toString());
1064            logError(message);
1065            return;
1066          }
1067    
1068          if (msg != null && isAssured)
1069          {
1070            synchronized (waitingAckMsgs)
1071            {
1072              // Add the assured message to the list of update that are
1073              // waiting acknowledgements
1074              waitingAckMsgs.put(curChangeNumber, msg);
1075            }
1076          }
1077    
1078          if (generationIdSavedStatus != true)
1079          {
1080            this.saveGenerationId(generationId);
1081          }
1082        }
1083        else if (!op.isSynchronizationOperation())
1084        {
1085          // Remove an unsuccessful non-replication operation from the pending
1086          // changes list.
1087          if (curChangeNumber != null)
1088          {
1089            pendingChanges.remove(curChangeNumber);
1090          }
1091        }
1092    
1093        if (!op.isSynchronizationOperation())
1094        {
1095          int pushedChanges = pendingChanges.pushCommittedChanges();
1096          numSentUpdates.addAndGet(pushedChanges);
1097        }
1098    
1099        // Wait for acknowledgement of an assured message.
1100        if (msg != null && isAssured)
1101        {
1102          synchronized (msg)
1103          {
1104            while (waitingAckMsgs.containsKey(msg.getChangeNumber()))
1105            {
1106              // TODO : should have a configurable timeout to get
1107              // out of this loop
1108              try
1109              {
1110                msg.wait(1000);
1111              } catch (InterruptedException e)
1112              { }
1113            }
1114          }
1115        }
1116      }
1117    
1118      /**
1119       * get the number of updates received by the replication plugin.
1120       *
1121       * @return the number of updates received
1122       */
1123      public int getNumRcvdUpdates()
1124      {
1125        if (numRcvdUpdates != null)
1126          return numRcvdUpdates.get();
1127        else
1128          return 0;
1129      }
1130    
1131      /**
1132       * Get the number of updates sent by the replication plugin.
1133       *
1134       * @return the number of updates sent
1135       */
1136      public int getNumSentUpdates()
1137      {
1138        if (numSentUpdates != null)
1139          return numSentUpdates.get();
1140        else
1141          return 0;
1142      }
1143    
1144      /**
1145       * Get the number of updates in the pending list.
1146       *
1147       * @return The number of updates in the pending list
1148       */
1149      public int getPendingUpdatesCount()
1150      {
1151        if (pendingChanges != null)
1152          return pendingChanges.size();
1153        else
1154          return 0;
1155      }
1156    
1157      /**
1158       * Increment the number of processed updates.
1159       */
1160      public void incProcessedUpdates()
1161      {
1162        numProcessedUpdates.incrementAndGet();
1163      }
1164    
1165      /**
1166       * get the number of updates replayed by the replication.
1167       *
1168       * @return The number of updates replayed by the replication
1169       */
1170      public int getNumProcessedUpdates()
1171      {
1172        if (numProcessedUpdates != null)
1173          return numProcessedUpdates.get();
1174        else
1175          return 0;
1176      }
1177    
1178      /**
1179       * get the number of updates replayed successfully by the replication.
1180       *
1181       * @return The number of updates replayed successfully
1182       */
1183      public int getNumReplayedPostOpCalled()
1184      {
1185        return numReplayedPostOpCalled;
1186      }
1187    
1188      /**
1189       * get the ServerState.
1190       *
1191       * @return the ServerState
1192       */
1193      public ServerState getServerState()
1194      {
1195        return state;
1196      }
1197    
1198      /**
1199       * Get the debugCount.
1200       *
1201       * @return Returns the debugCount.
1202       */
1203      public int getDebugCount()
1204      {
1205        return debugCount;
1206      }
1207    
1208      /**
1209       * Send an Ack message.
1210       *
1211       * @param changeNumber The ChangeNumber for which the ack must be sent.
1212       */
1213      public void ack(ChangeNumber changeNumber)
1214      {
1215        broker.publish(new AckMessage(changeNumber));
1216      }
1217    
1218      /**
1219       * {@inheritDoc}
1220       */
1221      @Override
1222      public void run()
1223      {
1224        done = false;
1225    
1226        // Create the listener thread
1227        listenerThread = new ListenerThread(this, updateToReplayQueue);
1228        listenerThread.start();
1229    
1230        while (shutdown  == false)
1231        {
1232          try
1233          {
1234            synchronized (this)
1235            {
1236              this.wait(1000);
1237              if (!disabled && !stateSavingDisabled )
1238              {
1239                // save the RUV
1240                state.save();
1241              }
1242            }
1243          } catch (InterruptedException e)
1244          { }
1245        }
1246        state.save();
1247    
1248        done = true;
1249      }
1250    
1251      /**
1252       * Shutdown this ReplicationDomain.
1253       */
1254      public void shutdown()
1255      {
1256        // stop the flush thread
1257        shutdown = true;
1258    
1259        // Stop the listener thread
1260        if (listenerThread != null)
1261        {
1262          listenerThread.shutdown();
1263        }
1264    
1265        synchronized (this)
1266        {
1267          this.notify();
1268        }
1269    
1270        DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
1271    
1272        DirectoryServer.deregisterAlertGenerator(this);
1273    
1274        // stop the ReplicationBroker
1275        broker.stop();
1276    
1277        // Wait for the listener thread to stop
1278        if (listenerThread != null)
1279          listenerThread.waitForShutdown();
1280    
1281        // wait for completion of the persistentServerState thread.
1282        try
1283        {
1284          while (!done)
1285          {
1286            Thread.sleep(50);
1287          }
1288        } catch (InterruptedException e)
1289        {
1290          // stop waiting when interrupted.
1291        }
1292      }
1293    
1294      /**
1295       * Get the name of the replicationServer to which this domain is currently
1296       * connected.
1297       *
1298       * @return the name of the replicationServer to which this domain
1299       *         is currently connected.
1300       */
1301      public String getReplicationServer()
1302      {
1303        if (broker != null)
1304          return broker.getReplicationServer();
1305        else
1306          return "Not connected";
1307      }
1308    
1309      /**
1310       * Create and replay a synchronized Operation from an UpdateMessage.
1311       *
1312       * @param msg The UpdateMessage to be replayed.
1313       */
1314      public void replay(UpdateMessage msg)
1315      {
1316        Operation op = null;
1317        boolean done = false;
1318        boolean dependency = false;
1319        ChangeNumber changeNumber = null;
1320        int retryCount = 10;
1321        boolean firstTry = true;
1322    
1323        // Try replay the operation, then flush (replaying) any pending operation
1324        // whose dependency has been replayed until no more left.
1325        do
1326        {
1327          try
1328          {
1329            while ((!dependency) && (!done) && (retryCount-- > 0))
1330            {
1331              op = msg.createOperation(conn);
1332    
1333              op.setInternalOperation(true);
1334              op.setSynchronizationOperation(true);
1335              changeNumber = OperationContext.getChangeNumber(op);
1336              ((AbstractOperation) op).run();
1337    
1338              // Try replay the operation
1339              ResultCode result = op.getResultCode();
1340    
1341              if (result != ResultCode.SUCCESS)
1342              {
1343                if (op instanceof ModifyOperation)
1344                {
1345                  ModifyOperation newOp = (ModifyOperation) op;
1346                  dependency = remotePendingChanges.checkDependencies(newOp);
1347                  if ((!dependency) && (!firstTry))
1348                  {
1349                    done = solveNamingConflict(newOp, msg);
1350                  }
1351                } else if (op instanceof DeleteOperation)
1352                {
1353                  DeleteOperation newOp = (DeleteOperation) op;
1354                  dependency = remotePendingChanges.checkDependencies(newOp);
1355                  if ((!dependency) && (!firstTry))
1356                  {
1357                    done = solveNamingConflict(newOp, msg);
1358                  }
1359                } else if (op instanceof AddOperation)
1360                {
1361                  AddOperation newOp = (AddOperation) op;
1362                  AddMsg addMsg = (AddMsg) msg;
1363                  dependency = remotePendingChanges.checkDependencies(newOp);
1364                  if ((!dependency) && (!firstTry))
1365                  {
1366                    done = solveNamingConflict(newOp, addMsg);
1367                  }
1368                } else if (op instanceof ModifyDNOperationBasis)
1369                {
1370                  ModifyDNMsg newMsg = (ModifyDNMsg) msg;
1371                  dependency = remotePendingChanges.checkDependencies(newMsg);
1372                  if ((!dependency) && (!firstTry))
1373                  {
1374                    ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
1375                    done = solveNamingConflict(newOp, msg);
1376                  }
1377                } else
1378                {
1379                  done = true;  // unknown type of operation ?!
1380                }
1381                if (done)
1382                {
1383                  // the update became a dummy update and the result
1384                  // of the conflict resolution phase is to do nothing.
1385                  // however we still need to push this change to the serverState
1386                  updateError(changeNumber);
1387                }
1388              } else
1389              {
1390                done = true;
1391              }
1392              firstTry = false;
1393            }
1394    
1395            if (!done && !dependency)
1396            {
1397              // Continue with the next change but the servers could now become
1398              // inconsistent.
1399              // Let the repair tool know about this.
1400              Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
1401                op.getErrorMessage().toString());
1402              logError(message);
1403              numUnresolvedNamingConflicts.incrementAndGet();
1404    
1405              updateError(changeNumber);
1406            }
1407          } catch (ASN1Exception e)
1408          {
1409            Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1410              String.valueOf(msg) + stackTraceToSingleLineString(e));
1411            logError(message);
1412          } catch (LDAPException e)
1413          {
1414            Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1415              String.valueOf(msg) + stackTraceToSingleLineString(e));
1416            logError(message);
1417          } catch (DataFormatException e)
1418          {
1419            Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1420              String.valueOf(msg) + stackTraceToSingleLineString(e));
1421            logError(message);
1422          } catch (Exception e)
1423          {
1424            if (changeNumber != null)
1425            {
1426              /*
1427               * An Exception happened during the replay process.
1428               * Continue with the next change but the servers will now start
1429               * to be inconsistent.
1430               * Let the repair tool know about this.
1431               */
1432              Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
1433                stackTraceToSingleLineString(e), op.toString());
1434              logError(message);
1435              updateError(changeNumber);
1436            } else
1437            {
1438              Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1439                String.valueOf(msg) + stackTraceToSingleLineString(e));
1440              logError(message);
1441            }
1442          } finally
1443          {
1444            if (!dependency)
1445            {
1446              broker.updateWindowAfterReplay();
1447              if (msg.isAssured())
1448                ack(msg.getChangeNumber());
1449              incProcessedUpdates();
1450            }
1451          }
1452    
1453          // Now replay any pending update that had a dependency and whose
1454          // dependency has been replayed, do that until no more updates of that
1455          // type left...
1456          msg = remotePendingChanges.getNextUpdate();
1457    
1458          // Prepare restart of loop
1459          done = false;
1460          dependency = false;
1461          changeNumber = null;
1462          retryCount = 10;
1463          firstTry = true;
1464    
1465        } while (msg != null);
1466      }
1467    
1468      /**
1469       * This method is called when an error happens while replaying
1470       * an operation.
1471       * It is necessary because the postOperation does not always get
1472       * called when error or Exceptions happen during the operation replay.
1473       *
1474       * @param changeNumber the ChangeNumber of the operation with error.
1475       */
1476      public void updateError(ChangeNumber changeNumber)
1477      {
1478        remotePendingChanges.commit(changeNumber);
1479      }
1480    
1481      /**
1482       * Generate a new change number and insert it in the pending list.
1483       *
1484       * @param operation The operation for which the change number must be
1485       *                  generated.
1486       * @return The new change number.
1487       */
1488      private ChangeNumber generateChangeNumber(PluginOperation operation)
1489      {
1490        return pendingChanges.putLocalOperation(operation);
1491      }
1492    
1493    
1494      /**
1495       * Find the Unique Id of the entry with the provided DN by doing a
1496       * search of the entry and extracting its uniqueID from its attributes.
1497       *
1498       * @param dn The dn of the entry for which the unique Id is searched.
1499       *
1500       * @return The unique Id of the entry whith the provided DN.
1501       */
1502      private String findEntryId(DN dn)
1503      {
1504        if (dn == null)
1505          return null;
1506        try
1507        {
1508          LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
1509          attrs.add(ENTRYUIDNAME);
1510          InternalSearchOperation search = conn.processSearch(dn,
1511                SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
1512                0, 0, false,
1513                SearchFilter.createFilterFromString("objectclass=*"),
1514                attrs);
1515    
1516          if (search.getResultCode() == ResultCode.SUCCESS)
1517          {
1518            LinkedList<SearchResultEntry> result = search.getSearchEntries();
1519            if (!result.isEmpty())
1520            {
1521              SearchResultEntry resultEntry = result.getFirst();
1522              if (resultEntry != null)
1523              {
1524                return Historical.getEntryUuid(resultEntry);
1525              }
1526            }
1527          }
1528        } catch (DirectoryException e)
1529        {
1530          // never happens because the filter is always valid.
1531        }
1532        return null;
1533      }
1534    
1535      /**
1536       * find the current dn of an entry from its entry uuid.
1537       *
1538       * @param uuid the Entry Unique ID.
1539       * @return The curernt dn of the entry or null if there is no entry with
1540       *         the specified uuid.
1541       */
1542      private DN findEntryDN(String uuid)
1543      {
1544        try
1545        {
1546          InternalSearchOperation search = conn.processSearch(baseDN,
1547                SearchScope.WHOLE_SUBTREE,
1548                SearchFilter.createFilterFromString("entryuuid="+uuid));
1549          if (search.getResultCode() == ResultCode.SUCCESS)
1550          {
1551            LinkedList<SearchResultEntry> result = search.getSearchEntries();
1552            if (!result.isEmpty())
1553            {
1554              SearchResultEntry resultEntry = result.getFirst();
1555              if (resultEntry != null)
1556              {
1557                return resultEntry.getDN();
1558              }
1559            }
1560          }
1561        } catch (DirectoryException e)
1562        {
1563          // never happens because the filter is always valid.
1564        }
1565        return null;
1566      }
1567    
1568      /**
1569       * Solve a conflict detected when replaying a modify operation.
1570       *
1571       * @param op The operation that triggered the conflict detection.
1572       * @param msg The operation that triggered the conflict detection.
1573       * @return true if the process is completed, false if it must continue..
1574       */
1575      private boolean solveNamingConflict(ModifyOperation op,
1576          UpdateMessage msg)
1577      {
1578        ResultCode result = op.getResultCode();
1579        ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
1580        String entryUid = ctx.getEntryUid();
1581    
1582        if (result == ResultCode.NO_SUCH_OBJECT)
1583        {
1584          /*
1585           * The operation is a modification but
1586           * the entry has been renamed on a different master in the same time.
1587           * search if the entry has been renamed, and return the new dn
1588           * of the entry.
1589           */
1590          DN newdn = findEntryDN(entryUid);
1591          if (newdn != null)
1592          {
1593            // There is an entry with the same unique id as this modify operation
1594            // replay the modify using the current dn of this entry.
1595            msg.setDn(newdn.toString());
1596            numResolvedNamingConflicts.incrementAndGet();
1597            return false;
1598          }
1599          else
1600          {
1601            // This entry does not exist anymore.
1602            // It has probably been deleted, stop the processing of this operation
1603            numResolvedNamingConflicts.incrementAndGet();
1604            return true;
1605          }
1606        }
1607        else
1608        {
1609          // The other type of errors can not be caused by naming conflicts.
1610          // Log a message for the repair tool.
1611          Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1612              op.toString(), ctx.getChangeNumber().toString(),
1613              result.toString(), op.getErrorMessage().toString());
1614          logError(message);
1615          return true;
1616        }
1617      }
1618    
1619     /**
1620      * Solve a conflict detected when replaying a delete operation.
1621      *
1622      * @param op The operation that triggered the conflict detection.
1623      * @param msg The operation that triggered the conflict detection.
1624      * @return true if the process is completed, false if it must continue..
1625      */
1626     private boolean solveNamingConflict(DeleteOperation op,
1627         UpdateMessage msg)
1628     {
1629       ResultCode result = op.getResultCode();
1630       DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
1631       String entryUid = ctx.getEntryUid();
1632    
1633       if (result == ResultCode.NO_SUCH_OBJECT)
1634       {
1635         /*
1636          * Find if the entry is still in the database.
1637          */
1638         DN currentDn = findEntryDN(entryUid);
1639         if (currentDn == null)
1640         {
1641           /*
1642            * The entry has already been deleted, either because this delete
1643            * has already been replayed or because another concurrent delete
1644            * has already done the job.
1645            * In any case, there is is nothing more to do.
1646            */
1647           numResolvedNamingConflicts.incrementAndGet();
1648           return true;
1649         }
1650         else
1651         {
1652           /*
1653            * This entry has been renamed, replay the delete using its new DN.
1654            */
1655           msg.setDn(currentDn.toString());
1656           numResolvedNamingConflicts.incrementAndGet();
1657           return false;
1658         }
1659       }
1660       else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
1661       {
1662         /*
1663          * This may happen when we replay a DELETE done on a master
1664          * but children of this entry have been added on another master.
1665          *
1666          * Rename all the children by adding entryuuid in dn and delete this entry.
1667          *
1668          * The action taken here must be consistent with the actions
1669          * done in the solveNamingConflict(AddOperation) method
1670          * when we are adding an entry whose parent entry has already been deleted.
1671          */
1672         findAndRenameChild(entryUid, op.getEntryDN(), op);
1673         numUnresolvedNamingConflicts.incrementAndGet();
1674         return false;
1675       }
1676       else
1677       {
1678         // The other type of errors can not be caused by naming conflicts.
1679         // Log a message for the repair tool.
1680         Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1681             op.toString(), ctx.getChangeNumber().toString(),
1682             result.toString(), op.getErrorMessage().toString());
1683         logError(message);
1684         return true;
1685       }
1686     }
1687    
1688      /**
1689     * Solve a conflict detected when replaying a Modify DN operation.
1690     *
1691     * @param op The operation that triggered the conflict detection.
1692     * @param msg The operation that triggered the conflict detection.
1693     * @return true if the process is completed, false if it must continue.
1694     * @throws Exception When the operation is not valid.
1695     */
1696    private boolean solveNamingConflict(ModifyDNOperation op,
1697        UpdateMessage msg) throws Exception
1698    {
1699      ResultCode result = op.getResultCode();
1700      ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
1701      String entryUid = ctx.getEntryUid();
1702      String newSuperiorID = ctx.getNewParentId();
1703    
1704      /*
1705       * four possible cases :
1706       * - the modified entry has been renamed
1707       * - the new parent has been renamed
1708       * - the operation is replayed for the second time.
1709       * - the entry has been deleted
1710       * action :
1711       *  - change the target dn and the new parent dn and
1712       *        restart the operation,
1713       *  - don't do anything if the operation is replayed.
1714       */
1715    
1716      // Construct the new DN to use for the entry.
1717      DN entryDN = op.getEntryDN();
1718      DN newSuperior = findEntryDN(newSuperiorID);
1719      RDN newRDN = op.getNewRDN();
1720      DN parentDN;
1721    
1722      if (newSuperior == null)
1723      {
1724        parentDN = entryDN.getParent();
1725      }
1726      else
1727      {
1728        parentDN = newSuperior;
1729      }
1730    
1731      if ((parentDN == null) || parentDN.isNullDN())
1732      {
1733        /* this should never happen
1734         * can't solve any conflict in this case.
1735         */
1736        throw new Exception("operation parameters are invalid");
1737      }
1738    
1739      DN newDN = parentDN.concat(newRDN);
1740    
1741      // get the current DN of this entry in the database.
1742      DN currentDN = findEntryDN(entryUid);
1743    
1744      if (currentDN == null)
1745      {
1746        // The entry targetted by the Modify DN is not in the database
1747        // anymore.
1748        // This is a conflict between a delete and this modify DN.
1749        // The entry has been deleted anymore so we can safely assume
1750        // that the operation is completed.
1751        numResolvedNamingConflicts.incrementAndGet();
1752        return true;
1753      }
1754    
1755      // if the newDN and the current DN match then the operation
1756      // is a no-op (this was probably a second replay)
1757      // don't do anything.
1758      if (newDN.equals(currentDN))
1759      {
1760        numResolvedNamingConflicts.incrementAndGet();
1761        return true;
1762      }
1763    
1764      // If we could not find the new parent entry, we missed this entry
1765      // earlier or it has disappeared from the database
1766      // Log this information for the repair tool and mark the entry
1767      // as conflicting.
1768      // stop the processing.
1769      if (newSuperior == null)
1770      {
1771        markConflictEntry(op, currentDN, newDN);
1772        numUnresolvedNamingConflicts.incrementAndGet();
1773        return true;
1774      }
1775    
1776      if ((result == ResultCode.NO_SUCH_OBJECT) ||
1777          (result == ResultCode.OBJECTCLASS_VIOLATION))
1778      {
1779        /*
1780         * The entry or it's new parent has not been found
1781         * reconstruct the operation with the DN we just built
1782         */
1783        ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
1784        msg.setDn(currentDN.toString());
1785        modifyDnMsg.setNewSuperior(newSuperior.toString());
1786        numResolvedNamingConflicts.incrementAndGet();
1787        return false;
1788      }
1789      else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
1790      {
1791        /*
1792         * This may happen when two modifyDn operation
1793         * are done on different servers but with the same target DN
1794         * add the conflict object class to the entry
1795         * and rename it using its entryuuid.
1796         */
1797        ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
1798        markConflictEntry(op, op.getEntryDN(), newDN);
1799        modifyDnMsg.setNewRDN(generateConflictRDN(entryUid,
1800                              modifyDnMsg.getNewRDN()));
1801        modifyDnMsg.setNewSuperior(newSuperior.toString());
1802        numUnresolvedNamingConflicts.incrementAndGet();
1803        return false;
1804      }
1805      else
1806      {
1807        // The other type of errors can not be caused by naming conflicts.
1808        // Log a message for the repair tool.
1809        Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1810            op.toString(), ctx.getChangeNumber().toString(),
1811            result.toString(), op.getErrorMessage().toString());
1812        logError(message);
1813        return true;
1814      }
1815    }
1816    
1817    
1818      /**
1819       * Solve a conflict detected when replaying a ADD operation.
1820       *
1821       * @param op The operation that triggered the conflict detection.
1822       * @param msg The message that triggered the conflict detection.
1823       * @return true if the process is completed, false if it must continue.
1824       * @throws Exception When the operation is not valid.
1825       */
1826      private boolean solveNamingConflict(AddOperation op,
1827          AddMsg msg) throws Exception
1828      {
1829        ResultCode result = op.getResultCode();
1830        AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
1831        String entryUid = ctx.getEntryUid();
1832        String parentUniqueId = ctx.getParentUid();
1833    
1834        if (result == ResultCode.NO_SUCH_OBJECT)
1835        {
1836          /*
1837           * This can happen if the parent has been renamed or deleted
1838           * find the parent dn and calculate a new dn for the entry
1839           */
1840          if (parentUniqueId == null)
1841          {
1842            /*
1843             * This entry is the base dn of the backend.
1844             * It is quite surprising that the operation result be NO_SUCH_OBJECT.
1845             * There is nothing more we can do except TODO log a
1846             * message for the repair tool to look at this problem.
1847             */
1848            return true;
1849          }
1850          DN parentDn = findEntryDN(parentUniqueId);
1851          if (parentDn == null)
1852          {
1853            /*
1854             * The parent has been deleted
1855             * rename the entry as a conflicting entry.
1856             * The action taken here must be consistent with the actions
1857             * done when in the solveNamingConflict(DeleteOperation) method
1858             * when we are deleting an entry that have some child entries.
1859             */
1860            addConflict(msg);
1861    
1862            msg.setDn(generateConflictRDN(entryUid,
1863                        op.getEntryDN().getRDN().toString()) + ","
1864                        + baseDN);
1865            // reset the parent uid so that the check done is the handleConflict
1866            // phase does not fail.
1867            msg.setParentUid(null);
1868            numUnresolvedNamingConflicts.incrementAndGet();
1869            return false;
1870          }
1871          else
1872          {
1873            RDN entryRdn = DN.decode(msg.getDn()).getRDN();
1874            msg.setDn(entryRdn + "," + parentDn);
1875            numResolvedNamingConflicts.incrementAndGet();
1876            return false;
1877          }
1878        }
1879        else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
1880        {
1881          /*
1882           * This can happen if
1883           *  - two adds are done on different servers but with the
1884           *    same target DN.
1885           *  - the same ADD is being replayed for the second time on this server.
1886           * if the nsunique ID already exist, assume this is a replay and
1887           *        don't do anything
1888           * if the entry unique id do not exist, generate conflict.
1889           */
1890          if (findEntryDN(entryUid) != null)
1891          {
1892            // entry already exist : this is a replay
1893            return true;
1894          }
1895          else
1896          {
1897            addConflict(msg);
1898            msg.setDn(generateConflictRDN(entryUid, msg.getDn()));
1899            numUnresolvedNamingConflicts.incrementAndGet();
1900            return false;
1901          }
1902        }
1903        else
1904        {
1905          // The other type of errors can not be caused by naming conflicts.
1906          // log a message for the repair tool.
1907          Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1908              op.toString(), ctx.getChangeNumber().toString(),
1909              result.toString(), op.getErrorMessage().toString());
1910          logError(message);
1911          return true;
1912        }
1913      }
1914    
1915      /**
1916       * Find all the entries below the provided DN and rename them
1917       * so that they stay below the baseDn of this replicationDomain and
1918       * use the conflicting name and attribute.
1919       *
1920       * @param entryUid   The unique ID of the entry whose child must be renamed.
1921       * @param entryDN    The DN of the entry whose child must be renamed.
1922       * @param conflictOp The Operation that generated the conflict.
1923       */
1924      private void findAndRenameChild(
1925          String entryUid, DN entryDN, Operation conflictOp)
1926      {
1927        // Find an rename child entries.
1928        InternalClientConnection conn =
1929          InternalClientConnection.getRootConnection();
1930    
1931        try
1932        {
1933          LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
1934          attrs.add(ENTRYUIDNAME);
1935    
1936          SearchFilter ALLMATCH;
1937          ALLMATCH = SearchFilter.createFilterFromString("(objectClass=*)");
1938          InternalSearchOperation op =
1939              conn.processSearch(entryDN, SearchScope.SINGLE_LEVEL,
1940                  DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, ALLMATCH,
1941                  attrs);
1942    
1943          if (op.getResultCode() == ResultCode.SUCCESS)
1944          {
1945            LinkedList<SearchResultEntry> entries = op.getSearchEntries();
1946            if (entries != null)
1947            {
1948              for (SearchResultEntry entry : entries)
1949              {
1950                markConflictEntry(conflictOp, entry.getDN(), entryDN);
1951                renameConflictEntry(conflictOp, entry.getDN(),
1952                                    Historical.getEntryUuid(entry));
1953              }
1954            }
1955          }
1956          else
1957          {
1958            // log error and information for the REPAIR tool.
1959            MessageBuilder mb = new MessageBuilder();
1960            mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
1961            mb.append(String.valueOf(entryDN));
1962            mb.append(" ");
1963            mb.append(String.valueOf(conflictOp));
1964            mb.append(" ");
1965            mb.append(String.valueOf(op.getResultCode()));
1966            logError(mb.toMessage());
1967          }
1968        } catch (DirectoryException e)
1969        {
1970          // log errror and information for the REPAIR tool.
1971          MessageBuilder mb = new MessageBuilder();
1972          mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get());
1973          mb.append(String.valueOf(entryDN));
1974          mb.append(" ");
1975          mb.append(String.valueOf(conflictOp));
1976          mb.append(" ");
1977          mb.append(e.getLocalizedMessage());
1978          logError(mb.toMessage());
1979        }
1980      }
1981    
1982    
1983      /**
1984       * Rename an entry that was conflicting so that it stays below the
1985       * baseDN of the replicationDomain.
1986       *
1987       * @param conflictOp The Operation that caused the conflict.
1988       * @param dn         The DN of the entry to be renamed.
1989       * @param uid        The uniqueID of the entry to be renamed.
1990       */
1991      private void renameConflictEntry(Operation conflictOp, DN dn, String uid)
1992      {
1993        InternalClientConnection conn =
1994          InternalClientConnection.getRootConnection();
1995    
1996        ModifyDNOperation newOp = conn.processModifyDN(
1997            dn, generateDeleteConflictDn(uid, dn),false, baseDN);
1998    
1999        if (newOp.getResultCode() != ResultCode.SUCCESS)
2000        {
2001          // log information for the repair tool.
2002          MessageBuilder mb = new MessageBuilder();
2003          mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
2004          mb.append(String.valueOf(dn));
2005          mb.append(" ");
2006          mb.append(String.valueOf(conflictOp));
2007          mb.append(" ");
2008          mb.append(String.valueOf(newOp.getResultCode()));
2009          logError(mb.toMessage());
2010        }
2011      }
2012    
2013    
2014      /**
2015       * Generate a modification to add the conflict attribute to an entry
2016       * whose Dn is now conflicting with another entry.
2017       *
2018       * @param op        The operation causing the conflict.
2019       * @param currentDN The current DN of the operation to mark as conflicting.
2020       * @param conflictDN     The newDn on which the conflict happened.
2021       */
2022      private void markConflictEntry(Operation op, DN currentDN, DN conflictDN)
2023      {
2024        // create new internal modify operation and run it.
2025        InternalClientConnection conn =
2026          InternalClientConnection.getRootConnection();
2027    
2028        AttributeType attrType =
2029          DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
2030        LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
2031        values.add(new AttributeValue(attrType, conflictDN.toString()));
2032        Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values);
2033        List<Modification> mods = new ArrayList<Modification>();
2034        Modification mod = new Modification(ModificationType.REPLACE, attr);
2035        mods.add(mod);
2036        ModifyOperation newOp = conn.processModify(currentDN, mods);
2037        if (newOp.getResultCode() != ResultCode.SUCCESS)
2038        {
2039          // Log information for the repair tool.
2040          MessageBuilder mb = new MessageBuilder();
2041          mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
2042          mb.append(String.valueOf(op));
2043          mb.append(" ");
2044          mb.append(String.valueOf(newOp.getResultCode()));
2045          logError(mb.toMessage());
2046        }
2047    
2048        // Generate an alert to let the administratot know that some
2049        // conflict could not be solved.
2050        Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString());
2051        DirectoryServer.sendAlertNotification(this,
2052            ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2053      }
2054    
2055      /**
2056       * Add the conflict attribute to an entry that could
2057       * not be added because it is conflicting with another entry.
2058       *
2059       * @param msg            The conflicting Add Operation.
2060       *
2061       * @throws ASN1Exception When an encoding error happenned manipulating the
2062       *                       msg.
2063       */
2064      private void addConflict(AddMsg msg) throws ASN1Exception
2065      {
2066        // Generate an alert to let the administratot know that some
2067        // conflict could not be solved.
2068        Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
2069        DirectoryServer.sendAlertNotification(this,
2070            ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2071    
2072        // Add the conflict attribute
2073        msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn());
2074      }
2075    
2076      /**
2077       * Generate the Dn to use for a conflicting entry.
2078       *
2079       * @param entryUid The unique identifier of the entry involved in the
2080       * conflict.
2081       * @param rdn Original rdn.
2082       * @return The generated RDN for a conflicting entry.
2083       */
2084      private String generateConflictRDN(String entryUid, String rdn)
2085      {
2086        return "entryuuid=" + entryUid + "+" + rdn;
2087      }
2088    
2089      /**
2090       * Generate the RDN to use for a conflicting entry whose father was deleted.
2091       *
2092       * @param entryUid The unique identifier of the entry involved in the
2093       *                 conflict.
2094       * @param dn       The original DN of the entry.
2095       *
2096       * @return The generated RDN for a conflicting entry.
2097       * @throws DirectoryException
2098       */
2099      private RDN generateDeleteConflictDn(String entryUid, DN dn)
2100      {
2101        String newRDN =  "entryuuid=" + entryUid + "+" + dn.getRDN();
2102        RDN rdn = null;
2103        try
2104        {
2105          rdn = RDN.decode(newRDN);
2106        } catch (DirectoryException e)
2107        {
2108          // cannot happen
2109        }
2110        return rdn;
2111      }
2112    
2113      /**
2114       * Check if an operation must be processed as an assured operation.
2115       *
2116       * @param op the operation to be checked.
2117       * @return true if the operations must be processed as an assured operation.
2118       */
2119      private boolean isAssured(PostOperationOperation op)
2120      {
2121        // TODO : should have a filtering mechanism for checking
2122        // operation that are assured and operations that are not.
2123        return false;
2124      }
2125    
2126      /**
2127       * Get the maximum receive window size.
2128       *
2129       * @return The maximum receive window size.
2130       */
2131      public int getMaxRcvWindow()
2132      {
2133        if (broker != null)
2134          return broker.getMaxRcvWindow();
2135        else
2136          return 0;
2137      }
2138    
2139      /**
2140       * Get the current receive window size.
2141       *
2142       * @return The current receive window size.
2143       */
2144      public int getCurrentRcvWindow()
2145      {
2146        if (broker != null)
2147          return broker.getCurrentRcvWindow();
2148        else
2149          return 0;
2150      }
2151    
2152      /**
2153       * Get the maximum send window size.
2154       *
2155       * @return The maximum send window size.
2156       */
2157      public int getMaxSendWindow()
2158      {
2159        if (broker != null)
2160          return broker.getMaxSendWindow();
2161        else
2162          return 0;
2163      }
2164    
2165      /**
2166       * Get the current send window size.
2167       *
2168       * @return The current send window size.
2169       */
2170      public int getCurrentSendWindow()
2171      {
2172        if (broker != null)
2173          return broker.getCurrentSendWindow();
2174        else
2175          return 0;
2176      }
2177    
2178      /**
2179       * Get the number of times the replication connection was lost.
2180       * @return The number of times the replication connection was lost.
2181       */
2182      public int getNumLostConnections()
2183      {
2184        if (broker != null)
2185          return broker.getNumLostConnections();
2186        else
2187          return 0;
2188      }
2189    
2190      /**
2191       * Get the number of modify conflicts successfully resolved.
2192       * @return The number of modify conflicts successfully resolved.
2193       */
2194      public int getNumResolvedModifyConflicts()
2195      {
2196        return numResolvedModifyConflicts.get();
2197      }
2198    
2199      /**
2200       * Get the number of namign conflicts successfully resolved.
2201       * @return The number of naming conflicts successfully resolved.
2202       */
2203      public int getNumResolvedNamingConflicts()
2204      {
2205        return numResolvedNamingConflicts.get();
2206      }
2207    
2208      /**
2209       * Get the number of unresolved conflicts.
2210       * @return The number of unresolved conflicts.
2211       */
2212      public int getNumUnresolvedNamingConflicts()
2213      {
2214        return numUnresolvedNamingConflicts.get();
2215      }
2216    
2217      /**
2218       * Get the server ID.
2219       * @return The server ID.
2220       */
2221      public int getServerId()
2222      {
2223        return serverId;
2224      }
2225    
2226      /**
2227       * Check if the domain solve conflicts.
2228       *
2229       * @return a boolean indicating if the domain should sove conflicts.
2230       */
2231      public boolean solveConflict()
2232      {
2233        return solveConflictFlag;
2234      }
2235    
2236      /**
2237       * Disable the replication on this domain.
2238       * The session to the replication server will be stopped.
2239       * The domain will not be destroyed but call to the pre-operation
2240       * methods will result in failure.
2241       * The listener thread will be destroyed.
2242       * The monitor informations will still be accessible.
2243       */
2244      public void disable()
2245      {
2246        state.save();
2247        state.clearInMemory();
2248        disabled = true;
2249    
2250        // Stop the listener thread
2251        listenerThread.shutdown();
2252    
2253        broker.stop(); // This will cut the session and wake up the listener
2254    
2255        // Wait for the listener thread to stop
2256        listenerThread.waitForShutdown();
2257      }
2258    
2259      /**
2260       * Do what necessary when the data have changed : load state, load
2261       * generation Id.
2262       * @exception DirectoryException Thrown when an error occurs.
2263      */
2264      protected void loadDataState()
2265      throws DirectoryException
2266      {
2267        state.clearInMemory();
2268        state.loadState();
2269        generator.adjust(state.getMaxChangeNumber(serverId));
2270        // Retrieves the generation ID associated with the data imported
2271        generationId = loadGenerationId();
2272      }
2273    
2274      /**
2275       * Enable back the domain after a previous disable.
2276       * The domain will connect back to a replication Server and
2277       * will recreate threads to listen for messages from the Sycnhronization
2278       * server.
2279       * The generationId will be retrieved or computed if necessary.
2280       * The ServerState will also be read again from the local database.
2281       */
2282      public void enable()
2283      {
2284        try
2285        {
2286          loadDataState();
2287        }
2288        catch (Exception e)
2289        {
2290          /* TODO should mark that replicationServer service is
2291           * not available, log an error and retry upon timeout
2292           * should we stop the modifications ?
2293           */
2294          logError(ERR_LOADING_GENERATION_ID.get(
2295              baseDN.toNormalizedString(), e.getLocalizedMessage()));
2296          return;
2297        }
2298    
2299        // After an on-line import, the value of the generationId is new
2300        // and it is necessary for the broker to send this new value as part
2301        // of the serverStart message.
2302        broker.setGenerationId(generationId);
2303    
2304        broker.start(replicationServers);
2305    
2306        // Create the listener thread
2307        listenerThread = new ListenerThread(this, updateToReplayQueue);
2308        listenerThread.start();
2309    
2310        disabled = false;
2311      }
2312    
2313      /**
2314       * Compute the data generationId associated with the current data present
2315       * in the backend for this domain.
2316       * @return The computed generationId.
2317       * @throws DirectoryException When an error occurs.
2318       */
2319      public long computeGenerationId() throws DirectoryException
2320      {
2321        Backend backend = retrievesBackend(baseDN);
2322        long bec = backend.numSubordinates(baseDN, true) + 1;
2323        this.acquireIEContext();
2324        ieContext.checksumOutput = true;
2325        ieContext.entryCount = (bec<1000?bec:1000);
2326        ieContext.entryLeftCount = ieContext.entryCount;
2327        exportBackend();
2328        long genId = ieContext.checksumOutputValue;
2329    
2330        if (debugEnabled())
2331          TRACER.debugInfo("Computed generationId: #entries=" + bec +
2332                   " generationId=" + ieContext.checksumOutputValue);
2333        ieContext.checksumOutput = false;
2334        this.releaseIEContext();
2335        return genId;
2336      }
2337    
2338      /**
2339       * Returns the generationId set for this domain.
2340       *
2341       * @return The generationId.
2342       */
2343      public long getGenerationId()
2344      {
2345        return generationId;
2346      }
2347    
2348      /**
2349       * The attribute name used to store the state in the backend.
2350       */
2351      protected static final String REPLICATION_GENERATION_ID =
2352        "ds-sync-generation-id";
2353    
2354      /**
2355       * Stores the value of the generationId.
2356       * @param generationId The value of the generationId.
2357       * @return a ResultCode indicating if the method was successfull.
2358       */
2359      public ResultCode saveGenerationId(long generationId)
2360      {
2361        // The generationId is stored in the root entry of the domain.
2362        ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
2363    
2364        ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
2365        ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
2366        values.add(value);
2367    
2368        LDAPAttribute attr =
2369          new LDAPAttribute(REPLICATION_GENERATION_ID, values);
2370        LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
2371        ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
2372        mods.add(mod);
2373    
2374        ModifyOperationBasis op =
2375          new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
2376              InternalClientConnection.nextMessageID(),
2377              new ArrayList<Control>(0), asn1BaseDn,
2378              mods);
2379        op.setInternalOperation(true);
2380        op.setSynchronizationOperation(true);
2381        op.setDontSynchronize(true);
2382    
2383        op.run();
2384    
2385        ResultCode result = op.getResultCode();
2386        if (result != ResultCode.SUCCESS)
2387        {
2388          generationIdSavedStatus = false;
2389          if (result != ResultCode.NO_SUCH_OBJECT)
2390          {
2391            // The case where the backend is empty (NO_SUCH_OBJECT)
2392            // is not an error case.
2393            Message message = ERR_UPDATING_GENERATION_ID.get(
2394                op.getResultCode().getResultCodeName() + " " +
2395                op.getErrorMessage(),
2396                baseDN.toString());
2397            logError(message);
2398          }
2399        }
2400        else
2401        {
2402          generationIdSavedStatus = true;
2403        }
2404        return result;
2405      }
2406    
2407    
2408      /**
2409       * Load the GenerationId from the root entry of the domain
2410       * from the REPLICATION_GENERATION_ID attribute in database
2411       * to memory, or compute it if not found.
2412       *
2413       * @return generationId The retrieved value of generationId
2414       * @throws DirectoryException When an error occurs.
2415       */
2416      public long loadGenerationId()
2417      throws DirectoryException
2418      {
2419        long generationId=-1;
2420    
2421        if (debugEnabled())
2422          TRACER.debugInfo(
2423              "Attempt to read generation ID from DB " + baseDN.toString());
2424    
2425        ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
2426        boolean found = false;
2427        LDAPFilter filter;
2428        try
2429        {
2430          filter = LDAPFilter.decode("objectclass=*");
2431        }
2432        catch (LDAPException e)
2433        {
2434          // can not happen
2435          return -1;
2436        }
2437    
2438        /*
2439         * Search the database entry that is used to periodically
2440         * save the ServerState
2441         */
2442        InternalSearchOperation search = null;
2443        LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
2444        attributes.add(REPLICATION_GENERATION_ID);
2445        search = conn.processSearch(asn1BaseDn,
2446            SearchScope.BASE_OBJECT,
2447            DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
2448            filter,attributes);
2449        if (((search.getResultCode() != ResultCode.SUCCESS)) &&
2450            ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
2451        {
2452          Message message = ERR_SEARCHING_GENERATION_ID.get(
2453              search.getResultCode().getResultCodeName() + " " +
2454              search.getErrorMessage(),
2455              baseDN.toString());
2456          logError(message);
2457        }
2458    
2459        SearchResultEntry resultEntry = null;
2460        if (search.getResultCode() == ResultCode.SUCCESS)
2461        {
2462          LinkedList<SearchResultEntry> result = search.getSearchEntries();
2463          resultEntry = result.getFirst();
2464          if (resultEntry != null)
2465          {
2466            AttributeType synchronizationGenIDType =
2467              DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
2468            List<Attribute> attrs =
2469              resultEntry.getAttribute(synchronizationGenIDType);
2470            if (attrs != null)
2471            {
2472              Attribute attr = attrs.get(0);
2473              LinkedHashSet<AttributeValue> values = attr.getValues();
2474              if (values.size()>1)
2475              {
2476                Message message = ERR_LOADING_GENERATION_ID.get(
2477                    baseDN.toString(), "#Values=" + values.size() +
2478                    " Must be exactly 1 in entry " +
2479                    resultEntry.toLDIFString());
2480                logError(message);
2481              }
2482              else if (values.size() == 1)
2483              {
2484                found=true;
2485                try
2486                {
2487                  generationId = Long.decode(values.iterator().next().
2488                      getStringValue());
2489                }
2490                catch(Exception e)
2491                {
2492                  Message message = ERR_LOADING_GENERATION_ID.get(
2493                    baseDN.toString(), e.getLocalizedMessage());
2494                  logError(message);
2495                }
2496              }
2497            }
2498          }
2499        }
2500    
2501        if (!found)
2502        {
2503          generationId = computeGenerationId();
2504          saveGenerationId(generationId);
2505    
2506          if (debugEnabled())
2507            TRACER.debugInfo("Generation ID created for domain base DN=" +
2508                baseDN.toString() +
2509                " generationId=" + generationId);
2510        }
2511        else
2512        {
2513          generationIdSavedStatus = true;
2514          if (debugEnabled())
2515            TRACER.debugInfo(
2516                "Generation ID successfully read from domain base DN=" + baseDN +
2517                " generationId=" + generationId);
2518        }
2519        return generationId;
2520      }
2521    
2522      /**
2523       * Reset the generationId of this domain in the whole topology.
2524       * A message is sent to the Replication Servers for them to reset
2525       * their change dbs.
2526       *
2527       * @param generationIdNewValue The new value of the generation Id.
2528       */
2529      public void resetGenerationId(Long generationIdNewValue)
2530      {
2531        if (debugEnabled())
2532          TRACER.debugInfo(
2533              this.getName() + "resetGenerationId" + generationIdNewValue);
2534    
2535        ResetGenerationId genIdMessage = null;
2536        if (generationIdNewValue == null)
2537        {
2538          genIdMessage = new ResetGenerationId(this.generationId);
2539        }
2540        else
2541        {
2542          genIdMessage = new ResetGenerationId(generationIdNewValue);
2543        }
2544        broker.publish(genIdMessage);
2545      }
2546    
2547      /**
2548       * Do whatever is needed when a backup is started.
2549       * We need to make sure that the serverState is correclty save.
2550       */
2551      public void backupStart()
2552      {
2553        state.save();
2554      }
2555    
2556      /**
2557       * Do whatever is needed when a backup is finished.
2558       */
2559      public void backupEnd()
2560      {
2561        // Nothing is needed at the moment
2562      }
2563    
2564      /*
2565       * Total Update >>
2566       */
2567    
2568      /**
2569       * Receives bytes related to an entry in the context of an import to
2570       * initialize the domain (called by ReplLDIFInputStream).
2571       *
2572       * @return The bytes. Null when the Done or Err message has been received
2573       */
2574      public byte[] receiveEntryBytes()
2575      {
2576        ReplicationMessage msg;
2577        while (true)
2578        {
2579          try
2580          {
2581            msg = broker.receive();
2582    
2583            if (debugEnabled())
2584              TRACER.debugVerbose(
2585                  " sid:" + this.serverId +
2586                  " base DN:" + this.baseDN +
2587                  " Import EntryBytes received " + msg);
2588            if (msg == null)
2589            {
2590              // The server is in the shutdown process
2591              return null;
2592            }
2593    
2594            if (msg instanceof EntryMessage)
2595            {
2596              EntryMessage entryMsg = (EntryMessage)msg;
2597              byte[] entryBytes = entryMsg.getEntryBytes();
2598              ieContext.updateCounters();
2599              return entryBytes;
2600            }
2601            else if (msg instanceof DoneMessage)
2602            {
2603              // This is the normal termination of the import
2604              // No error is stored and the import is ended
2605              // by returning null
2606              return null;
2607            }
2608            else if (msg instanceof ErrorMessage)
2609            {
2610              // This is an error termination during the import
2611              // The error is stored and the import is ended
2612              // by returning null
2613              ErrorMessage errorMsg = (ErrorMessage)msg;
2614              ieContext.exception = new DirectoryException(
2615                                          ResultCode.OTHER,
2616                                          errorMsg.getDetails());
2617              return null;
2618            }
2619            else
2620            {
2621              // Other messages received during an import are trashed
2622            }
2623          }
2624          catch(Exception e)
2625          {
2626            // TODO: i18n
2627            ieContext.exception = new DirectoryException(ResultCode.OTHER,
2628                Message.raw("received an unexpected message type" +
2629                    e.getLocalizedMessage()));
2630          }
2631        }
2632      }
2633    
2634      /**
2635       * Processes an error message received while an import/export is
2636       * on going.
2637       * @param errorMsg The error message received.
2638       */
2639      protected void abandonImportExport(ErrorMessage errorMsg)
2640      {
2641        // FIXME TBD Treat the case where the error happens while entries
2642        // are being exported
2643    
2644        if (debugEnabled())
2645          TRACER.debugVerbose(
2646              " abandonImportExport:" + this.serverId +
2647              " base DN:" + this.baseDN +
2648              " Error Msg received " + errorMsg);
2649    
2650        if (ieContext != null)
2651        {
2652          ieContext.exception = new DirectoryException(ResultCode.OTHER,
2653              errorMsg.getDetails());
2654    
2655          if (ieContext.initializeTask instanceof InitializeTask)
2656          {
2657            // Update the task that initiated the import
2658            ((InitializeTask)ieContext.initializeTask).
2659            updateTaskCompletionState(ieContext.exception);
2660    
2661            releaseIEContext();
2662          }
2663        }
2664      }
2665    
2666      /**
2667       * Clears all the entries from the JE backend determined by the
2668       * be id passed into the method.
2669       *
2670       * @param  createBaseEntry  Indicate whether to automatically create the base
2671       *                          entry and add it to the backend.
2672       * @param beID  The be id to clear.
2673       * @param dn   The suffix of the backend to create if the the createBaseEntry
2674       *             boolean is true.
2675       * @throws Exception  If an unexpected problem occurs.
2676       */
2677      public static void clearJEBackend(boolean createBaseEntry, String beID,
2678          String dn) throws Exception
2679      {
2680        BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
2681    
2682        // FIXME Should setBackendEnabled be part of TaskUtils ?
2683        TaskUtils.disableBackend(beID);
2684    
2685        try
2686        {
2687          String lockFile = LockFileManager.getBackendLockFileName(backend);
2688          StringBuilder failureReason = new StringBuilder();
2689    
2690          if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
2691          {
2692            throw new RuntimeException(failureReason.toString());
2693          }
2694    
2695          try
2696          {
2697            backend.clearBackend();
2698          }
2699          finally
2700          {
2701            LockFileManager.releaseLock(lockFile, failureReason);
2702          }
2703        }
2704        finally
2705        {
2706          TaskUtils.enableBackend(beID);
2707        }
2708    
2709        if (createBaseEntry)
2710        {
2711          DN baseDN = DN.decode(dn);
2712          Entry e = createEntry(baseDN);
2713          backend = (BackendImpl)DirectoryServer.getBackend(beID);
2714          backend.addEntry(e, null);
2715        }
2716      }
2717    
2718      /**
2719       * Export the entries from the backend.
2720       * The ieContext must have been set before calling.
2721       *
2722       * @throws DirectoryException when an error occurred
2723       */
2724      protected void exportBackend()
2725      throws DirectoryException
2726      {
2727        Backend backend = retrievesBackend(this.baseDN);
2728    
2729        //  Acquire a shared lock for the backend.
2730        try
2731        {
2732          String lockFile = LockFileManager.getBackendLockFileName(backend);
2733          StringBuilder failureReason = new StringBuilder();
2734          if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
2735          {
2736            Message message = ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
2737                backend.getBackendID(), String.valueOf(failureReason));
2738            logError(message);
2739            throw new DirectoryException(
2740                ResultCode.OTHER, message, null);
2741          }
2742        }
2743        catch (Exception e)
2744        {
2745          Message message =
2746              ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
2747                      backend.getBackendID(), e.getLocalizedMessage());
2748          logError(message);
2749          throw new DirectoryException(
2750              ResultCode.OTHER, message, null);
2751        }
2752    
2753        OutputStream os;
2754        ReplLDIFOutputStream ros;
2755    
2756        if (ieContext.checksumOutput)
2757        {
2758          ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
2759          os = new CheckedOutputStream(ros, new Adler32());
2760          try
2761          {
2762            os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)).
2763                getBytes());
2764          }
2765          catch(Exception e)
2766          {
2767            // Should never happen
2768          }
2769        }
2770        else
2771        {
2772          ros = new ReplLDIFOutputStream(this, (short)-1);
2773          os = ros;
2774        }
2775        LDIFExportConfig exportConfig = new LDIFExportConfig(os);
2776    
2777        // baseDN branch is the only one included in the export
2778        List<DN> includeBranches = new ArrayList<DN>(1);
2779        includeBranches.add(this.baseDN);
2780        exportConfig.setIncludeBranches(includeBranches);
2781    
2782        // For the checksum computing mode, only consider the 'stable' attributes
2783        if (ieContext.checksumOutput)
2784        {
2785          String includeAttributeStrings[] =
2786            {"objectclass", "sn", "cn", "entryuuid"};
2787          HashSet<AttributeType> includeAttributes;
2788          includeAttributes = new HashSet<AttributeType>();
2789          for (String attrName : includeAttributeStrings)
2790          {
2791            AttributeType attrType  = DirectoryServer.getAttributeType(attrName);
2792            if (attrType == null)
2793            {
2794              attrType = DirectoryServer.getDefaultAttributeType(attrName);
2795            }
2796            includeAttributes.add(attrType);
2797          }
2798          exportConfig.setIncludeAttributes(includeAttributes);
2799        }
2800    
2801        //  Launch the export.
2802        try
2803        {
2804          backend.exportLDIF(exportConfig);
2805        }
2806        catch (DirectoryException de)
2807        {
2808          if ((ieContext != null) && (ieContext.checksumOutput) &&
2809              (ros.getNumExportedEntries() >= ieContext.entryCount))
2810          {
2811            // This is the normal end when computing the generationId
2812            // We can interrupt the export only by an IOException
2813          }
2814          else
2815          {
2816            Message message =
2817              ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
2818            logError(message);
2819            throw new DirectoryException(
2820                ResultCode.OTHER, message, null);
2821          }
2822        }
2823        catch (Exception e)
2824        {
2825          Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(
2826              stackTraceToSingleLineString(e));
2827          logError(message);
2828          throw new DirectoryException(
2829              ResultCode.OTHER, message, null);
2830        }
2831        finally
2832        {
2833    
2834          if ((ieContext != null) && (ieContext.checksumOutput))
2835          {
2836            ieContext.checksumOutputValue =
2837             ((CheckedOutputStream)os).getChecksum().getValue();
2838          }
2839          else
2840          {
2841            // Clean up after the export by closing the export config.
2842            // Will also flush the export and export the remaining entries.
2843            // This is a real export where writer has been initialized.
2844            exportConfig.close();
2845          }
2846    
2847          //  Release the shared lock on the backend.
2848          try
2849          {
2850            String lockFile = LockFileManager.getBackendLockFileName(backend);
2851            StringBuilder failureReason = new StringBuilder();
2852            if (! LockFileManager.releaseLock(lockFile, failureReason))
2853            {
2854              Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
2855                  backend.getBackendID(), String.valueOf(failureReason));
2856              logError(message);
2857              throw new DirectoryException(
2858                  ResultCode.OTHER, message, null);
2859            }
2860          }
2861          catch (Exception e)
2862          {
2863            Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
2864                backend.getBackendID(), stackTraceToSingleLineString(e));
2865            logError(message);
2866            throw new DirectoryException(
2867                ResultCode.OTHER, message, null);
2868          }
2869        }
2870      }
2871    
2872      /**
2873       * Retrieves the backend related to the domain.
2874       *
2875       * @return The backend of that domain.
2876       * @param baseDN The baseDN to retrieve the backend
2877       */
2878      protected static Backend retrievesBackend(DN baseDN)
2879      {
2880        // Retrieves the backend related to this domain
2881        return DirectoryServer.getBackend(baseDN);
2882      }
2883    
2884      /**
2885       * Get the internal broker to perform some operations on it.
2886       *
2887       * @return The broker for this domain.
2888       */
2889      ReplicationBroker getBroker()
2890      {
2891        return broker;
2892      }
2893    
2894      /**
2895       * Exports an entry in LDIF format.
2896       *
2897       * @param  lDIFEntry The entry to be exported..
2898       *
2899       * @throws IOException when an error occurred.
2900       */
2901      public void exportLDIFEntry(String lDIFEntry) throws IOException
2902      {
2903        // If an error was raised - like receiving an ErrorMessage
2904        // we just let down the export.
2905        if (ieContext.exception != null)
2906        {
2907          IOException ioe = new IOException(ieContext.exception.getMessage());
2908          ieContext = null;
2909          throw ioe;
2910        }
2911    
2912        if (ieContext.checksumOutput == false)
2913        {
2914          EntryMessage entryMessage = new EntryMessage(
2915            serverId, ieContext.exportTarget, lDIFEntry.getBytes());
2916          broker.publish(entryMessage);
2917        }
2918        try
2919        {
2920          ieContext.updateCounters();
2921        }
2922        catch (DirectoryException de)
2923        {
2924          throw new IOException(de.getMessage());
2925        }
2926      }
2927    
2928      /**
2929       * Initializes this domain from another source server.
2930       *
2931       * @param source The source from which to initialize
2932       * @param initTask The task that launched the initialization
2933       *                 and should be updated of its progress.
2934       * @throws DirectoryException when an error occurs
2935       */
2936      public void initializeFromRemote(short source, Task initTask)
2937      throws DirectoryException
2938      {
2939        if (debugEnabled())
2940          TRACER.debugInfo("Entering initializeFromRemote");
2941    
2942        acquireIEContext();
2943        ieContext.initializeTask = initTask;
2944    
2945        InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
2946            baseDN, serverId, source);
2947    
2948        // Publish Init request msg
2949        broker.publish(initializeMsg);
2950    
2951        // .. we expect to receive entries or err after that
2952      }
2953    
2954      /**
2955       * Verifies that the given string represents a valid source
2956       * from which this server can be initialized.
2957       * @param sourceString The string representing the source
2958       * @return The source as a short value
2959       * @throws DirectoryException if the string is not valid
2960       */
2961      public short decodeSource(String sourceString)
2962      throws DirectoryException
2963      {
2964        short  source = 0;
2965        Throwable cause = null;
2966        try
2967        {
2968          source = Integer.decode(sourceString).shortValue();
2969          if ((source >= -1) && (source != serverId))
2970          {
2971            // TODO Verifies serverID is in the domain
2972            // We shold check here that this is a server implied
2973            // in the current domain.
2974            return source;
2975          }
2976        }
2977        catch(Exception e)
2978        {
2979          cause = e;
2980        }
2981    
2982        ResultCode resultCode = ResultCode.OTHER;
2983        Message message = ERR_INVALID_IMPORT_SOURCE.get();
2984        if (cause != null)
2985        {
2986          throw new DirectoryException(
2987              resultCode, message, cause);
2988        }
2989        else
2990        {
2991          throw new DirectoryException(
2992              resultCode, message);
2993        }
2994      }
2995    
2996      /**
2997       * Verifies that the given string represents a valid source
2998       * from which this server can be initialized.
2999       * @param targetString The string representing the source
3000       * @return The source as a short value
3001       * @throws DirectoryException if the string is not valid
3002       */
3003      public short decodeTarget(String targetString)
3004      throws DirectoryException
3005      {
3006        short  target = 0;
3007        Throwable cause;
3008        if (targetString.equalsIgnoreCase("all"))
3009        {
3010          return RoutableMessage.ALL_SERVERS;
3011        }
3012    
3013        // So should be a serverID
3014        try
3015        {
3016          target = Integer.decode(targetString).shortValue();
3017          if (target >= 0)
3018          {
3019            // FIXME Could we check now that it is a know server in the domain ?
3020          }
3021          return target;
3022        }
3023        catch(Exception e)
3024        {
3025          cause = e;
3026        }
3027        ResultCode resultCode = ResultCode.OTHER;
3028        Message message = ERR_INVALID_EXPORT_TARGET.get();
3029    
3030        if (cause != null)
3031          throw new DirectoryException(
3032              resultCode, message, cause);
3033        else
3034          throw new DirectoryException(
3035              resultCode, message);
3036    
3037      }
3038    
3039      private synchronized void acquireIEContext()
3040      throws DirectoryException
3041      {
3042        if (ieContext != null)
3043        {
3044          // Rejects 2 simultaneous exports
3045          Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
3046          throw new DirectoryException(ResultCode.OTHER,
3047              message);
3048        }
3049    
3050        ieContext = new IEContext();
3051      }
3052    
3053      private synchronized void releaseIEContext()
3054      {
3055        ieContext = null;
3056      }
3057    
3058      /**
3059       * Process the initialization of some other server or servers in the topology
3060       * specified by the target argument.
3061       * @param target The target that should be initialized
3062       * @param initTask The task that triggers this initialization and that should
3063       *                 be updated with its progress.
3064       *
3065       * @exception DirectoryException When an error occurs.
3066       */
3067      public void initializeRemote(short target, Task initTask)
3068      throws DirectoryException
3069      {
3070        initializeRemote(target, serverId, initTask);
3071      }
3072    
3073      /**
3074       * Process the initialization of some other server or servers in the topology
3075       * specified by the target argument when this initialization specifying the
3076       * server that requests the initialization.
3077       *
3078       * @param target The target that should be initialized.
3079       * @param requestorID The server that initiated the export.
3080       * @param initTask The task that triggers this initialization and that should
3081       *  be updated with its progress.
3082       *
3083       * @exception DirectoryException When an error occurs.
3084       */
3085      public void initializeRemote(short target, short requestorID, Task initTask)
3086      throws DirectoryException
3087      {
3088        try
3089        {
3090          Backend backend = retrievesBackend(this.baseDN);
3091    
3092          if (!backend.supportsLDIFExport())
3093          {
3094            Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
3095                                backend.getBackendID().toString());
3096            logError(message);
3097            throw new DirectoryException(ResultCode.OTHER, message);
3098          }
3099    
3100          acquireIEContext();
3101    
3102          // The number of entries to be exported is the number of entries under
3103          // the base DN entry and the base entry itself.
3104          long entryCount = backend.numSubordinates(baseDN, true) + 1;
3105          ieContext.exportTarget = target;
3106          if (initTask != null)
3107          {
3108            ieContext.initializeTask = initTask;
3109          }
3110          ieContext.setCounters(entryCount, entryCount);
3111    
3112          // Send start message to the peer
3113          InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
3114              baseDN, serverId, ieContext.exportTarget, requestorID, entryCount);
3115    
3116          broker.publish(initializeMessage);
3117    
3118          exportBackend();
3119    
3120          // Notify the peer of the success
3121          DoneMessage doneMsg = new DoneMessage(serverId,
3122              initializeMessage.getDestination());
3123          broker.publish(doneMsg);
3124    
3125          releaseIEContext();
3126        }
3127        catch(DirectoryException de)
3128        {
3129          // Notify the peer of the failure
3130          ErrorMessage errorMsg =
3131            new ErrorMessage(target,
3132                             de.getMessageObject());
3133          broker.publish(errorMsg);
3134    
3135          releaseIEContext();
3136    
3137          throw(de);
3138        }
3139      }
3140    
3141      /**
3142       * Process backend before import.
3143       * @param backend The backend.
3144       * @throws Exception
3145       */
3146      private void preBackendImport(Backend backend)
3147      throws Exception
3148      {
3149        // Stop saving state
3150        stateSavingDisabled = true;
3151    
3152        // FIXME setBackendEnabled should be part of TaskUtils ?
3153        TaskUtils.disableBackend(backend.getBackendID());
3154    
3155        // Acquire an exclusive lock for the backend.
3156        String lockFile = LockFileManager.getBackendLockFileName(backend);
3157        StringBuilder failureReason = new StringBuilder();
3158        if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
3159        {
3160          Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
3161                              backend.getBackendID(),
3162                              String.valueOf(failureReason));
3163          logError(message);
3164          throw new DirectoryException(ResultCode.OTHER, message);
3165        }
3166      }
3167    
3168      /**
3169       * Initializes the domain's backend with received entries.
3170       * @param initializeMessage The message that initiated the import.
3171       * @exception DirectoryException Thrown when an error occurs.
3172       */
3173      protected void initialize(InitializeTargetMessage initializeMessage)
3174      throws DirectoryException
3175      {
3176        LDIFImportConfig importConfig = null;
3177        DirectoryException de = null;
3178    
3179        Backend backend = retrievesBackend(baseDN);
3180    
3181        try
3182        {
3183          if (!backend.supportsLDIFImport())
3184          {
3185            Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
3186                backend.getBackendID().toString());
3187            logError(message);
3188            de = new DirectoryException(ResultCode.OTHER, message);
3189          }
3190          else
3191          {
3192            if (initializeMessage.getRequestorID() == serverId)
3193            {
3194              // The import responds to a request we did so the IEContext
3195              // is already acquired
3196            }
3197            else
3198            {
3199              acquireIEContext();
3200            }
3201    
3202            ieContext.importSource = initializeMessage.getsenderID();
3203            ieContext.entryLeftCount = initializeMessage.getEntryCount();
3204            ieContext.setCounters(initializeMessage.getEntryCount(),
3205                initializeMessage.getEntryCount());
3206    
3207            preBackendImport(backend);
3208    
3209            ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
3210            importConfig =
3211              new LDIFImportConfig(ieContext.ldifImportInputStream);
3212            List<DN> includeBranches = new ArrayList<DN>();
3213            includeBranches.add(this.baseDN);
3214            importConfig.setIncludeBranches(includeBranches);
3215            importConfig.setAppendToExistingData(false);
3216    
3217            // TODO How to deal with rejected entries during the import
3218            importConfig.writeRejectedEntries(
3219              getFileForPath("logs" + File.separator +
3220                  "replInitRejectedEntries").getAbsolutePath(),
3221              ExistingFileBehavior.OVERWRITE);
3222    
3223            // Process import
3224            backend.importLDIF(importConfig);
3225    
3226            stateSavingDisabled = false;
3227          }
3228        }
3229        catch(Exception e)
3230        {
3231          de = new DirectoryException(ResultCode.OTHER,
3232              Message.raw(e.getLocalizedMessage()));
3233        }
3234        finally
3235        {
3236          if ((ieContext != null)  && (ieContext.exception != null))
3237            de = ieContext.exception;
3238    
3239          // Cleanup
3240          if (importConfig != null)
3241          {
3242            importConfig.close();
3243    
3244            // Re-enable backend
3245            closeBackendImport(backend);
3246    
3247            backend = retrievesBackend(baseDN);
3248          }
3249    
3250          // Update the task that initiated the import
3251          if ((ieContext != null ) && (ieContext.initializeTask != null))
3252          {
3253            ((InitializeTask)ieContext.initializeTask).
3254            updateTaskCompletionState(de);
3255          }
3256          releaseIEContext();
3257        }
3258        // Sends up the root error.
3259        if (de != null)
3260        {
3261          throw de;
3262        }
3263        else
3264        {
3265          loadDataState();
3266    
3267          if (debugEnabled())
3268            TRACER.debugInfo(
3269                "After import, the replication plugin restarts connections" +
3270                " to all RSs to provide new generation ID=" + generationId);
3271          broker.setGenerationId(generationId);
3272    
3273          // Re-exchange generationID and state with RS
3274          broker.reStart();
3275        }
3276      }
3277    
3278      /**
3279       * Make post import operations.
3280       * @param backend The backend implied in the import.
3281       * @exception DirectoryException Thrown when an error occurs.
3282       */
3283      protected void closeBackendImport(Backend backend)
3284      throws DirectoryException
3285      {
3286        String lockFile = LockFileManager.getBackendLockFileName(backend);
3287        StringBuilder failureReason = new StringBuilder();
3288    
3289        // Release lock
3290        if (!LockFileManager.releaseLock(lockFile, failureReason))
3291        {
3292          Message message = WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(
3293              backend.getBackendID(), String.valueOf(failureReason));
3294          logError(message);
3295          throw new DirectoryException(ResultCode.OTHER, message);
3296        }
3297    
3298        TaskUtils.enableBackend(backend.getBackendID());
3299      }
3300    
3301      /**
3302       * Retrieves a replication domain based on the baseDN.
3303       *
3304       * @param baseDN The baseDN of the domain to retrieve
3305       * @return The domain retrieved
3306       * @throws DirectoryException When an error occurred or no domain
3307       * match the provided baseDN.
3308       */
3309      public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
3310      throws DirectoryException
3311      {
3312        ReplicationDomain replicationDomain = null;
3313    
3314        // Retrieves the domain
3315        DirectoryServer.getSynchronizationProviders();
3316        for (SynchronizationProvider provider :
3317          DirectoryServer.getSynchronizationProviders())
3318        {
3319          if (!( provider instanceof MultimasterReplication))
3320          {
3321            Message message = ERR_INVALID_PROVIDER.get();
3322            throw new DirectoryException(ResultCode.OTHER,
3323                message);
3324          }
3325    
3326          // From the domainDN retrieves the replication domain
3327          ReplicationDomain sdomain =
3328            MultimasterReplication.findDomain(baseDN, null);
3329          if (sdomain == null)
3330          {
3331            break;
3332          }
3333          if (replicationDomain != null)
3334          {
3335            // Should never happen
3336            Message message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
3337            throw new DirectoryException(ResultCode.OTHER,
3338                message);
3339          }
3340          replicationDomain = sdomain;
3341        }
3342    
3343        if (replicationDomain == null)
3344        {
3345          MessageBuilder mb = new MessageBuilder(ERR_NO_MATCHING_DOMAIN.get());
3346          mb.append(" ");
3347          mb.append(String.valueOf(baseDN));
3348          throw new DirectoryException(ResultCode.OTHER,
3349             mb.toMessage());
3350        }
3351        return replicationDomain;
3352      }
3353    
3354      /**
3355       * Returns the backend associated to this domain.
3356       * @return The associated backend.
3357       */
3358      public Backend getBackend()
3359      {
3360        return retrievesBackend(baseDN);
3361      }
3362    
3363      /**
3364       * Returns a boolean indiciating if an import or export is currently
3365       * processed.
3366       * @return The status
3367       */
3368      public boolean ieRunning()
3369      {
3370        return (ieContext != null);
3371      }
3372      /*
3373       * <<Total Update
3374       */
3375    
3376    
3377      /**
3378       * Push the modifications contain the in given parameter has
3379       * a modification that would happen on a local server.
3380       * The modifications are not applied to the local database,
3381       * historical information is not updated but a ChangeNumber
3382       * is generated and the ServerState associated to this domain is
3383       * updated.
3384       * @param modifications The modification to push
3385       */
3386      public void synchronizeModifications(List<Modification> modifications)
3387      {
3388        ModifyOperation opBasis =
3389          new ModifyOperationBasis(InternalClientConnection.getRootConnection(),
3390                              InternalClientConnection.nextOperationID(),
3391                              InternalClientConnection.nextMessageID(),
3392                              null, DirectoryServer.getSchemaDN(),
3393                              modifications);
3394        LocalBackendModifyOperation op = new LocalBackendModifyOperation(opBasis);
3395    
3396        ChangeNumber cn = generateChangeNumber(op);
3397        OperationContext ctx = new ModifyContext(cn, "schema");
3398        op.setAttachment(SYNCHROCONTEXT, ctx);
3399        op.setResultCode(ResultCode.SUCCESS);
3400        synchronize(op);
3401      }
3402    
3403      /**
3404       * Check if the provided configuration is acceptable for add.
3405       *
3406       * @param configuration The configuration to check.
3407       * @param unacceptableReasons When the configuration is not acceptable, this
3408       *                            table is use to return the reasons why this
3409       *                            configuration is not acceptbale.
3410       *
3411       * @return true if the configuration is acceptable, false other wise.
3412       */
3413      public static boolean isConfigurationAcceptable(
3414          ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
3415      {
3416        // Check that there is not already a domain with the same DN
3417        DN dn = configuration.getBaseDN();
3418        if (MultimasterReplication.findDomain(dn,null) != null)
3419        {
3420          Message message = ERR_SYNC_INVALID_DN.get();
3421          unacceptableReasons.add(message);
3422          return false;
3423        }
3424    
3425        // Check that the base DN is configured as a base-dn of the directory server
3426        if (retrievesBackend(dn) == null)
3427        {
3428          Message message = ERR_UNKNOWN_DN.get(dn.toString());
3429          unacceptableReasons.add(message);
3430          return false;
3431        }
3432        return true;
3433      }
3434    
3435      /**
3436       * {@inheritDoc}
3437       */
3438      public ConfigChangeResult applyConfigurationChange(
3439             ReplicationDomainCfg configuration)
3440      {
3441        // server id and base dn are readonly.
3442        // isolationPolicy can be set immediately and will apply
3443        // to the next updates.
3444        // The other parameters needs to be renegociated with the ReplicationServer.
3445        // so that requires restarting the session with the ReplicationServer.
3446        replicationServers = configuration.getReplicationServer();
3447        window = configuration.getWindowSize();
3448        heartbeatInterval = configuration.getHeartbeatInterval();
3449        broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
3450                            maxSendQueue, maxSendDelay, window, heartbeatInterval);
3451        isolationpolicy = configuration.getIsolationPolicy();
3452    
3453        return new ConfigChangeResult(ResultCode.SUCCESS, false);
3454      }
3455    
3456      /**
3457       * {@inheritDoc}
3458       */
3459      public boolean isConfigurationChangeAcceptable(
3460             ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
3461      {
3462        return true;
3463      }
3464    
3465      /**
3466       * {@inheritDoc}
3467       */
3468      public LinkedHashMap<String, String> getAlerts()
3469      {
3470        LinkedHashMap<String,String> alerts = new LinkedHashMap<String,String>();
3471    
3472        alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
3473                   ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
3474        return alerts;
3475      }
3476    
3477      /**
3478       * {@inheritDoc}
3479       */
3480      public String getClassName()
3481      {
3482        return CLASS_NAME;
3483    
3484      }
3485    
3486      /**
3487       * {@inheritDoc}
3488       */
3489      public DN getComponentEntryDN()
3490      {
3491        return configDn;
3492      }
3493    
3494      /**
3495       * Check if the domain is connected to a ReplicationServer.
3496       *
3497       * @return true if the server is connected, false if not.
3498       */
3499      public boolean isConnected()
3500      {
3501        if (broker != null)
3502          return broker.isConnected();
3503        else
3504          return false;
3505      }
3506    
3507      /**
3508       * Determine whether the connection to the replication server is encrypted.
3509       * @return true if the connection is encrypted, false otherwise.
3510       */
3511      public boolean isSessionEncrypted()
3512      {
3513        if (broker != null)
3514          return broker.isSessionEncrypted();
3515        else
3516          return false;
3517      }
3518    }