1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.service.protocol.handler;
21
22
23 import org.apache.directory.mitosis.common.CSNVector;
24 import org.apache.directory.mitosis.common.Replica;
25 import org.apache.directory.mitosis.operation.Operation;
26 import org.apache.directory.mitosis.service.ReplicationContext;
27 import org.apache.directory.mitosis.service.ReplicationContext.State;
28 import org.apache.directory.mitosis.service.protocol.Constants;
29 import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesAckMessage;
30 import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesMessage;
31 import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesAckMessage;
32 import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesMessage;
33 import org.apache.directory.mitosis.service.protocol.message.LogEntryAckMessage;
34 import org.apache.directory.mitosis.service.protocol.message.LogEntryMessage;
35 import org.apache.directory.mitosis.service.protocol.message.LoginAckMessage;
36 import org.apache.directory.mitosis.service.protocol.message.LoginMessage;
37 import org.apache.directory.mitosis.store.ReplicationStore;
38 import org.apache.mina.common.IdleStatus;
39 import org.apache.mina.util.SessionLog;
40
41 import java.net.InetSocketAddress;
42
43
44
45
46
47
48
49
50
51
52
53 public class ReplicationServerContextHandler implements ReplicationContextHandler
54 {
55 private Replica replicaInTransaction;
56
57
58 public void contextBegin( ReplicationContext ctx ) throws Exception
59 {
60
61 ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, ctx.getConfiguration().getResponseTimeout() );
62
63
64 ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
65 }
66
67
68 public synchronized void contextEnd( ReplicationContext ctx ) throws Exception
69 {
70
71 if ( ctx.getPeer() != null && ctx.getPeer().equals( replicaInTransaction ) )
72 {
73 replicaInTransaction = null;
74 }
75 }
76
77
78 public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
79 {
80 if ( ctx.getState() == State.READY )
81 {
82 if ( message instanceof LogEntryMessage )
83 {
84 onLogEntry( ctx, ( LogEntryMessage ) message );
85 }
86 else if ( message instanceof BeginLogEntriesMessage )
87 {
88 onBeginLogEntries( ctx, ( BeginLogEntriesMessage ) message );
89 }
90 else if ( message instanceof EndLogEntriesMessage )
91 {
92 onEndLogEntries( ctx, ( EndLogEntriesMessage ) message );
93 }
94 else
95 {
96 onUnexpectedMessage( ctx, message );
97 }
98 }
99 else
100 {
101 if ( message instanceof LoginMessage )
102 {
103 onLogin( ctx, ( LoginMessage ) message );
104 }
105 else
106 {
107 onUnexpectedMessage( ctx, message );
108 }
109 }
110 }
111
112
113 public void messageSent( ReplicationContext ctx, Object message ) throws Exception
114 {
115 }
116
117
118 public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
119 {
120 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
121 + "] Unexpected exception.", cause );
122 ctx.getSession().close();
123 }
124
125
126 public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
127 {
128 if ( ctx.getState() == State.INIT )
129 {
130 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
131 + "] No login attempt in " + ctx.getConfiguration().getResponseTimeout()
132 + " second(s)." );
133 ctx.getSession().close();
134 }
135 }
136
137
138 private void onLogin( ReplicationContext ctx, LoginMessage message )
139 {
140 for ( Replica replica : ctx.getConfiguration().getPeerReplicas() )
141 {
142 if ( replica.getId().equals( message.getReplicaId() ) )
143 {
144 if ( replica.getAddress().getAddress().equals(
145 ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
146 {
147 ctx.getSession()
148 .write(
149 new LoginAckMessage( message.getSequence(), Constants.OK, ctx.getConfiguration()
150 .getReplicaId() ) );
151 ctx.setPeer( replica );
152 ctx.setState( State.READY );
153
154
155 ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, 0 );
156 return;
157 }
158 else
159 {
160 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
161 + "] Peer address mismatches: "
162 + ctx.getSession().getRemoteAddress() + " (expected: " + replica.getAddress() );
163 ctx.getSession().write(
164 new LoginAckMessage( message.getSequence(), Constants.NOT_OK, ctx.getConfiguration()
165 .getReplicaId() ) );
166 ctx.getSession().close();
167 return;
168 }
169 }
170 }
171
172 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
173 + "] Unknown peer replica ID: " + message.getReplicaId() );
174 ctx.getSession().write(
175 new LoginAckMessage( message.getSequence(), Constants.NOT_OK, ctx.getConfiguration().getReplicaId() ) );
176 ctx.getSession().close();
177 }
178
179
180 private synchronized void onLogEntry( ReplicationContext ctx, LogEntryMessage message ) throws Exception
181 {
182
183
184 if ( !ctx.getPeer().equals( replicaInTransaction ) )
185 {
186 ctx.getSession().write( new LogEntryAckMessage( message.getSequence(), Constants.NOT_OK ) );
187 return;
188 }
189
190 Operation op = message.getOperation();
191 LogEntryAckMessage ack = null;
192 try
193 {
194 op.execute( ctx.getDirectoryService().getPartitionNexus(), ctx.getConfiguration().getStore(),
195 ctx.getDirectoryService().getSession() );
196
197 ack = new LogEntryAckMessage( message.getSequence(), Constants.OK );
198 }
199 catch ( Exception e )
200 {
201 ack = new LogEntryAckMessage( message.getSequence(), Constants.NOT_OK );
202 throw e;
203 }
204 finally
205 {
206 ctx.getSession().write( ack );
207 }
208 }
209
210
211 private synchronized void onBeginLogEntries( ReplicationContext ctx, BeginLogEntriesMessage message )
212 {
213
214 if ( replicaInTransaction != null )
215 {
216 ctx.getSession()
217 .write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK, null, null ) );
218 return;
219 }
220
221 ReplicationStore store = ctx.getConfiguration().getStore();
222 try
223 {
224 CSNVector pv = store.getPurgeVector();
225 CSNVector uv = store.getUpdateVector();
226 replicaInTransaction = ctx.getPeer();
227 ctx.getSession().write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.OK, pv, uv ) );
228 }
229 catch ( Exception e )
230 {
231 SessionLog.warn( ctx.getSession(), "Failed to get update vector.", e );
232 ctx.getSession()
233 .write( new BeginLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK, null, null ) );
234 }
235 }
236
237
238 private synchronized void onEndLogEntries( ReplicationContext ctx, EndLogEntriesMessage message )
239 {
240
241
242 if ( !ctx.getPeer().equals( replicaInTransaction ) )
243 {
244 ctx.getSession().write( new EndLogEntriesAckMessage( message.getSequence(), Constants.NOT_OK ) );
245 return;
246 }
247
248 ctx.getSession().write( new EndLogEntriesAckMessage( message.getSequence(), Constants.OK ) );
249 replicaInTransaction = null;
250 }
251
252
253 private void onUnexpectedMessage( ReplicationContext ctx, Object message )
254 {
255 SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
256 + "] Unexpected message: " + message );
257 ctx.getSession().close();
258 }
259 }