View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mitosis.service;
21  
22  
23  import org.apache.directory.mitosis.common.CSN;
24  import org.apache.directory.mitosis.common.Constants;
25  import org.apache.directory.mitosis.common.DefaultCSN;
26  import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
27  import org.apache.directory.mitosis.operation.Operation;
28  import org.apache.directory.mitosis.operation.OperationFactory;
29  import org.apache.directory.mitosis.service.protocol.codec.ReplicationServerProtocolCodecFactory;
30  import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
31  import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerContextHandler;
32  import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerProtocolHandler;
33  import org.apache.directory.mitosis.store.ReplicationStore;
34  import org.apache.directory.server.constants.ServerDNConstants;
35  import org.apache.directory.server.core.CoreSession;
36  import org.apache.directory.server.core.DefaultCoreSession;
37  import org.apache.directory.server.core.DirectoryService;
38  import org.apache.directory.server.core.authn.LdapPrincipal;
39  import org.apache.directory.server.core.entry.ClonedServerEntry;
40  import org.apache.directory.server.core.entry.ServerEntry;
41  import org.apache.directory.server.core.filtering.EntryFilteringCursor;
42  import org.apache.directory.server.core.interceptor.BaseInterceptor;
43  import org.apache.directory.server.core.interceptor.Interceptor;
44  import org.apache.directory.server.core.interceptor.NextInterceptor;
45  import org.apache.directory.server.core.interceptor.context.AddOperationContext;
46  import org.apache.directory.server.core.interceptor.context.DeleteOperationContext;
47  import org.apache.directory.server.core.interceptor.context.EntryOperationContext;
48  import org.apache.directory.server.core.interceptor.context.GetMatchedNameOperationContext;
49  import org.apache.directory.server.core.interceptor.context.ListOperationContext;
50  import org.apache.directory.server.core.interceptor.context.LookupOperationContext;
51  import org.apache.directory.server.core.interceptor.context.ModifyOperationContext;
52  import org.apache.directory.server.core.interceptor.context.MoveAndRenameOperationContext;
53  import org.apache.directory.server.core.interceptor.context.MoveOperationContext;
54  import org.apache.directory.server.core.interceptor.context.OperationContext;
55  import org.apache.directory.server.core.interceptor.context.RenameOperationContext;
56  import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
57  import org.apache.directory.server.core.partition.PartitionNexus;
58  import org.apache.directory.server.schema.registries.Registries;
59  import org.apache.directory.shared.ldap.constants.AuthenticationLevel;
60  import org.apache.directory.shared.ldap.constants.SchemaConstants;
61  import org.apache.directory.shared.ldap.entry.EntryAttribute;
62  import org.apache.directory.shared.ldap.entry.Value;
63  import org.apache.directory.shared.ldap.exception.LdapNameNotFoundException;
64  import org.apache.directory.shared.ldap.filter.ExprNode;
65  import org.apache.directory.shared.ldap.filter.FilterParser;
66  import org.apache.directory.shared.ldap.filter.PresenceNode;
67  import org.apache.directory.shared.ldap.message.AliasDerefMode;
68  import org.apache.directory.shared.ldap.name.LdapDN;
69  import org.apache.mina.common.IoAcceptor;
70  import org.apache.mina.filter.LoggingFilter;
71  import org.apache.mina.filter.codec.ProtocolCodecFilter;
72  import org.apache.mina.transport.socket.nio.SocketAcceptor;
73  import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  import javax.naming.NameNotFoundException;
78  import javax.naming.NamingException;
79  import javax.naming.directory.SearchControls;
80  
81  import java.net.InetSocketAddress;
82  import java.text.ParseException;
83  import java.util.ArrayList;
84  import java.util.List;
85  
86  
87  /**
88   * An {@link Interceptor} that intercepts LDAP operations and propagates the
89   * changes occurred by the operations into other {@link ReplicaId}s so the DIT
90   * of each {@link ReplicaId} in the cluster has the same content without any
91   * conflict.
92   * <p>
93   * Once an operation is invoked, this interceptor transforms it into one or
94   * more operations that makes the requested operation more proper and robust
95   * for replication.  The transformation process is actually just calling a
96   * respective factory method in {@link OperationFactory}.  The methods in
97   * {@link OperationFactory} returns a new {@link Operation} instance.
98   * <p>
99   * The newly created {@link Operation} is used for three purposes.
100  * <ul>
101  * <li>To perform the requested operation to the local {@link PartitionNexus}
102  * <li>To store the created {@link Operation} itself to
103  *     {@link ReplicationStore} so that it can be retrieved later by
104  *     {@link ReplicationLogCleanJob} and {@link ReplicationClientContextHandler}
105  * <li>To transfer itself to other {@link ReplicaId}s via TCP/IP communication
106  *     between {@link ReplicationClientContextHandler} and
107  *     {@link ReplicationServerContextHandler}
108  * </ul>
109  * The first two actions (modifying the local DIT and storing the
110  * {@link Operation} to {@link ReplicationStore}) are performed automatically
111  * when
112  * {@link Operation#execute(PartitionNexus, ReplicationStore, Registries)}
113  * method is invoked.  {@link ReplicationInterceptor} always call it instead of
114  * forwarding the requested operation to the next {@link Interceptor}.
115  * <p>
116  * The last action takes place by {@link ReplicationClientContextHandler},
117  * which handles TCP/IP connection managed by {@link ClientConnectionManager}.
118  * <p>
119  * There are two special attributes in the entries to be replicated:
120  * <ul>
121  * <li><tt>entryCSN</tt> - stores {@link CSN} of the entry.  This attribute is
122  *     used to compare the incoming operation from other replica is still
123  *     valid.  If the local <tt>entryCSN</tt> value is bigger then that of the
124  *     incoming operation, it means conflict, and therefore an appropriate
125  *     conflict resolution mechanism should get engaged.</li>
126  * <li><tt>entryDeleted</tt> - is <tt>TRUE</tt> if and only if the entry is
127  *     deleted.  The entry is not deleted immediately by a delete operation
128  *     because <tt>entryCSN</tt> attribute should be retained for certain
129  *     amount of time to determine whether the incoming change LOG, which
130  *     affects an entry with the same DN, is a conflict (modification on a
131  *     deleted entry) or not (creation of a new entry). You can purge old
132  *     deleted entries and related change logs in {@link ReplicationStore} by
133  *     calling {@link #purgeAgedData()}, or they will be purged automatically
134  *     by periodic manner as you configured with {@link ReplicationConfiguration}.
135  *     by calling {@link ReplicationConfiguration#setLogMaxAge(int)}.
136  *     Because of this attribute, <tt>lookup</tt> and <tt>search</tt>
137  *     operations are overrided to ignore entries with <tt>entryDeleted</tt>
138  *     set to <tt>TRUE</tt>.</li>
139  * </ul>
140  *
141  * @org.apache.xbean.XBean
142  *
143  * @author The Apache Directory Project (dev@directory.apache.org)
144  * @version $Rev$, $Date$
145  */
146 public class ReplicationInterceptor extends BaseInterceptor
147 {
148     private static final Logger LOG = LoggerFactory.getLogger( ReplicationInterceptor.class );
149 
150     /** The service name */
151     public static final String DEFAULT_SERVICE_NAME = "replicationService";
152 
153 
154     private static final String ENTRY_CSN_OID = "1.3.6.1.4.1.18060.0.4.1.2.30";
155     private static final String ENTRY_DELETED_OID = "1.3.6.1.4.1.18060.0.4.1.2.31";
156 
157     /**
158      * default name is the service name?
159      */
160     private String name = DEFAULT_SERVICE_NAME;
161 
162     private DirectoryService directoryService;
163     private ReplicationConfiguration configuration;
164     private PartitionNexus nexus;
165     private OperationFactory operationFactory;
166     private ReplicationStore store;
167     private IoAcceptor registry;
168     private final ClientConnectionManager clientConnectionManager = new ClientConnectionManager( this );
169     private Registries registries;
170 
171 
172     /**
173      * Creates a new instance of ReplicationInterceptor.
174      */    
175     public ReplicationInterceptor()
176     {
177     }
178 
179     /**
180      * This interceptor has configuration so it might be useful to allow several instances in a chain.
181      * 
182      * @return configured name for this interceptor.
183      */
184     public String getName() 
185     {
186         return name;
187     }
188 
189         
190     /**
191      * Set the name for this service instance
192      *
193      * @param name The new name
194      */
195     public void setName(String name) 
196     {
197         this.name = name;
198     }
199     
200 
201     public ReplicationConfiguration getConfiguration()
202     {
203         return configuration;
204     }
205 
206 
207     public void setConfiguration(ReplicationConfiguration configuration) 
208     {
209         this.configuration = configuration;
210     }
211 
212     
213     /**
214      * Initialize the Replication service. We have to check that the configuration
215      * is valid, initialize a store for pending operations, and start the communication
216      * with the other LDAP servers.
217      * 
218      * @param directoryService the DirectoryService instance 
219      */
220     public void init( DirectoryService directoryService ) throws Exception
221     {
222         configuration.validate();
223         // and then preserve frequently used ones
224         this.directoryService = directoryService;
225         registries = directoryService.getRegistries();
226         nexus = directoryService.getPartitionNexus();
227         store = configuration.getStore();
228         operationFactory = new OperationFactory( directoryService, configuration );
229 
230         // Initialize store and service
231         store.open( directoryService, configuration );
232         boolean serviceStarted = false;
233         try
234         {
235             startNetworking();
236             serviceStarted = true;
237         }
238         catch ( Exception e )
239         {
240             throw new ReplicationServiceException( "Failed to initialize MINA ServiceRegistry.", e );
241         }
242         finally
243         {
244             if ( !serviceStarted )
245             {
246                 // roll back
247                 store.close();
248             }
249         }
250 
251         purgeAgedData();
252     }
253 
254 
255     private void startNetworking() throws Exception
256     {
257         registry = new SocketAcceptor();
258         SocketAcceptorConfig config = new SocketAcceptorConfig();
259         config.setReuseAddress( true );
260 
261         config.getFilterChain().addLast( "protocol",
262             new ProtocolCodecFilter( new ReplicationServerProtocolCodecFactory() ) );
263 
264         config.getFilterChain().addLast( "logger", new LoggingFilter() );
265 
266         // bind server protocol provider
267         registry.bind( new InetSocketAddress( configuration.getServerPort() ), new ReplicationServerProtocolHandler(
268             this ), config );
269 
270         clientConnectionManager.start( configuration );
271     }
272 
273 
274     public void destroy()
275     {
276         stopNetworking();
277         store.close();
278     }
279 
280 
281     private void stopNetworking()
282     {
283         // close all open connections, deactivate all filters and service registry
284         try
285         {
286             clientConnectionManager.stop();
287         }
288         catch ( Exception e )
289         {
290             LOG.error( "[Replica-{}] Failed to stop the client connection manager.", configuration.getReplicaId() );
291             LOG.error( "Stop failure exception: ", e );
292         }
293         registry.unbindAll();
294     }
295 
296 
297     /**
298      * Forces this context to send replication data to the peer replica immediately.
299      */
300     public void replicate()
301     {
302         LOG.info( "[Replica-{}] Forcing replication...", configuration.getReplicaId() );
303         this.clientConnectionManager.replicate();
304     }
305 
306     
307     /**
308      * Wake the sleeping (unconnected) replicas.
309      */
310     public void interruptConnectors()
311     {
312         LOG.info( "[Replica-{}] Waking sleeping replicas...", configuration.getReplicaId() );
313         this.clientConnectionManager.interruptConnectors();
314     }
315 
316 
317     /**
318      * Purges old replication logs and the old entries marked as 'deleted'
319      * (i.e. {@link Constants#ENTRY_DELETED} is <tt>TRUE</tt>).  This method
320      * should be called periodically to make sure the size of the DIT and
321      * {@link ReplicationStore} increase limitlessly.
322      *
323      * @see ReplicationConfiguration#setLogMaxAge(int)
324      * @see ReplicationLogCleanJob
325      * @throws javax.naming.NamingException on error
326      */
327     public void purgeAgedData() throws Exception
328     {
329         ServerEntry rootDSE = nexus.getRootDSE( null );
330         EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
331         
332         if ( ( namingContextsAttr == null ) || ( namingContextsAttr.size() == 0 ) )
333         {
334             throw new NamingException( "No namingContexts attributes in rootDSE." );
335         }
336 
337         CSN purgeCSN = new DefaultCSN( System.currentTimeMillis() - configuration.getLogMaxAge() * 1000L * 60L * 60L
338             * 24L, // convert days to millis
339             "ZZZZZZZZZZZZZZZZ", Integer.MAX_VALUE );
340         ExprNode filter;
341 
342         try
343         {
344             filter = FilterParser.parse( "(&(" + ENTRY_CSN_OID + "<=" + purgeCSN.toOctetString() + ")(" + ENTRY_DELETED_OID
345                 + "=TRUE))" );
346         }
347         catch ( ParseException e )
348         {
349             throw ( NamingException ) new NamingException().initCause( e );
350         }
351 
352         // Iterate all context partitions to send all entries of them.
353         for ( Value<?> namingContext:namingContextsAttr )
354         {
355             // Convert attribute value to JNDI name.
356             LdapDN contextName;
357             
358             contextName = new LdapDN( (String)namingContext.get() );
359 
360             contextName.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
361             LOG.info( "[Replica-{}] Purging aged data under '{}'", configuration.getReplicaId(), contextName );
362             purgeAgedData( contextName, filter );
363         }
364 
365         store.removeLogs( purgeCSN, false );
366     }
367 
368 
369     private void purgeAgedData( LdapDN contextName, ExprNode filter ) throws Exception
370     {
371         SearchControls ctrl = new SearchControls();
372         ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
373         ctrl.setReturningAttributes( new String[] { "entryCSN", "entryDeleted" } );
374 
375         LdapDN adminDn = new LdapDN( ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
376         adminDn.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
377         CoreSession adminSession = 
378             new DefaultCoreSession( new LdapPrincipal( adminDn, AuthenticationLevel.STRONG ), directoryService );
379 
380         EntryFilteringCursor cursor = nexus.search(
381             new SearchOperationContext( adminSession, contextName, AliasDerefMode.DEREF_ALWAYS, filter, ctrl ) );
382 
383         List<LdapDN> names = new ArrayList<LdapDN>();
384         
385         try
386         {
387             while ( cursor.next() )
388             {
389             	ServerEntry entry = cursor.get();
390                 LdapDN name = entry.getDn();
391                 
392                 if ( name.size() > contextName.size() )
393                 {
394                     names.add( name );
395                 }
396             }
397         }
398         finally
399         {
400             cursor.close();
401         }
402 
403         for ( LdapDN name : names )
404         {
405             try
406             {
407                 name.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
408                 ServerEntry entry = nexus.lookup( new LookupOperationContext( adminSession, name ) );
409                 LOG.info( "[Replica-{}] Purge: " + name + " (" + entry + ')', configuration.getReplicaId() );
410                 nexus.delete( new DeleteOperationContext( adminSession, name ) );
411             }
412             catch ( NamingException ex )
413             {
414                 LOG.error( "[Replica-{}] Failed to fetch/delete: " + name, configuration.getReplicaId(), ex );
415             }
416         }
417     }
418 
419 
420     public void add( NextInterceptor nextInterceptor, AddOperationContext addContext ) throws Exception
421     {
422         Operation op = operationFactory.newAdd( 
423             addContext.getDn(), addContext.getEntry() );
424         op.execute( nexus, store, addContext.getSession() );
425     }
426 
427 
428     @Override
429     public void delete( NextInterceptor next, DeleteOperationContext deleteContext ) throws Exception
430     {
431         Operation op = operationFactory.newDelete( deleteContext.getDn() );
432         op.execute( nexus, store, deleteContext.getSession() );
433     }
434 
435 
436     public void modify( NextInterceptor next, ModifyOperationContext modifyContext ) throws Exception
437     {
438         Operation op = operationFactory.newModify( modifyContext );
439         op.execute( nexus, store, modifyContext.getSession() );
440     }
441 
442 
443     @Override
444     public void move( NextInterceptor next, MoveOperationContext moveOpContext ) throws Exception
445     {
446         Operation op = operationFactory.newMove( moveOpContext.getDn(), moveOpContext.getParent() );
447         op.execute( nexus, store, moveOpContext.getSession() );
448     }
449 
450 
451     @Override
452     public void moveAndRename( NextInterceptor next, MoveAndRenameOperationContext moveAndRenameOpContext ) throws Exception
453     {
454         Operation op = operationFactory.newMove( moveAndRenameOpContext.getDn(),
455                 moveAndRenameOpContext.getParent(), moveAndRenameOpContext.getNewRdn(),
456                 moveAndRenameOpContext.getDelOldDn() );
457         op.execute( nexus, store, moveAndRenameOpContext.getSession() );
458     }
459 
460 
461     @Override
462     public void rename( NextInterceptor next, RenameOperationContext renameOpContext ) throws Exception
463     {
464         Operation op = operationFactory.newModifyRn( renameOpContext.getDn(), renameOpContext.getNewRdn(), renameOpContext.getDelOldDn() );
465         op.execute( nexus, store, renameOpContext.getSession() );
466     }
467 
468 
469     public boolean hasEntry( NextInterceptor nextInterceptor, EntryOperationContext entryContext ) throws Exception
470     {
471         // Ask others first.
472         boolean hasEntry = nextInterceptor.hasEntry( entryContext );
473 
474         // If the entry exists,
475         if ( hasEntry )
476         {
477             // Check DELETED attribute.
478             try
479             {
480                 ServerEntry entry = nextInterceptor.lookup( new LookupOperationContext( entryContext.getSession(), 
481                     entryContext.getDn() ) );
482                 hasEntry = !isDeleted( entry );
483             }
484             catch ( NameNotFoundException e )
485             {
486                 hasEntry = false;
487             }
488         }
489 
490         return hasEntry;
491     }
492 
493 
494     public ClonedServerEntry lookup( NextInterceptor nextInterceptor, LookupOperationContext lookupContext ) throws Exception
495     {
496         if ( lookupContext.getAttrsId() != null )
497         {
498             boolean found = false;
499 
500             String[] attrIds = lookupContext.getAttrsIdArray();
501 
502             // Look for 'entryDeleted' attribute is in attrIds.
503             for ( String attrId:attrIds )
504             {
505                 if ( Constants.ENTRY_DELETED.equals( attrId ) )
506                 {
507                     found = true;
508                     break;
509                 }
510             }
511 
512             // If not exists, add one.
513             if ( !found )
514             {
515                 String[] newAttrIds = new String[attrIds.length + 1];
516                 System.arraycopy( attrIds, 0, newAttrIds, 0, attrIds.length );
517                 newAttrIds[attrIds.length] = Constants.ENTRY_DELETED;
518                 lookupContext.setAttrsId( newAttrIds );
519             }
520         }
521 
522         ClonedServerEntry entry = nextInterceptor.lookup( lookupContext );
523         ensureNotDeleted( lookupContext, entry );
524         return entry;
525     }
526 
527 
528     @Override
529     public EntryFilteringCursor list( NextInterceptor nextInterceptor, ListOperationContext opContext ) throws Exception
530     {
531     	EntryFilteringCursor cursor = nextInterceptor.search(
532 	            new SearchOperationContext(
533 	                opContext.getSession(), opContext.getDn(), opContext.getAliasDerefMode(),
534 	                new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ),
535 	                new SearchControls() ) );
536 
537     	cursor.addEntryFilter( Constants.DELETED_ENTRIES_FILTER );
538     	return cursor;
539     }
540 
541 
542     @Override
543     public EntryFilteringCursor search( NextInterceptor nextInterceptor, SearchOperationContext opContext ) 
544         throws Exception
545     {
546         SearchControls searchControls = opContext.getSearchControls();
547 
548         if ( searchControls.getReturningAttributes() != null )
549         {
550             String[] oldAttrIds = searchControls.getReturningAttributes();
551             String[] newAttrIds = new String[oldAttrIds.length + 1];
552             System.arraycopy( oldAttrIds, 0, newAttrIds, 0, oldAttrIds.length );
553             newAttrIds[oldAttrIds.length] = Constants.ENTRY_DELETED.toLowerCase();
554             searchControls.setReturningAttributes( newAttrIds );
555         }
556 
557     	EntryFilteringCursor cursor = nextInterceptor.search( new SearchOperationContext( opContext.getSession(), 
558     	    opContext.getDn(), opContext.getAliasDerefMode(), opContext.getFilter(), searchControls ) );
559     	cursor.addEntryFilter( Constants.DELETED_ENTRIES_FILTER );
560     	return cursor;
561     }
562 
563 
564     private void ensureNotDeleted( OperationContext opContext, ServerEntry entry ) throws Exception 
565     {
566         if ( isDeleted( entry ) )
567         {
568             LdapNameNotFoundException e = new LdapNameNotFoundException( "Deleted entry: " 
569                 + opContext.getDn().getUpName() );
570             e.setResolvedName( nexus.getMatchedName( 
571                 new GetMatchedNameOperationContext( opContext.getSession(), opContext.getDn() ) ) );
572             throw e;
573         }
574     }
575 
576 
577     private boolean isDeleted( ServerEntry entry ) throws NamingException
578     {
579         if ( entry == null )
580         {
581             return true;
582         }
583 
584         return entry.contains( Constants.ENTRY_DELETED, "TRUE" );
585     }
586 
587 
588     public DirectoryService getDirectoryService()
589     {
590         return directoryService;
591     }
592 }