1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.service;
21
22
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.directory.mitosis.common.Replica;
import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
import org.apache.directory.mitosis.service.protocol.codec.ReplicationClientProtocolCodecFactory;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 class ClientConnectionManager
69 {
70 private static final Logger LOG = LoggerFactory.getLogger( ClientConnectionManager.class );
71
72 private final ReplicationInterceptor interceptor;
73 private final IoConnector connector = new SocketConnector();
74 private final IoConnectorConfig connectorConfig = new SocketConnectorConfig();
75 private final Map<String,Connection> sessions = new HashMap<String,Connection>();
76 private ReplicationConfiguration configuration;
77 private ConnectionMonitor monitor;
78
79
80 ClientConnectionManager( ReplicationInterceptor interceptor )
81 {
82 this.interceptor = interceptor;
83
84 ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance( "mitosis" );
85 threadModel.setExecutor( new ThreadPoolExecutor( 16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ) );
86 connectorConfig.setThreadModel( threadModel );
87
88
89 connectorConfig.getFilterChain().addLast( "protocol",
90 new ProtocolCodecFilter( new ReplicationClientProtocolCodecFactory() ) );
91
92
93 connectorConfig.getFilterChain().addLast( "logger", new LoggingFilter() );
94 }
95
96
97 public void start( ReplicationConfiguration cfg ) throws Exception
98 {
99 this.configuration = cfg;
100 monitor = new ConnectionMonitor();
101 monitor.start();
102 }
103
104
105 public void stop() throws Exception
106 {
107
108 monitor.shutdown();
109
110
111 connector.getFilterChain().clear();
112
113 ( ( ExecutorService ) ( ( ExecutorThreadModel ) connectorConfig.getThreadModel() ).getExecutor() ).shutdown();
114
115
116 sessions.clear();
117 }
118
119
120 public void replicate()
121 {
122
123 for ( Connection connection : sessions.values() )
124 {
125 synchronized ( connection )
126 {
127
128 if ( connection.session != null )
129 {
130 ( ( ReplicationProtocolHandler ) connection.session.getHandler() )
131 .getContext( connection.session ).replicate();
132 }
133 }
134 }
135 }
136
137
138
139
140
141
142 public void interruptConnectors()
143 {
144 for ( Connection connection : sessions.values() )
145 {
146 synchronized ( connection )
147 {
148
149 if ( connection.inProgress && connection.connector != null )
150 {
151 connection.connector.interrupt();
152 }
153 }
154 }
155
156 }
157
158
159 private class ConnectionMonitor extends Thread
160 {
161 private boolean timeToShutdown;
162
163
164 public ConnectionMonitor()
165 {
166 super( "ClientConnectionManager" );
167
168
169 for ( Replica replica : configuration.getPeerReplicas() )
170 {
171 Connection con = sessions.get( replica.getId() );
172
173 if ( con == null )
174 {
175 con = new Connection();
176 con.replicaId = replica.getId();
177 sessions.put( replica.getId(), con );
178 }
179 }
180 }
181
182
183 public void shutdown()
184 {
185 timeToShutdown = true;
186 while ( isAlive() )
187 {
188 try
189 {
190 join();
191 }
192 catch ( InterruptedException e )
193 {
194 LOG.warn( "[Replica-{}] Unexpected exception.", configuration.getReplicaId(), e );
195 }
196 }
197 }
198
199
200 public void run()
201 {
202 while ( !timeToShutdown )
203 {
204 connectUnconnected();
205 try
206 {
207 Thread.sleep( 1000 );
208 }
209 catch ( InterruptedException e )
210 {
211 LOG.warn( "[Replica-{}] Unexpected exception.", configuration.getReplicaId(), e );
212 }
213 }
214
215 disconnectConnected();
216 }
217
218
219 private void connectUnconnected()
220 {
221 for ( Replica replica : configuration.getPeerReplicas() )
222 {
223
224
225 Connection con = sessions.get( replica.getId() );
226 if ( con == null )
227 {
228 con = new Connection();
229 con.replicaId = replica.getId();
230 sessions.put( replica.getId(), con );
231 }
232
233 synchronized ( con )
234 {
235 if ( con.inProgress )
236 {
237
238 continue;
239 }
240
241 if ( con.session != null )
242 {
243 if ( con.session.isConnected() )
244 {
245 continue;
246 }
247 con.session = null;
248 }
249
250
251
252 con.inProgress = true;
253
254 if ( con.delay < 0 )
255 {
256 con.delay = 0;
257 }
258 else if ( con.delay == 0 )
259 {
260 con.delay = 2;
261 }
262 else
263 {
264 con.delay *= 2;
265 if ( con.delay > 60 )
266 {
267 con.delay = 60;
268 }
269 }
270 }
271
272 Connector connector = new Connector( replica, con );
273 synchronized ( con )
274 {
275 con.connector = connector;
276 }
277 connector.start();
278 }
279 }
280
281
282 private void disconnectConnected()
283 {
284 LOG.info( "[Replica-{}] Closing all connections...", configuration.getReplicaId() );
285 for ( ;; )
286 {
287 Iterator<Connection> connections = sessions.values().iterator();
288
289 while ( connections.hasNext() )
290 {
291 Connection con = connections.next();
292
293 synchronized ( con )
294 {
295 if ( con.inProgress )
296 {
297 if ( con.connector != null )
298 {
299 con.connector.interrupt();
300 }
301
302 continue;
303 }
304
305 connections.remove();
306
307 if ( con.session != null )
308 {
309 LOG.info( "[Replica-{}] Closed connection to Replica-{}", configuration.getReplicaId(),
310 con.replicaId );
311 con.session.close();
312 }
313 }
314 }
315
316 if ( sessions.isEmpty() )
317 {
318 break;
319 }
320
321
322 try
323 {
324 Thread.sleep( 1000 );
325 }
326 catch ( InterruptedException e )
327 {
328 }
329 }
330 }
331 }
332
333
334 private class Connector extends Thread
335 {
336 private final Replica replica;
337 private final Connection con;
338
339
340 public Connector( Replica replica, Connection con )
341 {
342 super( "ClientConnectionManager-" + replica );
343 this.replica = replica;
344 this.con = con;
345 }
346
347
348 public void run()
349 {
350 if ( con.delay > 0 )
351 {
352 if ( LOG.isInfoEnabled() )
353 {
354 LOG.info( "[Replica-{}] Waiting for {} seconds to reconnect to replica-" + con.replicaId,
355 ClientConnectionManager.this.configuration.getReplicaId(), con.delay );
356 }
357
358 try
359 {
360 Thread.sleep( con.delay * 1000L );
361 }
362 catch ( InterruptedException e )
363 {
364 }
365 }
366
367 LOG.info( "[Replica-{}] Connecting to replica-{}",
368 ClientConnectionManager.this.configuration.getReplicaId(), replica.getId() );
369
370 IoSession session;
371 try
372 {
373 connectorConfig.setConnectTimeout( configuration.getResponseTimeout() );
374 ConnectFuture future = connector.connect( replica.getAddress(), new ReplicationClientProtocolHandler(
375 interceptor ), connectorConfig );
376
377 future.join();
378 session = future.getSession();
379
380 synchronized ( con )
381 {
382 con.session = session;
383 con.delay = -1;
384 con.inProgress = false;
385 con.replicaId = replica.getId();
386 }
387 }
388 catch ( RuntimeIOException e )
389 {
390 LOG.warn( "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId()
391 + "] Failed to connect to replica-" + replica.getId(), e );
392 }
393 finally
394 {
395 synchronized ( con )
396 {
397 con.inProgress = false;
398 con.connector = null;
399 }
400 }
401 }
402 }
403
404 private static class Connection
405 {
406 private IoSession session;
407 private int delay = -1;
408 private boolean inProgress;
409 private Connector connector;
410 private String replicaId;
411
412
413 public Connection()
414 {
415 }
416 }
417 }