View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.service.protocol.handler;
21  
22  
23  import org.apache.directory.mitosis.common.CSNVector;
24  import org.apache.directory.mitosis.common.Replica;
25  import org.apache.directory.mitosis.operation.Operation;
26  import org.apache.directory.mitosis.service.ReplicationContext;
27  import org.apache.directory.mitosis.service.ReplicationContext.State;
28  import org.apache.directory.mitosis.service.protocol.Constants;
29  import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesAckMessage;
30  import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesMessage;
31  import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesAckMessage;
32  import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesMessage;
33  import org.apache.directory.mitosis.service.protocol.message.LogEntryAckMessage;
34  import org.apache.directory.mitosis.service.protocol.message.LogEntryMessage;
35  import org.apache.directory.mitosis.service.protocol.message.LoginAckMessage;
36  import org.apache.directory.mitosis.service.protocol.message.LoginMessage;
37  import org.apache.directory.mitosis.store.ReplicationStore;
38  import org.apache.mina.common.IdleStatus;
39  import org.apache.mina.util.SessionLog;
40  
41  import java.net.InetSocketAddress;
42  
43  
44  /**
45   * {@link ReplicationContextHandler} that implements server-side replication logic
46   * which retrieves any changes occurred in remote replicas.  Please refer to
47   * {@link ReplicationClientContextHandler} for the detailed protocol flow and
48   * the description of the replication logic execution.
49   *
50   * @author The Apache Directory Project (dev@directory.apache.org)
51   * @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $
52   */
53  public class ReplicationServerContextHandler implements ReplicationContextHandler
54  {
55      private Replica replicaInTransaction;
56  
57  
58      public void contextBegin( ReplicationContext ctx ) throws Exception
59      {
60          // Set login timeout
61          ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, ctx.getConfiguration().getResponseTimeout() );
62  
63          // Set write timeout
64          ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
65      }
66  
67  
68      public synchronized void contextEnd( ReplicationContext ctx ) throws Exception
69      {
70          // Reset the mark if the context has the unfinished transaction.
71          if ( ctx.getPeer() != null && ctx.getPeer().equals( replicaInTransaction ) )
72          {
73              replicaInTransaction = null;
74          }
75      }
76  
77  
78      public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
79      {
80          if ( ctx.getState() == State.READY )
81          {
82              if ( message instanceof LogEntryMessage )
83              {
84                  onLogEntry( ctx, ( LogEntryMessage ) message );
85              }
86              else if ( message instanceof BeginLogEntriesMessage )
87              {
88                  onBeginLogEntries( ctx, ( BeginLogEntriesMessage ) message );
89              }
90              else if ( message instanceof EndLogEntriesMessage )
91              {
92                  onEndLogEntries( ctx, ( EndLogEntriesMessage ) message );
93              }
94              else
95              {
96                  onUnexpectedMessage( ctx, message );
97              }
98          }
99          else
100         {
101             if ( message instanceof LoginMessage )
102             {
103                 onLogin( ctx, ( LoginMessage ) message );
104             }
105             else
106             {
107                 onUnexpectedMessage( ctx, message );
108             }
109         }
110     }
111 
112 
113     public void messageSent( ReplicationContext ctx, Object message ) throws Exception
114     {
115     }
116 
117 
118     public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
119     {
120         SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
121                 + "] Unexpected exception.", cause );
122         ctx.getSession().close();
123     }
124 
125 
126     public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
127     {
128         if ( ctx.getState() == State.INIT )
129         {
130             SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
131                 + "] No login attempt in " + ctx.getConfiguration().getResponseTimeout()
132                 + " second(s)." );
133             ctx.getSession().close();
134         }
135     }
136 
137 
138     private void onLogin( ReplicationContext ctx, LoginMessage message )
139     {
140         for ( Replica replica : ctx.getConfiguration().getPeerReplicas() )
141         {
142             if ( replica.getId().equals( message.getReplicaId() ) )
143             {
144                 if ( replica.getAddress().getAddress().equals(
145                         ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
146                 {
147                     ctx.getSession()
148                             .write(
149                                     new LoginAckMessage( message.getSequence(), Constants.OK, ctx.getConfiguration()
150                                             .getReplicaId() ) );
151                     ctx.setPeer( replica );
152                     ctx.setState( State.READY );
153 
154                     // Clear login timeout.
155                     ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, 0 );
156                     return;
157                 }
158                 else
159                 {
160                     SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
161                             + "] Peer address mismatches: "
162                             + ctx.getSession().getRemoteAddress() + " (expected: " + replica.getAddress() );
163                     ctx.getSession().write(
164                             new LoginAckMessage( message.getSequence(), Constants.NOT_OK, ctx.getConfiguration()
165                                     .getReplicaId() ) );
166                     ctx.getSession().close();
167                     return;
168                 }
169             }
170         }
171 
172         SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
173                 + "] Unknown peer replica ID: " + message.getReplicaId() );
174         ctx.getSession().write(
175             new LoginAckMessage( message.getSequence(), Constants.NOT_OK, ctx.getConfiguration().getReplicaId() ) );
176         ctx.getSession().close();
177     }
178 
179     
180     private synchronized void onLogEntry( ReplicationContext ctx, LogEntryMessage message ) throws Exception
181     {
182         // Return error if other replica than what is in progress sends
183         // a log entry
184         if ( !ctx.getPeer().equals( replicaInTransaction ) )
185         {
186             ctx.getSession().write( new LogEntryAckMessage( message.getSequence(), Constants.NOT_OK ) );
187             return;
188         }
189 
190         Operation op = message.getOperation();
191         LogEntryAckMessage ack = null;
192         try
193         {
194             op.execute( ctx.getDirectoryService().getPartitionNexus(), ctx.getConfiguration().getStore(),
195             		ctx.getDirectoryService().getSession() );
196             
197             ack = new LogEntryAckMessage( message.getSequence(), Constants.OK );
198         }
199         catch ( Exception e )
200         {
201             ack = new LogEntryAckMessage( message.getSequence(), Constants.NOT_OK );
202             throw e;
203         }
204         finally
205         {
206             ctx.getSession().write( ack );
207         }
208     }
209 
210 
211     private synchronized void onBeginLogEntries( ReplicationContext ctx, BeginLogEntriesMessage message )
212     {
213         // Return error if the transaction is already in progress.
214         if ( replicaInTransaction != null )
215         {
216             ctx.getSession()
217                 .write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK, null, null ) );
218             return;
219         }
220 
221         ReplicationStore store = ctx.getConfiguration().getStore();
222         try
223         {
224             CSNVector pv = store.getPurgeVector();
225             CSNVector uv = store.getUpdateVector();
226             replicaInTransaction = ctx.getPeer(); // Mark as replica in transaction
227             ctx.getSession().write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.OK, pv, uv ) );
228         }
229         catch ( Exception e )
230         {
231             SessionLog.warn( ctx.getSession(), "Failed to get update vector.", e );
232             ctx.getSession()
233                 .write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK, null, null ) );
234         }
235     }
236 
237 
238     private synchronized void onEndLogEntries( ReplicationContext ctx, EndLogEntriesMessage message )
239     {
240         // Return error if other replica than what is in progress sends
241         // a flow control message
242         if ( !ctx.getPeer().equals( replicaInTransaction ) )
243         {
244             ctx.getSession().write( new EndLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK ) );
245             return;
246         }
247 
248         ctx.getSession().write( new EndLogEntriesAckMessage( message.getSequence(), Constants.OK ) );
249         replicaInTransaction = null; // Reset the mark.
250     }
251 
252 
253     private void onUnexpectedMessage( ReplicationContext ctx, Object message )
254     {
255         SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
256                 + "] Unexpected message: " + message );
257         ctx.getSession().close();
258     }
259 }