1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.service;
21
22
23 import org.apache.directory.mitosis.common.CSN;
24 import org.apache.directory.mitosis.common.Constants;
25 import org.apache.directory.mitosis.common.DefaultCSN;
26 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
27 import org.apache.directory.mitosis.operation.Operation;
28 import org.apache.directory.mitosis.operation.OperationFactory;
29 import org.apache.directory.mitosis.service.protocol.codec.ReplicationServerProtocolCodecFactory;
30 import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
31 import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerContextHandler;
32 import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerProtocolHandler;
33 import org.apache.directory.mitosis.store.ReplicationStore;
34 import org.apache.directory.server.constants.ServerDNConstants;
35 import org.apache.directory.server.core.CoreSession;
36 import org.apache.directory.server.core.DefaultCoreSession;
37 import org.apache.directory.server.core.DirectoryService;
38 import org.apache.directory.server.core.authn.LdapPrincipal;
39 import org.apache.directory.server.core.entry.ClonedServerEntry;
40 import org.apache.directory.server.core.entry.ServerEntry;
41 import org.apache.directory.server.core.filtering.EntryFilteringCursor;
42 import org.apache.directory.server.core.interceptor.BaseInterceptor;
43 import org.apache.directory.server.core.interceptor.Interceptor;
44 import org.apache.directory.server.core.interceptor.NextInterceptor;
45 import org.apache.directory.server.core.interceptor.context.AddOperationContext;
46 import org.apache.directory.server.core.interceptor.context.DeleteOperationContext;
47 import org.apache.directory.server.core.interceptor.context.EntryOperationContext;
48 import org.apache.directory.server.core.interceptor.context.GetMatchedNameOperationContext;
49 import org.apache.directory.server.core.interceptor.context.ListOperationContext;
50 import org.apache.directory.server.core.interceptor.context.LookupOperationContext;
51 import org.apache.directory.server.core.interceptor.context.ModifyOperationContext;
52 import org.apache.directory.server.core.interceptor.context.MoveAndRenameOperationContext;
53 import org.apache.directory.server.core.interceptor.context.MoveOperationContext;
54 import org.apache.directory.server.core.interceptor.context.OperationContext;
55 import org.apache.directory.server.core.interceptor.context.RenameOperationContext;
56 import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
57 import org.apache.directory.server.core.partition.PartitionNexus;
58 import org.apache.directory.server.schema.registries.Registries;
59 import org.apache.directory.shared.ldap.constants.AuthenticationLevel;
60 import org.apache.directory.shared.ldap.constants.SchemaConstants;
61 import org.apache.directory.shared.ldap.entry.EntryAttribute;
62 import org.apache.directory.shared.ldap.entry.Value;
63 import org.apache.directory.shared.ldap.exception.LdapNameNotFoundException;
64 import org.apache.directory.shared.ldap.filter.ExprNode;
65 import org.apache.directory.shared.ldap.filter.FilterParser;
66 import org.apache.directory.shared.ldap.filter.PresenceNode;
67 import org.apache.directory.shared.ldap.message.AliasDerefMode;
68 import org.apache.directory.shared.ldap.name.LdapDN;
69 import org.apache.mina.common.IoAcceptor;
70 import org.apache.mina.filter.LoggingFilter;
71 import org.apache.mina.filter.codec.ProtocolCodecFilter;
72 import org.apache.mina.transport.socket.nio.SocketAcceptor;
73 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import javax.naming.NameNotFoundException;
78 import javax.naming.NamingException;
79 import javax.naming.directory.SearchControls;
80
81 import java.net.InetSocketAddress;
82 import java.text.ParseException;
83 import java.util.ArrayList;
84 import java.util.List;
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public class ReplicationInterceptor extends BaseInterceptor
147 {
148 private static final Logger LOG = LoggerFactory.getLogger( ReplicationInterceptor.class );
149
150
151 public static final String DEFAULT_SERVICE_NAME = "replicationService";
152
153
154 private static final String ENTRY_CSN_OID = "1.3.6.1.4.1.18060.0.4.1.2.30";
155 private static final String ENTRY_DELETED_OID = "1.3.6.1.4.1.18060.0.4.1.2.31";
156
157
158
159
160 private String name = DEFAULT_SERVICE_NAME;
161
162 private DirectoryService directoryService;
163 private ReplicationConfiguration configuration;
164 private PartitionNexus nexus;
165 private OperationFactory operationFactory;
166 private ReplicationStore store;
167 private IoAcceptor registry;
168 private final ClientConnectionManager clientConnectionManager = new ClientConnectionManager( this );
169 private Registries registries;
170
171
172
173
174
175 public ReplicationInterceptor()
176 {
177 }
178
179
180
181
182
183
184 public String getName()
185 {
186 return name;
187 }
188
189
190
191
192
193
194
195 public void setName(String name)
196 {
197 this.name = name;
198 }
199
200
201 public ReplicationConfiguration getConfiguration()
202 {
203 return configuration;
204 }
205
206
207 public void setConfiguration(ReplicationConfiguration configuration)
208 {
209 this.configuration = configuration;
210 }
211
212
213
214
215
216
217
218
219
220 public void init( DirectoryService directoryService ) throws Exception
221 {
222 configuration.validate();
223
224 this.directoryService = directoryService;
225 registries = directoryService.getRegistries();
226 nexus = directoryService.getPartitionNexus();
227 store = configuration.getStore();
228 operationFactory = new OperationFactory( directoryService, configuration );
229
230
231 store.open( directoryService, configuration );
232 boolean serviceStarted = false;
233 try
234 {
235 startNetworking();
236 serviceStarted = true;
237 }
238 catch ( Exception e )
239 {
240 throw new ReplicationServiceException( "Failed to initialize MINA ServiceRegistry.", e );
241 }
242 finally
243 {
244 if ( !serviceStarted )
245 {
246
247 store.close();
248 }
249 }
250
251 purgeAgedData();
252 }
253
254
255 private void startNetworking() throws Exception
256 {
257 registry = new SocketAcceptor();
258 SocketAcceptorConfig config = new SocketAcceptorConfig();
259 config.setReuseAddress( true );
260
261 config.getFilterChain().addLast( "protocol",
262 new ProtocolCodecFilter( new ReplicationServerProtocolCodecFactory() ) );
263
264 config.getFilterChain().addLast( "logger", new LoggingFilter() );
265
266
267 registry.bind( new InetSocketAddress( configuration.getServerPort() ), new ReplicationServerProtocolHandler(
268 this ), config );
269
270 clientConnectionManager.start( configuration );
271 }
272
273
274 public void destroy()
275 {
276 stopNetworking();
277 store.close();
278 }
279
280
281 private void stopNetworking()
282 {
283
284 try
285 {
286 clientConnectionManager.stop();
287 }
288 catch ( Exception e )
289 {
290 LOG.error( "[Replica-{}] Failed to stop the client connection manager.", configuration.getReplicaId() );
291 LOG.error( "Stop failure exception: ", e );
292 }
293 registry.unbindAll();
294 }
295
296
297
298
299
300 public void replicate()
301 {
302 LOG.info( "[Replica-{}] Forcing replication...", configuration.getReplicaId() );
303 this.clientConnectionManager.replicate();
304 }
305
306
307
308
309
310 public void interruptConnectors()
311 {
312 LOG.info( "[Replica-{}] Waking sleeping replicas...", configuration.getReplicaId() );
313 this.clientConnectionManager.interruptConnectors();
314 }
315
316
317
318
319
320
321
322
323
324
325
326
327 public void purgeAgedData() throws Exception
328 {
329 ServerEntry rootDSE = nexus.getRootDSE( null );
330 EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
331
332 if ( ( namingContextsAttr == null ) || ( namingContextsAttr.size() == 0 ) )
333 {
334 throw new NamingException( "No namingContexts attributes in rootDSE." );
335 }
336
337 CSN purgeCSN = new DefaultCSN( System.currentTimeMillis() - configuration.getLogMaxAge() * 1000L * 60L * 60L
338 * 24L,
339 "ZZZZZZZZZZZZZZZZ", Integer.MAX_VALUE );
340 ExprNode filter;
341
342 try
343 {
344 filter = FilterParser.parse( "(&(" + ENTRY_CSN_OID + "<=" + purgeCSN.toOctetString() + ")(" + ENTRY_DELETED_OID
345 + "=TRUE))" );
346 }
347 catch ( ParseException e )
348 {
349 throw ( NamingException ) new NamingException().initCause( e );
350 }
351
352
353 for ( Value<?> namingContext:namingContextsAttr )
354 {
355
356 LdapDN contextName;
357
358 contextName = new LdapDN( (String)namingContext.get() );
359
360 contextName.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
361 LOG.info( "[Replica-{}] Purging aged data under '{}'", configuration.getReplicaId(), contextName );
362 purgeAgedData( contextName, filter );
363 }
364
365 store.removeLogs( purgeCSN, false );
366 }
367
368
369 private void purgeAgedData( LdapDN contextName, ExprNode filter ) throws Exception
370 {
371 SearchControls ctrl = new SearchControls();
372 ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
373 ctrl.setReturningAttributes( new String[] { "entryCSN", "entryDeleted" } );
374
375 LdapDN adminDn = new LdapDN( ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
376 adminDn.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
377 CoreSession adminSession =
378 new DefaultCoreSession( new LdapPrincipal( adminDn, AuthenticationLevel.STRONG ), directoryService );
379
380 EntryFilteringCursor cursor = nexus.search(
381 new SearchOperationContext( adminSession, contextName, AliasDerefMode.DEREF_ALWAYS, filter, ctrl ) );
382
383 List<LdapDN> names = new ArrayList<LdapDN>();
384
385 try
386 {
387 while ( cursor.next() )
388 {
389 ServerEntry entry = cursor.get();
390 LdapDN name = entry.getDn();
391
392 if ( name.size() > contextName.size() )
393 {
394 names.add( name );
395 }
396 }
397 }
398 finally
399 {
400 cursor.close();
401 }
402
403 for ( LdapDN name : names )
404 {
405 try
406 {
407 name.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
408 ServerEntry entry = nexus.lookup( new LookupOperationContext( adminSession, name ) );
409 LOG.info( "[Replica-{}] Purge: " + name + " (" + entry + ')', configuration.getReplicaId() );
410 nexus.delete( new DeleteOperationContext( adminSession, name ) );
411 }
412 catch ( NamingException ex )
413 {
414 LOG.error( "[Replica-{}] Failed to fetch/delete: " + name, configuration.getReplicaId(), ex );
415 }
416 }
417 }
418
419
420 public void add( NextInterceptor nextInterceptor, AddOperationContext addContext ) throws Exception
421 {
422 Operation op = operationFactory.newAdd(
423 addContext.getDn(), addContext.getEntry() );
424 op.execute( nexus, store, addContext.getSession() );
425 }
426
427
428 @Override
429 public void delete( NextInterceptor next, DeleteOperationContext deleteContext ) throws Exception
430 {
431 Operation op = operationFactory.newDelete( deleteContext.getDn() );
432 op.execute( nexus, store, deleteContext.getSession() );
433 }
434
435
436 public void modify( NextInterceptor next, ModifyOperationContext modifyContext ) throws Exception
437 {
438 Operation op = operationFactory.newModify( modifyContext );
439 op.execute( nexus, store, modifyContext.getSession() );
440 }
441
442
443 @Override
444 public void move( NextInterceptor next, MoveOperationContext moveOpContext ) throws Exception
445 {
446 Operation op = operationFactory.newMove( moveOpContext.getDn(), moveOpContext.getParent() );
447 op.execute( nexus, store, moveOpContext.getSession() );
448 }
449
450
451 @Override
452 public void moveAndRename( NextInterceptor next, MoveAndRenameOperationContext moveAndRenameOpContext ) throws Exception
453 {
454 Operation op = operationFactory.newMove( moveAndRenameOpContext.getDn(),
455 moveAndRenameOpContext.getParent(), moveAndRenameOpContext.getNewRdn(),
456 moveAndRenameOpContext.getDelOldDn() );
457 op.execute( nexus, store, moveAndRenameOpContext.getSession() );
458 }
459
460
461 @Override
462 public void rename( NextInterceptor next, RenameOperationContext renameOpContext ) throws Exception
463 {
464 Operation op = operationFactory.newModifyRn( renameOpContext.getDn(), renameOpContext.getNewRdn(), renameOpContext.getDelOldDn() );
465 op.execute( nexus, store, renameOpContext.getSession() );
466 }
467
468
469 public boolean hasEntry( NextInterceptor nextInterceptor, EntryOperationContext entryContext ) throws Exception
470 {
471
472 boolean hasEntry = nextInterceptor.hasEntry( entryContext );
473
474
475 if ( hasEntry )
476 {
477
478 try
479 {
480 ServerEntry entry = nextInterceptor.lookup( new LookupOperationContext( entryContext.getSession(),
481 entryContext.getDn() ) );
482 hasEntry = !isDeleted( entry );
483 }
484 catch ( NameNotFoundException e )
485 {
486 hasEntry = false;
487 }
488 }
489
490 return hasEntry;
491 }
492
493
494 public ClonedServerEntry lookup( NextInterceptor nextInterceptor, LookupOperationContext lookupContext ) throws Exception
495 {
496 if ( lookupContext.getAttrsId() != null )
497 {
498 boolean found = false;
499
500 String[] attrIds = lookupContext.getAttrsIdArray();
501
502
503 for ( String attrId:attrIds )
504 {
505 if ( Constants.ENTRY_DELETED.equals( attrId ) )
506 {
507 found = true;
508 break;
509 }
510 }
511
512
513 if ( !found )
514 {
515 String[] newAttrIds = new String[attrIds.length + 1];
516 System.arraycopy( attrIds, 0, newAttrIds, 0, attrIds.length );
517 newAttrIds[attrIds.length] = Constants.ENTRY_DELETED;
518 lookupContext.setAttrsId( newAttrIds );
519 }
520 }
521
522 ClonedServerEntry entry = nextInterceptor.lookup( lookupContext );
523 ensureNotDeleted( lookupContext, entry );
524 return entry;
525 }
526
527
528 @Override
529 public EntryFilteringCursor list( NextInterceptor nextInterceptor, ListOperationContext opContext ) throws Exception
530 {
531 EntryFilteringCursor cursor = nextInterceptor.search(
532 new SearchOperationContext(
533 opContext.getSession(), opContext.getDn(), opContext.getAliasDerefMode(),
534 new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ),
535 new SearchControls() ) );
536
537 cursor.addEntryFilter( Constants.DELETED_ENTRIES_FILTER );
538 return cursor;
539 }
540
541
542 @Override
543 public EntryFilteringCursor search( NextInterceptor nextInterceptor, SearchOperationContext opContext )
544 throws Exception
545 {
546 SearchControls searchControls = opContext.getSearchControls();
547
548 if ( searchControls.getReturningAttributes() != null )
549 {
550 String[] oldAttrIds = searchControls.getReturningAttributes();
551 String[] newAttrIds = new String[oldAttrIds.length + 1];
552 System.arraycopy( oldAttrIds, 0, newAttrIds, 0, oldAttrIds.length );
553 newAttrIds[oldAttrIds.length] = Constants.ENTRY_DELETED.toLowerCase();
554 searchControls.setReturningAttributes( newAttrIds );
555 }
556
557 EntryFilteringCursor cursor = nextInterceptor.search( new SearchOperationContext( opContext.getSession(),
558 opContext.getDn(), opContext.getAliasDerefMode(), opContext.getFilter(), searchControls ) );
559 cursor.addEntryFilter( Constants.DELETED_ENTRIES_FILTER );
560 return cursor;
561 }
562
563
564 private void ensureNotDeleted( OperationContext opContext, ServerEntry entry ) throws Exception
565 {
566 if ( isDeleted( entry ) )
567 {
568 LdapNameNotFoundException e = new LdapNameNotFoundException( "Deleted entry: "
569 + opContext.getDn().getUpName() );
570 e.setResolvedName( nexus.getMatchedName(
571 new GetMatchedNameOperationContext( opContext.getSession(), opContext.getDn() ) ) );
572 throw e;
573 }
574 }
575
576
577 private boolean isDeleted( ServerEntry entry ) throws NamingException
578 {
579 if ( entry == null )
580 {
581 return true;
582 }
583
584 return entry.contains( Constants.ENTRY_DELETED, "TRUE" );
585 }
586
587
588 public DirectoryService getDirectoryService()
589 {
590 return directoryService;
591 }
592 }