View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.service.protocol.handler;
21  
22  
23  import org.apache.directory.mitosis.common.CSN;
24  import org.apache.directory.mitosis.common.CSNVector;
25  import org.apache.directory.mitosis.common.DefaultCSN;
26  import org.apache.directory.mitosis.common.Replica;
27  import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
28  import org.apache.directory.mitosis.operation.AddEntryOperation;
29  import org.apache.directory.mitosis.operation.Operation;
30  import org.apache.directory.mitosis.service.ReplicationContext;
31  import org.apache.directory.mitosis.service.ReplicationContext.State;
32  import org.apache.directory.mitosis.service.protocol.Constants;
33  import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
34  import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesAckMessage;
35  import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesMessage;
36  import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesAckMessage;
37  import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesMessage;
38  import org.apache.directory.mitosis.service.protocol.message.LogEntryAckMessage;
39  import org.apache.directory.mitosis.service.protocol.message.LogEntryMessage;
40  import org.apache.directory.mitosis.service.protocol.message.LoginAckMessage;
41  import org.apache.directory.mitosis.service.protocol.message.LoginMessage;
42  import org.apache.directory.mitosis.store.ReplicationLogIterator;
43  import org.apache.directory.mitosis.store.ReplicationStore;
44  import org.apache.directory.server.constants.ServerDNConstants;
45  import org.apache.directory.server.core.CoreSession;
46  import org.apache.directory.server.core.DefaultCoreSession;
47  import org.apache.directory.server.core.authn.LdapPrincipal;
48  import org.apache.directory.server.core.entry.ServerEntry;
49  import org.apache.directory.server.core.filtering.EntryFilteringCursor;
50  import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
51  import org.apache.directory.server.schema.registries.Registries;
52  import org.apache.directory.shared.ldap.constants.AuthenticationLevel;
53  import org.apache.directory.shared.ldap.constants.SchemaConstants;
54  import org.apache.directory.shared.ldap.entry.EntryAttribute;
55  import org.apache.directory.shared.ldap.entry.Value;
56  import org.apache.directory.shared.ldap.filter.PresenceNode;
57  import org.apache.directory.shared.ldap.message.AliasDerefMode;
58  import org.apache.directory.shared.ldap.name.LdapDN;
59  import org.apache.directory.shared.ldap.schema.OidNormalizer;
60  import org.apache.directory.shared.ldap.util.StringTools;
61  import org.apache.mina.common.IdleStatus;
62  import org.apache.mina.common.WriteFuture;
63  import org.apache.mina.util.SessionLog;
64  
65  import javax.naming.directory.SearchControls;
66  import java.net.InetSocketAddress;
67  import java.util.Map;
68  
69  
70  /**
71   * {@link ReplicationContextHandler} that implements client-side replication
72   * logic which sends any changes out-of-date to server.  The following is
73   * the detailed protocol flow and the description of the replication logic
74   * execution.
75   * <ul>
76   * <li><tt>ClientConnectionManager</tt> connects the client to the server.</li>
77   * <li>The client sends {@link LoginMessage} to the server.</li>
78   * <li>The server responds with {@link LoginAckMessage} to the client
79   *     <ul>
80   *     <li>Unless the response code is {@link Constants#OK}, disconnect.
81   *         Next connection attempt is performed by
82   *         <tt>ClientConnectionManager</tt> later.</li>
83   *     <li>Otherwise, the state of the {@link ReplicationContext} changes to
84   *         {@link State#READY}, and proceed.</li>
85   *     </ul></li>
86   * <li>The client periodically tries to transfer the data that the server
87   *     needs from its {@link ReplicationStore}, triggered by the
88   *     {@link #contextIdle(ReplicationContext, IdleStatus)} event,
89   *     which is implemented using the <tt>sessionIdle</tt> event in MINA.
90   *     <ul>
91   *     <li>The client sends a {@link BeginLogEntriesMessage} to the server.</li>
92   *     <li>The server responds with {@link BeginLogEntriesAckMessage}.
93   *         <ul>
94   *         <li>If the response code is {@link Constants#OK},
95   *             <ul>
96   *             <li>{@link BeginLogEntriesAckMessage} contains an
97   *                 Update Vector (UV) of the server. The client compares
98   *                 the received UV and the client's Purge Vector (PV).
99   *                 <ul>
100  *                 <li>If the PV is greater than the UV, this means the client
101  *                     can't send all operation logs that server needs to get
102  *                     synchronized.  This usually means that the server has
103  *                     been offline for too long and eventually got out of sync
104  *                     because of the log-purging process on the client
105  *                     side (see {@link ReplicationConfiguration#getLogMaxAge()}).
106  *                     The client sends all entries in the DIT to the server,
107  *                     and the server overwrites its current DIT with the
108  *                     received entries.</li>
109  *                 <li>Otherwise, the client sends only the changed part since
110  *                     the last synchronization by querying its
111  *                     {@link ReplicationStore} by calling
112  *                     {@link ReplicationStore#getLogs(CSNVector, boolean)}.</li>
113  *                 <li>The data transfer is very simple.  It's asynchronous
114  *                     request-response exchange.  The client sends {@link LogEntryMessage},
115  *                     and then the server responds with {@link LogEntryAckMessage}.</li>
116  *             </ul></li>
117  *         <li>If the response code is not {@link Constants#OK}, retry later.</li>
118  *         </ul></li>
119  *     </ul></li>
120  * </ul>
121  *
122  * @author The Apache Directory Project (dev@directory.apache.org)
123  * @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $
124  */
125 public class ReplicationClientContextHandler implements ReplicationContextHandler
126 {
127     public void contextBegin( ReplicationContext ctx ) throws Exception
128     {
129         // Send a login message.
130         LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() );
131         writeTimeLimitedMessage( ctx, m );
132 
133         // Set write timeout
134         ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
135 
136         // Check update vector of the remote peer periodically.
137         ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, ctx.getConfiguration().getReplicationInterval() );
138     }
139 
140 
141     public void contextEnd( ReplicationContext ctx ) throws Exception
142     {
143     }
144 
145 
146     public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
147     {
148         ctx.cancelExpiration( ( ( BaseMessage ) message ).getSequence() );
149 
150         if ( ctx.getState() == State.READY )
151         {
152             if ( message instanceof LogEntryAckMessage )
153             {
154                 onLogEntryAck( ctx, ( LogEntryAckMessage ) message );
155             }
156             else if ( message instanceof BeginLogEntriesAckMessage )
157             {
158                 onBeginLogEntriesAck( ctx.getDirectoryService().getRegistries(), ctx, ( BeginLogEntriesAckMessage ) message );
159             }
160             else if ( message instanceof EndLogEntriesAckMessage )
161             {
162                 // Do nothing
163             }
164             else
165             {
166                 onUnexpectedMessage( ctx, message );
167             }
168         }
169         else
170         {
171             if ( message instanceof LoginAckMessage )
172             {
173                 onLoginAck( ctx, ( LoginAckMessage ) message );
174             }
175             else
176             {
177                 onUnexpectedMessage( ctx, message );
178             }
179         }
180     }
181 
182 
183     public void messageSent( ReplicationContext ctx, Object message ) throws Exception
184     {
185     }
186 
187 
188     /**
189      * A helper to write a message and schedule that message for expiration.
190      *
191      * @param ctx the replication context
192      * @param message the message to replicate
193      * @return the write future to block on this replication message transmission
194      */
195     public WriteFuture writeTimeLimitedMessage( ReplicationContext ctx, Object message )
196     {
197         ctx.scheduleExpiration( message );
198         return ctx.getSession().write( message );
199     }
200 
201 
202     public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
203     {
204         if ( SessionLog.isWarnEnabled( ctx.getSession() ) )
205         {
206             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
207                 + "] Unexpected exception.", cause );
208         }
209         ctx.getSession().close();
210     }
211 
212 
213     public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
214     {
215         beginReplication( ctx );
216     }
217 
218 
219     private void onLoginAck( ReplicationContext ctx, LoginAckMessage message )
220     {
221         if ( message.getResponseCode() != Constants.OK )
222         {
223             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
224                 + "] Login attempt failed: " + message.getResponseCode() );
225             ctx.getSession().close();
226             return;
227         }
228 
229         for ( Replica replica : ctx.getConfiguration().getPeerReplicas() )
230         {
231             if ( replica.getId().equals( message.getReplicaId() ) )
232             {
233                 if ( replica.getAddress().getAddress().equals(
234                     ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
235                 {
236                     ctx.setPeer( replica );
237                     ctx.setState( State.READY );
238                     return;
239                 }
240                 else
241                 {
242                     SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
243                         + "] Peer address mismatches: " + ctx.getSession().getRemoteAddress() + " (expected: "
244                         + replica.getAddress() );
245                     ctx.getSession().close();
246                     return;
247                 }
248             }
249         }
250 
251         SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
252             + "] Unknown peer replica ID: " + message.getReplicaId() );
253         ctx.getSession().close();
254     }
255 
256 
257     public boolean beginReplication( ReplicationContext ctx )
258     {
259         // If this cilent is logged in, all responses for sent messages
260         // (LogEntryMessages) is received, and no write request is pending,
261         // it means previous replication process ended or this is the
262         // first replication attempt.
263         if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() <= 0
264             && ctx.getSession().getScheduledWriteRequests() <= 0 )
265         {
266             // Initiate replication process asking update vector.
267             if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
268             {
269                 SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId() + "->"
270                     + ( ctx.getPeer() != null ? ctx.getPeer().getId() : "null" ) + ") Beginning replication. " );
271             }
272             ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
273             return true;
274         }
275         else
276         {
277             if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
278             {
279                 SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId() + "->"
280                     + ( ctx.getPeer() != null ? ctx.getPeer().getId() : "null" )
281                     + ") Couldn't begin replication.  State:" + ctx.getState() + ", scheduledExpirations:"
282                     + ctx.getScheduledExpirations() + ", scheduledWriteRequests:"
283                     + ctx.getSession().getScheduledWriteRequests() );
284             }
285             return false;
286         }
287     }
288 
289 
290     private void onLogEntryAck( ReplicationContext ctx, LogEntryAckMessage message ) throws Exception
291     {
292         if ( message.getResponseCode() != Constants.OK )
293         {
294             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
295                 + "] Remote peer failed to execute a log entry." );
296             ctx.getSession().close();
297         }
298     }
299 
300 
301     private void onBeginLogEntriesAck( Registries registries, ReplicationContext ctx, BeginLogEntriesAckMessage message )
302         throws Exception
303     {
304         // Start transaction only when the server says OK.
305         if ( message.getResponseCode() != Constants.OK )
306         {
307             return;
308         }
309 
310         ReplicationStore store = ctx.getConfiguration().getStore();
311         CSNVector yourUV = message.getUpdateVector();
312         CSNVector myPV;
313         try
314         {
315             myPV = store.getPurgeVector();
316         }
317         catch ( Exception e )
318         {
319             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
320                 + "] Failed to get update vector.", e );
321             ctx.getSession().close();
322             return;
323         }
324 
325         // Do full-DIT transfer if the peer is new and I'm not new.
326         try
327         {
328             if ( myPV.size() > 0 && yourUV.size() == 0 )
329             {
330                 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
331                     + "] Starting a whole DIT transfer." );
332                 sendAllEntries( ctx );
333             }
334             else
335             {
336                 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
337                     + "] Starting a partial replication log transfer." );
338                 sendReplicationLogs( registries, ctx, myPV, yourUV );
339             }
340         }
341         finally
342         {
343             // Send EngLogEntries message to release the remote peer resources.
344             ctx.getSession().write( new EndLogEntriesMessage( ctx.getNextSequence() ) );
345         }
346     }
347 
348 
349     private void sendAllEntries( ReplicationContext ctx ) throws Exception
350     {
351         ServerEntry rootDSE = ctx.getDirectoryService().getPartitionNexus().getRootDSE( null );
352 
353         EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
354 
355         if ( namingContextsAttr == null || namingContextsAttr.size() == 0 )
356         {
357             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
358                 + "] No namingContexts attributes in rootDSE." );
359             return;
360         }
361 
362         // Iterate all context partitions to send all entries of them.
363         for ( Value<?> namingContext : namingContextsAttr )
364         {
365             // Convert attribute value to JNDI name.
366             LdapDN contextName;
367 
368             contextName = new LdapDN( ( String ) namingContext.get() );
369 
370             SessionLog.info( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
371                 + "] Sending entries under '" + contextName + '\'' );
372 
373             Map<String, OidNormalizer> mapping = ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
374                 .getNormalizerMapping();
375             contextName.normalize( mapping );
376             sendAllEntries( ctx, contextName );
377         }
378     }
379 
380 
381     private void sendAllEntries( ReplicationContext ctx, LdapDN contextName ) throws Exception
382     {
383         // Retrieve all subtree including the base entry
384         SearchControls ctrl = new SearchControls();
385         ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
386 
387         LdapDN adminDn = new LdapDN( ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
388         adminDn.normalize( ctx.getDirectoryService().getRegistries()
389             .getAttributeTypeRegistry().getNormalizerMapping() );
390         CoreSession adminSession = new DefaultCoreSession( 
391             new LdapPrincipal( adminDn, AuthenticationLevel.STRONG ), ctx.getDirectoryService() );
392 
393         EntryFilteringCursor cursor = ctx.getDirectoryService().getPartitionNexus().search(
394             new SearchOperationContext( adminSession, contextName,
395                 AliasDerefMode.DEREF_ALWAYS, new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ), ctrl ) );
396 
397         try
398         {
399             while ( cursor.next() )
400             {
401                 ServerEntry entry = cursor.get();
402 
403                 // Skip entries without entryCSN attribute.
404                 EntryAttribute entryCSNAttr = entry.get( org.apache.directory.mitosis.common.Constants.ENTRY_CSN );
405 
406                 if ( entryCSNAttr == null )
407                 {
408                     continue;
409                 }
410 
411                 // Get entryCSN of the entry.  Skip if entryCSN value is invalid. 
412                 CSN csn;
413 
414                 try
415                 {
416                     Object val = entryCSNAttr.get();
417 
418                     if ( val instanceof byte[] )
419                     {
420                         csn = new DefaultCSN( StringTools.utf8ToString( ( byte[] ) val ) );
421                     }
422                     else
423                     {
424                         csn = new DefaultCSN( ( String ) val );
425                     }
426                 }
427                 catch ( IllegalArgumentException ex )
428                 {
429                     SessionLog.warn( ctx.getSession(), "An entry with improper entryCSN: " + entry.getDn() );
430                     continue;
431                 }
432 
433                 // Convert the entry into AddEntryOperation log.
434                 LdapDN dn = entry.getDn();
435                 dn.normalize( ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
436                     .getNormalizerMapping() );
437                 Operation op = new AddEntryOperation( ctx.getDirectoryService().getRegistries(), csn, entry );
438 
439                 // Send a LogEntry message for the entry.
440                 writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
441             }
442         }
443         finally
444         {
445             cursor.close();
446         }
447     }
448 
449 
450     @SuppressWarnings("unchecked")
451     private void sendReplicationLogs( Registries registries, ReplicationContext ctx, CSNVector myPV, CSNVector yourUV )
452     {
453         for ( String replicaId : myPV.getReplicaIds() )
454         {
455             CSN myCSN = myPV.getCSN( replicaId );
456             CSN yourCSN = yourUV.getCSN( replicaId );
457             
458             if ( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) )
459             {
460                 SessionLog.warn( ctx.getSession(), "Remote update vector (" + yourUV
461                     + ") is out-of-date.  Full replication is required." );
462                 ctx.getSession().close();
463                 return;
464             }
465         }
466 
467         ReplicationLogIterator logIt = ctx.getConfiguration().getStore().getLogs( yourUV, false );
468         try
469         {
470             while ( logIt.next() )
471             {
472                 Operation op = logIt.getOperation( registries );
473                 writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
474             }
475         }
476         finally
477         {
478             logIt.close();
479         }
480     }
481 
482 
483     private void onUnexpectedMessage( ReplicationContext ctx, Object message )
484     {
485         SessionLog.warn( ctx.getSession(), "Unexpected message: " + message );
486         ctx.getSession().close();
487     }
488 }