001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    
029    import java.util.ArrayList;
030    import static org.opends.server.replication.plugin.
031    ReplicationRepairRequestControl.*;
032    
033    import java.util.HashMap;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.concurrent.LinkedBlockingQueue;
037    
038    import org.opends.messages.Message;
039    import org.opends.server.admin.server.ConfigurationAddListener;
040    import org.opends.server.admin.server.ConfigurationChangeListener;
041    import org.opends.server.admin.server.ConfigurationDeleteListener;
042    import org.opends.server.admin.std.server.ReplicationDomainCfg;
043    import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
044    import org.opends.server.api.Backend;
045    import org.opends.server.api.BackupTaskListener;
046    import org.opends.server.api.ExportTaskListener;
047    import org.opends.server.api.ImportTaskListener;
048    import org.opends.server.api.RestoreTaskListener;
049    import org.opends.server.api.SynchronizationProvider;
050    import org.opends.server.config.ConfigException;
051    import org.opends.server.core.DirectoryServer;
052    import org.opends.server.types.BackupConfig;
053    import org.opends.server.types.ConfigChangeResult;
054    import org.opends.server.types.Control;
055    import org.opends.server.types.DN;
056    import org.opends.server.types.DirectoryException;
057    import org.opends.server.types.Entry;
058    import org.opends.server.types.LDIFExportConfig;
059    import org.opends.server.types.LDIFImportConfig;
060    import org.opends.server.types.Modification;
061    import org.opends.server.types.Operation;
062    import org.opends.server.types.RestoreConfig;
063    import org.opends.server.types.ResultCode;
064    import org.opends.server.types.SynchronizationProviderResult;
065    import org.opends.server.types.operation.PluginOperation;
066    import org.opends.server.types.operation.PostOperationAddOperation;
067    import org.opends.server.types.operation.PostOperationDeleteOperation;
068    import org.opends.server.types.operation.PostOperationModifyDNOperation;
069    import org.opends.server.types.operation.PostOperationModifyOperation;
070    import org.opends.server.types.operation.PostOperationOperation;
071    import org.opends.server.types.operation.PreOperationAddOperation;
072    import org.opends.server.types.operation.PreOperationDeleteOperation;
073    import org.opends.server.types.operation.PreOperationModifyDNOperation;
074    import org.opends.server.types.operation.PreOperationModifyOperation;
075    
076    /**
077     * This class is used to load the Replication code inside the JVM
078     * and to trigger initialization of the replication.
079     *
 * It also extends the SynchronizationProvider class so that replication
 * code runs during operation processing, in the pre-operation,
 * conflict resolution and post-operation phases.
083     */
084    public class MultimasterReplication
085           extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
086           implements ConfigurationAddListener<ReplicationDomainCfg>,
087                      ConfigurationDeleteListener<ReplicationDomainCfg>,
088                      ConfigurationChangeListener
089                      <ReplicationSynchronizationProviderCfg>,
090                      BackupTaskListener, RestoreTaskListener, ImportTaskListener,
091                      ExportTaskListener
092    {
093      private ReplicationServerListener replicationServerListener = null;
094      private static Map<DN, ReplicationDomain> domains =
095        new HashMap<DN, ReplicationDomain>() ;
096    
097      /**
098       * The queue of received update messages, to be treated by the ReplayThread
099       * threads.
100       */
101      private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
102        new LinkedBlockingQueue<UpdateToReplay>();
103    
104      /**
105       * The list of ReplayThread threads.
106       */
107      private static List<ReplayThread> replayThreads =
108        new ArrayList<ReplayThread>();
109    
110      /**
111       * The configurable number of replay threads.
112       */
113      private static int replayThreadNumber = 10;
114    
115      private boolean isRegistered = false;
116    
117      /**
118       * Finds the domain for a given DN.
119       *
120       * @param dn         The DN for which the domain must be returned.
121       * @param pluginOp   An optional operation for which the check is done.
122       *                   Can be null is the request has no associated operation.
123       * @return           The domain for this DN.
124       */
125      public static ReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
126      {
127        /*
128         * Don't run the special replication code on Operation that are
129         * specifically marked as don't synchronize.
130         */
131        if ((pluginOp != null) && (pluginOp instanceof Operation))
132        {
133            Operation op = ((Operation) pluginOp);
134    
135            if (op.dontSynchronize())
136              return null;
137    
138            /*
139             * Check if the provided operation is a repair operation and set
140             * the synchronization flags if necessary.
141             * The repair operations are tagged as synchronization operations
142             * so that the core server let the operation modify the entryuuid
143             * and ds-sync-hist attributes.
144             * They are also tagged as dontSynchronize so that the replication
145             * code running later do not generate ChnageNumber, solve conflicts
146             * and forward the operation to the replication server.
147             */
148            for (Control c : op.getRequestControls())
149            {
150              if (c.getOID().equals(OID_REPLICATION_REPAIR_CONTROL))
151              {
152                op.setSynchronizationOperation(true);
153                op.setDontSynchronize(true);
154                // remove this control from the list of controls since
155                // it has now been processed and the local backend will
156                // fail if it finds a control that it does not know about and
157                // that is marked as critical.
158                List<Control> controls = op.getRequestControls();
159                controls.remove(c);
160                return null;
161              }
162            }
163        }
164    
165    
166        ReplicationDomain domain = null;
167        DN temp = dn;
168        do
169        {
170          domain = domains.get(temp);
171          temp = temp.getParentDNInSuffix();
172          if (temp == null)
173          {
174            break;
175          }
176        } while (domain == null);
177    
178        return domain;
179      }
180    
181      /**
182       * Creates a new domain from its configEntry, do the
183       * necessary initialization and starts it so that it is
184       * fully operational when this method returns.
185       * @param configuration The entry whith the configuration of this domain.
186       * @return The domain created.
187       * @throws ConfigException When the configuration is not valid.
188       */
189      public static ReplicationDomain createNewDomain(
190          ReplicationDomainCfg configuration)
191          throws ConfigException
192      {
193        ReplicationDomain domain;
194        domain = new ReplicationDomain(configuration, updateToReplayQueue);
195    
196        if (domains.size() == 0)
197        {
198          /*
199           * Create the threads that will process incoming update messages
200           */
201          createReplayThreads();
202        }
203    
204        domains.put(domain.getBaseDN(), domain);
205        return domain;
206      }
207    
208      /**
209       * Deletes a domain.
210       * @param dn : the base DN of the domain to delete.
211       */
212      public static void deleteDomain(DN dn)
213      {
214        ReplicationDomain domain = domains.remove(dn);
215    
216        if (domain != null)
217          domain.shutdown();
218    
219        // No replay threads running if no replication need
220        if (domains.size() == 0) {
221          stopReplayThreads();
222        }
223      }
224    
225      /**
226       * {@inheritDoc}
227       */
228      @Override
229      public void initializeSynchronizationProvider(
230          ReplicationSynchronizationProviderCfg configuration)
231      throws ConfigException
232      {
233        domains.clear();
234        replicationServerListener = new ReplicationServerListener(configuration);
235    
236        // Register as an add and delete listener with the root configuration so we
237        // can be notified if Multimaster domain entries are added or removed.
238        configuration.addReplicationDomainAddListener(this);
239        configuration.addReplicationDomainDeleteListener(this);
240    
241        // Register as a root configuration listener so that we can be notified if
242        // number of replay threads is changed and apply changes.
243        configuration.addReplicationChangeListener(this);
244    
245        replayThreadNumber = configuration.getNumUpdateReplayThreads();
246    
247        //  Create the list of domains that are already defined.
248        for (String name : configuration.listReplicationDomains())
249        {
250          ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
251          createNewDomain(domain);
252        }
253    
254        /*
255         * If any schema changes were made with the server offline, then handle them
256         * now.
257         */
258        List<Modification> offlineSchemaChanges =
259             DirectoryServer.getOfflineSchemaChanges();
260        if ((offlineSchemaChanges != null) && (! offlineSchemaChanges.isEmpty()))
261        {
262          processSchemaChange(offlineSchemaChanges);
263        }
264    
265        DirectoryServer.registerBackupTaskListener(this);
266        DirectoryServer.registerRestoreTaskListener(this);
267        DirectoryServer.registerExportTaskListener(this);
268        DirectoryServer.registerImportTaskListener(this);
269    
270        DirectoryServer.registerSupportedControl(
271            ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
272      }
273    
274      /**
275       * Create the threads that will wait for incoming update messages.
276       */
277      private synchronized static void createReplayThreads()
278      {
279        replayThreads.clear();
280    
281        for (int i = 0; i < replayThreadNumber; i++)
282        {
283          ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
284          replayThread.start();
285          replayThreads.add(replayThread);
286        }
287      }
288    
289      /**
290       * Stope the threads that are waiting for incoming update messages.
291       */
292      private synchronized static void stopReplayThreads()
293      {
294        //  stop the replay threads
295        for (ReplayThread replayThread : replayThreads)
296        {
297          replayThread.shutdown();
298        }
299    
300        for (ReplayThread replayThread : replayThreads)
301        {
302          replayThread.waitForShutdown();
303        }
304        replayThreads.clear();
305      }
306    
307      /**
308       * {@inheritDoc}
309       */
310      public boolean isConfigurationAddAcceptable(
311          ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
312      {
313        return ReplicationDomain.isConfigurationAcceptable(
314          configuration, unacceptableReasons);
315      }
316    
317      /**
318       * {@inheritDoc}
319       */
320      public ConfigChangeResult applyConfigurationAdd(
321         ReplicationDomainCfg configuration)
322      {
323        try
324        {
325          ReplicationDomain rd = createNewDomain(configuration);
326          if (isRegistered)
327          {
328            rd.start();
329          }
330          return new ConfigChangeResult(ResultCode.SUCCESS, false);
331        } catch (ConfigException e)
332        {
333          // we should never get to this point because the configEntry has
334          // already been validated in configAddisAcceptable
335          return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false);
336        }
337      }
338    
339      /**
340       * {@inheritDoc}
341       */
342      @Override
343      public void doPostOperation(PostOperationAddOperation addOperation)
344      {
345        DN dn = addOperation.getEntryDN();
346        genericPostOperation(addOperation, dn);
347      }
348    
349    
350      /**
351       * {@inheritDoc}
352       */
353      @Override
354      public void doPostOperation(PostOperationDeleteOperation deleteOperation)
355      {
356        DN dn = deleteOperation.getEntryDN();
357        genericPostOperation(deleteOperation, dn);
358      }
359    
360      /**
361       * {@inheritDoc}
362       */
363      @Override
364      public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
365      {
366        DN dn = modifyDNOperation.getEntryDN();
367        genericPostOperation(modifyDNOperation, dn);
368      }
369    
370      /**
371       * {@inheritDoc}
372       */
373      @Override
374      public void doPostOperation(PostOperationModifyOperation modifyOperation)
375      {
376        DN dn = modifyOperation.getEntryDN();
377        genericPostOperation(modifyOperation, dn);
378      }
379    
380      /**
381       * {@inheritDoc}
382       */
383      @Override
384      public SynchronizationProviderResult handleConflictResolution(
385          PreOperationModifyOperation modifyOperation)
386      {
387        ReplicationDomain domain =
388          findDomain(modifyOperation.getEntryDN(), modifyOperation);
389        if (domain == null)
390          return new SynchronizationProviderResult.ContinueProcessing();
391    
392        return domain.handleConflictResolution(modifyOperation);
393      }
394    
395      /**
396       * {@inheritDoc}
397       */
398      @Override
399      public SynchronizationProviderResult handleConflictResolution(
400          PreOperationAddOperation addOperation) throws DirectoryException
401      {
402        ReplicationDomain domain =
403          findDomain(addOperation.getEntryDN(), addOperation);
404        if (domain == null)
405          return new SynchronizationProviderResult.ContinueProcessing();
406    
407        return domain.handleConflictResolution(addOperation);
408      }
409    
410      /**
411       * {@inheritDoc}
412       */
413      @Override
414      public SynchronizationProviderResult handleConflictResolution(
415          PreOperationDeleteOperation deleteOperation) throws DirectoryException
416      {
417        ReplicationDomain domain =
418          findDomain(deleteOperation.getEntryDN(), deleteOperation);
419        if (domain == null)
420          return new SynchronizationProviderResult.ContinueProcessing();
421    
422        return domain.handleConflictResolution(deleteOperation);
423      }
424    
425      /**
426       * {@inheritDoc}
427       */
428      @Override
429      public SynchronizationProviderResult handleConflictResolution(
430          PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
431      {
432        ReplicationDomain domain =
433          findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
434        if (domain == null)
435          return new SynchronizationProviderResult.ContinueProcessing();
436    
437        return domain.handleConflictResolution(modifyDNOperation);
438      }
439    
440      /**
441       * {@inheritDoc}
442       */
443      @Override
444      public SynchronizationProviderResult
445             doPreOperation(PreOperationModifyOperation modifyOperation)
446      {
447        DN operationDN = modifyOperation.getEntryDN();
448        ReplicationDomain domain = findDomain(operationDN, modifyOperation);
449    
450        if ((domain == null) || (!domain.solveConflict()))
451          return new SynchronizationProviderResult.ContinueProcessing();
452    
453        Historical historicalInformation = (Historical)
454                                modifyOperation.getAttachment(
455                                        Historical.HISTORICAL);
456        if (historicalInformation == null)
457        {
458          Entry entry = modifyOperation.getModifiedEntry();
459          historicalInformation = Historical.load(entry);
460          modifyOperation.setAttachment(Historical.HISTORICAL,
461                  historicalInformation);
462        }
463    
464        historicalInformation.generateState(modifyOperation);
465    
466        return new SynchronizationProviderResult.ContinueProcessing();
467      }
468    
469      /**
470       * {@inheritDoc}
471       */
472      @Override
473      public SynchronizationProviderResult doPreOperation(
474             PreOperationDeleteOperation deleteOperation) throws DirectoryException
475      {
476        return new SynchronizationProviderResult.ContinueProcessing();
477      }
478    
479      /**
480       * {@inheritDoc}
481       */
482      @Override
483      public SynchronizationProviderResult doPreOperation(
484             PreOperationModifyDNOperation modifyDNOperation)
485             throws DirectoryException
486      {
487        return new SynchronizationProviderResult.ContinueProcessing();
488      }
489    
490      /**
491       * {@inheritDoc}
492       */
493      @Override
494      public SynchronizationProviderResult doPreOperation(
495             PreOperationAddOperation addOperation)
496      {
497        ReplicationDomain domain =
498          findDomain(addOperation.getEntryDN(), addOperation);
499        if (domain == null)
500          return new SynchronizationProviderResult.ContinueProcessing();
501    
502        if (!addOperation.isSynchronizationOperation())
503          domain.doPreOperation(addOperation);
504    
505        return new SynchronizationProviderResult.ContinueProcessing();
506      }
507    
508    
509      /**
510       * {@inheritDoc}
511       */
512      @Override
513      public void finalizeSynchronizationProvider()
514      {
515        isRegistered = false;
516    
517        // shutdown all the domains
518        for (ReplicationDomain domain : domains.values())
519        {
520          domain.shutdown();
521        }
522        domains.clear();
523    
524        // Stop replay threads
525        stopReplayThreads();
526    
527        // shutdown the ReplicationServer Service if necessary
528        if (replicationServerListener != null)
529          replicationServerListener.shutdown();
530    
531        DirectoryServer.deregisterBackupTaskListener(this);
532        DirectoryServer.deregisterRestoreTaskListener(this);
533        DirectoryServer.deregisterExportTaskListener(this);
534        DirectoryServer.deregisterImportTaskListener(this);
535      }
536    
537      /**
538       * This method is called whenever the server detects a modification
539       * of the schema done by directly modifying the backing files
540       * of the schema backend.
541       * Call the schema Domain if it exists.
542       *
543       * @param  modifications  The list of modifications that was
544       *                                      applied to the schema.
545       *
546       */
547      @Override
548      public void processSchemaChange(List<Modification> modifications)
549      {
550        ReplicationDomain domain =
551          findDomain(DirectoryServer.getSchemaDN(), null);
552        if (domain != null)
553          domain.synchronizeModifications(modifications);
554      }
555    
556      /**
557       * {@inheritDoc}
558       */
559      public void processBackupBegin(Backend backend, BackupConfig config)
560      {
561        for (DN dn : backend.getBaseDNs())
562        {
563          ReplicationDomain domain = findDomain(dn, null);
564          if (domain != null)
565            domain.backupStart();
566        }
567      }
568    
569      /**
570       * {@inheritDoc}
571       */
572      public void processBackupEnd(Backend backend, BackupConfig config,
573                                   boolean successful)
574      {
575        for (DN dn : backend.getBaseDNs())
576        {
577          ReplicationDomain domain = findDomain(dn, null);
578          if (domain != null)
579            domain.backupEnd();
580        }
581      }
582    
583      /**
584       * {@inheritDoc}
585       */
586      public void processRestoreBegin(Backend backend, RestoreConfig config)
587      {
588        for (DN dn : backend.getBaseDNs())
589        {
590          ReplicationDomain domain = findDomain(dn, null);
591          if (domain != null)
592            domain.disable();
593        }
594      }
595    
596      /**
597       * {@inheritDoc}
598       */
599      public void processRestoreEnd(Backend backend, RestoreConfig config,
600                                    boolean successful)
601      {
602        for (DN dn : backend.getBaseDNs())
603        {
604          ReplicationDomain domain = findDomain(dn, null);
605          if (domain != null)
606            domain.enable();
607        }
608      }
609    
610      /**
611       * {@inheritDoc}
612       */
613      public void processImportBegin(Backend backend, LDIFImportConfig config)
614      {
615        for (DN dn : backend.getBaseDNs())
616        {
617          ReplicationDomain domain = findDomain(dn, null);
618          if (domain != null)
619            domain.disable();
620        }
621      }
622    
623      /**
624       * {@inheritDoc}
625       */
626      public void processImportEnd(Backend backend, LDIFImportConfig config,
627                                   boolean successful)
628      {
629        for (DN dn : backend.getBaseDNs())
630        {
631          ReplicationDomain domain = findDomain(dn, null);
632          if (domain != null)
633            domain.enable();
634        }
635      }
636    
637      /**
638       * {@inheritDoc}
639       */
640      public void processExportBegin(Backend backend, LDIFExportConfig config)
641      {
642        for (DN dn : backend.getBaseDNs())
643        {
644          ReplicationDomain domain = findDomain(dn, null);
645          if (domain != null)
646            domain.backupStart();
647        }
648      }
649    
650      /**
651       * {@inheritDoc}
652       */
653      public void processExportEnd(Backend backend, LDIFExportConfig config,
654                                   boolean successful)
655      {
656        for (DN dn : backend.getBaseDNs())
657        {
658          ReplicationDomain domain = findDomain(dn, null);
659          if (domain != null)
660            domain.backupEnd();
661        }
662      }
663    
664      /**
665       * {@inheritDoc}
666       */
667      public ConfigChangeResult applyConfigurationDelete(
668          ReplicationDomainCfg configuration)
669      {
670        deleteDomain(configuration.getBaseDN());
671    
672        return new ConfigChangeResult(ResultCode.SUCCESS, false);
673      }
674    
675      /**
676       * {@inheritDoc}
677       */
678      public boolean isConfigurationDeleteAcceptable(
679          ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
680      {
681        return true;
682      }
683    
684      /**
685       * Generic code for all the postOperation entry point.
686       *
687       * @param operation The Operation for which the post-operation is called.
688       * @param dn The Dn for which the post-operation is called.
689       */
690      private void genericPostOperation(PostOperationOperation operation, DN dn)
691      {
692        ReplicationDomain domain = findDomain(dn, operation);
693        if (domain == null)
694          return;
695    
696        domain.synchronize(operation);
697    
698        return;
699      }
700    
701      /**
702       * Returns the replication server listener associated to that Multimaster
703       * Replication.
704       * @return the listener.
705       */
706      public ReplicationServerListener getReplicationServerListener()
707      {
708        return replicationServerListener;
709      }
710    
711      /**
712       * {@inheritDoc}
713       */
714      public boolean
715        isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
716        configuration,
717        List<Message> unacceptableReasons)
718      {
719        return true;
720      }
721    
722      /**
723       * {@inheritDoc}
724       */
725      public ConfigChangeResult
726        applyConfigurationChange
727        (ReplicationSynchronizationProviderCfg configuration)
728      {
729        int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
730    
731        // Stop threads then restart new number of threads
732        stopReplayThreads();
733        replayThreadNumber = numUpdateRepayThread;
734        if (domains.size() > 0)
735        {
736          createReplayThreads();
737        }
738    
739        return new ConfigChangeResult(ResultCode.SUCCESS, false);
740      }
741    
742      /**
743       * {@inheritDoc}
744       */
745      public void completeSynchronizationProvider()
746      {
747        isRegistered = true;
748    
749        // start all the domains
750        for (ReplicationDomain domain : domains.values())
751        {
752          domain.start();
753        }
754      }
755    }