1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.service.protocol.handler;
21
22
23 import org.apache.directory.mitosis.common.CSN;
24 import org.apache.directory.mitosis.common.CSNVector;
25 import org.apache.directory.mitosis.common.DefaultCSN;
26 import org.apache.directory.mitosis.common.Replica;
27 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
28 import org.apache.directory.mitosis.operation.AddEntryOperation;
29 import org.apache.directory.mitosis.operation.Operation;
30 import org.apache.directory.mitosis.service.ReplicationContext;
31 import org.apache.directory.mitosis.service.ReplicationContext.State;
32 import org.apache.directory.mitosis.service.protocol.Constants;
33 import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
34 import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesAckMessage;
35 import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesMessage;
36 import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesAckMessage;
37 import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesMessage;
38 import org.apache.directory.mitosis.service.protocol.message.LogEntryAckMessage;
39 import org.apache.directory.mitosis.service.protocol.message.LogEntryMessage;
40 import org.apache.directory.mitosis.service.protocol.message.LoginAckMessage;
41 import org.apache.directory.mitosis.service.protocol.message.LoginMessage;
42 import org.apache.directory.mitosis.store.ReplicationLogIterator;
43 import org.apache.directory.mitosis.store.ReplicationStore;
44 import org.apache.directory.server.constants.ServerDNConstants;
45 import org.apache.directory.server.core.CoreSession;
46 import org.apache.directory.server.core.DefaultCoreSession;
47 import org.apache.directory.server.core.authn.LdapPrincipal;
48 import org.apache.directory.server.core.entry.ServerEntry;
49 import org.apache.directory.server.core.filtering.EntryFilteringCursor;
50 import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
51 import org.apache.directory.server.schema.registries.Registries;
52 import org.apache.directory.shared.ldap.constants.AuthenticationLevel;
53 import org.apache.directory.shared.ldap.constants.SchemaConstants;
54 import org.apache.directory.shared.ldap.entry.EntryAttribute;
55 import org.apache.directory.shared.ldap.entry.Value;
56 import org.apache.directory.shared.ldap.filter.PresenceNode;
57 import org.apache.directory.shared.ldap.message.AliasDerefMode;
58 import org.apache.directory.shared.ldap.name.LdapDN;
59 import org.apache.directory.shared.ldap.schema.OidNormalizer;
60 import org.apache.directory.shared.ldap.util.StringTools;
61 import org.apache.mina.common.IdleStatus;
62 import org.apache.mina.common.WriteFuture;
63 import org.apache.mina.util.SessionLog;
64
65 import javax.naming.directory.SearchControls;
66 import java.net.InetSocketAddress;
67 import java.util.Map;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public class ReplicationClientContextHandler implements ReplicationContextHandler
126 {
127 public void contextBegin( ReplicationContext ctx ) throws Exception
128 {
129
130 LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() );
131 writeTimeLimitedMessage( ctx, m );
132
133
134 ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
135
136
137 ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, ctx.getConfiguration().getReplicationInterval() );
138 }
139
140
141 public void contextEnd( ReplicationContext ctx ) throws Exception
142 {
143 }
144
145
146 public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
147 {
148 ctx.cancelExpiration( ( ( BaseMessage ) message ).getSequence() );
149
150 if ( ctx.getState() == State.READY )
151 {
152 if ( message instanceof LogEntryAckMessage )
153 {
154 onLogEntryAck( ctx, ( LogEntryAckMessage ) message );
155 }
156 else if ( message instanceof BeginLogEntriesAckMessage )
157 {
158 onBeginLogEntriesAck( ctx.getDirectoryService().getRegistries(), ctx, ( BeginLogEntriesAckMessage ) message );
159 }
160 else if ( message instanceof EndLogEntriesAckMessage )
161 {
162
163 }
164 else
165 {
166 onUnexpectedMessage( ctx, message );
167 }
168 }
169 else
170 {
171 if ( message instanceof LoginAckMessage )
172 {
173 onLoginAck( ctx, ( LoginAckMessage ) message );
174 }
175 else
176 {
177 onUnexpectedMessage( ctx, message );
178 }
179 }
180 }
181
182
183 public void messageSent( ReplicationContext ctx, Object message ) throws Exception
184 {
185 }
186
187
188
189
190
191
192
193
194
195 public WriteFuture writeTimeLimitedMessage( ReplicationContext ctx, Object message )
196 {
197 ctx.scheduleExpiration( message );
198 return ctx.getSession().write( message );
199 }
200
201
202 public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
203 {
204 if ( SessionLog.isWarnEnabled( ctx.getSession() ) )
205 {
206 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
207 + "] Unexpected exception.", cause );
208 }
209 ctx.getSession().close();
210 }
211
212
213 public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
214 {
215 beginReplication( ctx );
216 }
217
218
219 private void onLoginAck( ReplicationContext ctx, LoginAckMessage message )
220 {
221 if ( message.getResponseCode() != Constants.OK )
222 {
223 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
224 + "] Login attempt failed: " + message.getResponseCode() );
225 ctx.getSession().close();
226 return;
227 }
228
229 for ( Replica replica : ctx.getConfiguration().getPeerReplicas() )
230 {
231 if ( replica.getId().equals( message.getReplicaId() ) )
232 {
233 if ( replica.getAddress().getAddress().equals(
234 ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
235 {
236 ctx.setPeer( replica );
237 ctx.setState( State.READY );
238 return;
239 }
240 else
241 {
242 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
243 + "] Peer address mismatches: " + ctx.getSession().getRemoteAddress() + " (expected: "
244 + replica.getAddress() );
245 ctx.getSession().close();
246 return;
247 }
248 }
249 }
250
251 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
252 + "] Unknown peer replica ID: " + message.getReplicaId() );
253 ctx.getSession().close();
254 }
255
256
257 public boolean beginReplication( ReplicationContext ctx )
258 {
259
260
261
262
263 if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() <= 0
264 && ctx.getSession().getScheduledWriteRequests() <= 0 )
265 {
266
267 if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
268 {
269 SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId() + "->"
270 + ( ctx.getPeer() != null ? ctx.getPeer().getId() : "null" ) + ") Beginning replication. " );
271 }
272 ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
273 return true;
274 }
275 else
276 {
277 if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
278 {
279 SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId() + "->"
280 + ( ctx.getPeer() != null ? ctx.getPeer().getId() : "null" )
281 + ") Couldn't begin replication. State:" + ctx.getState() + ", scheduledExpirations:"
282 + ctx.getScheduledExpirations() + ", scheduledWriteRequests:"
283 + ctx.getSession().getScheduledWriteRequests() );
284 }
285 return false;
286 }
287 }
288
289
290 private void onLogEntryAck( ReplicationContext ctx, LogEntryAckMessage message ) throws Exception
291 {
292 if ( message.getResponseCode() != Constants.OK )
293 {
294 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
295 + "] Remote peer failed to execute a log entry." );
296 ctx.getSession().close();
297 }
298 }
299
300
301 private void onBeginLogEntriesAck( Registries registries, ReplicationContext ctx, BeginLogEntriesAckMessage message )
302 throws Exception
303 {
304
305 if ( message.getResponseCode() != Constants.OK )
306 {
307 return;
308 }
309
310 ReplicationStore store = ctx.getConfiguration().getStore();
311 CSNVector yourUV = message.getUpdateVector();
312 CSNVector myPV;
313 try
314 {
315 myPV = store.getPurgeVector();
316 }
317 catch ( Exception e )
318 {
319 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
320 + "] Failed to get update vector.", e );
321 ctx.getSession().close();
322 return;
323 }
324
325
326 try
327 {
328 if ( myPV.size() > 0 && yourUV.size() == 0 )
329 {
330 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
331 + "] Starting a whole DIT transfer." );
332 sendAllEntries( ctx );
333 }
334 else
335 {
336 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
337 + "] Starting a partial replication log transfer." );
338 sendReplicationLogs( registries, ctx, myPV, yourUV );
339 }
340 }
341 finally
342 {
343
344 ctx.getSession().write( new EndLogEntriesMessage( ctx.getNextSequence() ) );
345 }
346 }
347
348
349 private void sendAllEntries( ReplicationContext ctx ) throws Exception
350 {
351 ServerEntry rootDSE = ctx.getDirectoryService().getPartitionNexus().getRootDSE( null );
352
353 EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
354
355 if ( namingContextsAttr == null || namingContextsAttr.size() == 0 )
356 {
357 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
358 + "] No namingContexts attributes in rootDSE." );
359 return;
360 }
361
362
363 for ( Value<?> namingContext : namingContextsAttr )
364 {
365
366 LdapDN contextName;
367
368 contextName = new LdapDN( ( String ) namingContext.get() );
369
370 SessionLog.info( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
371 + "] Sending entries under '" + contextName + '\'' );
372
373 Map<String, OidNormalizer> mapping = ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
374 .getNormalizerMapping();
375 contextName.normalize( mapping );
376 sendAllEntries( ctx, contextName );
377 }
378 }
379
380
381 private void sendAllEntries( ReplicationContext ctx, LdapDN contextName ) throws Exception
382 {
383
384 SearchControls ctrl = new SearchControls();
385 ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
386
387 LdapDN adminDn = new LdapDN( ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
388 adminDn.normalize( ctx.getDirectoryService().getRegistries()
389 .getAttributeTypeRegistry().getNormalizerMapping() );
390 CoreSession adminSession = new DefaultCoreSession(
391 new LdapPrincipal( adminDn, AuthenticationLevel.STRONG ), ctx.getDirectoryService() );
392
393 EntryFilteringCursor cursor = ctx.getDirectoryService().getPartitionNexus().search(
394 new SearchOperationContext( adminSession, contextName,
395 AliasDerefMode.DEREF_ALWAYS, new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ), ctrl ) );
396
397 try
398 {
399 while ( cursor.next() )
400 {
401 ServerEntry entry = cursor.get();
402
403
404 EntryAttribute entryCSNAttr = entry.get( org.apache.directory.mitosis.common.Constants.ENTRY_CSN );
405
406 if ( entryCSNAttr == null )
407 {
408 continue;
409 }
410
411
412 CSN csn;
413
414 try
415 {
416 Object val = entryCSNAttr.get();
417
418 if ( val instanceof byte[] )
419 {
420 csn = new DefaultCSN( StringTools.utf8ToString( ( byte[] ) val ) );
421 }
422 else
423 {
424 csn = new DefaultCSN( ( String ) val );
425 }
426 }
427 catch ( IllegalArgumentException ex )
428 {
429 SessionLog.warn( ctx.getSession(), "An entry with improper entryCSN: " + entry.getDn() );
430 continue;
431 }
432
433
434 LdapDN dn = entry.getDn();
435 dn.normalize( ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
436 .getNormalizerMapping() );
437 Operation op = new AddEntryOperation( ctx.getDirectoryService().getRegistries(), csn, entry );
438
439
440 writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
441 }
442 }
443 finally
444 {
445 cursor.close();
446 }
447 }
448
449
450 @SuppressWarnings("unchecked")
451 private void sendReplicationLogs( Registries registries, ReplicationContext ctx, CSNVector myPV, CSNVector yourUV )
452 {
453 for ( String replicaId : myPV.getReplicaIds() )
454 {
455 CSN myCSN = myPV.getCSN( replicaId );
456 CSN yourCSN = yourUV.getCSN( replicaId );
457
458 if ( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) )
459 {
460 SessionLog.warn( ctx.getSession(), "Remote update vector (" + yourUV
461 + ") is out-of-date. Full replication is required." );
462 ctx.getSession().close();
463 return;
464 }
465 }
466
467 ReplicationLogIterator logIt = ctx.getConfiguration().getStore().getLogs( yourUV, false );
468 try
469 {
470 while ( logIt.next() )
471 {
472 Operation op = logIt.getOperation( registries );
473 writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
474 }
475 }
476 finally
477 {
478 logIt.close();
479 }
480 }
481
482
483 private void onUnexpectedMessage( ReplicationContext ctx, Object message )
484 {
485 SessionLog.warn( ctx.getSession(), "Unexpected message: " + message );
486 ctx.getSession().close();
487 }
488 }