View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.service;
21  
22  
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Map;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.directory.mitosis.common.Replica;
32  import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
33  import org.apache.directory.mitosis.service.protocol.codec.ReplicationClientProtocolCodecFactory;
34  import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
35  import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
36  import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
37  import org.apache.mina.common.ConnectFuture;
38  import org.apache.mina.common.ExecutorThreadModel;
39  import org.apache.mina.common.IoConnector;
40  import org.apache.mina.common.IoConnectorConfig;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.RuntimeIOException;
43  import org.apache.mina.filter.LoggingFilter;
44  import org.apache.mina.filter.codec.ProtocolCodecFilter;
45  import org.apache.mina.transport.socket.nio.SocketConnector;
46  import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  /**
51   * Manages all outgoing connections to remote replicas.
52   * It gets the list of the peer {@link Replica}s from
53   * {@link ReplicationInterceptor} and keeps trying to connect to them.
54   * <p>
55   * When the connection attempt fails, the interval between each connection
56   * attempt doubles up (0, 2, 4, 8, 16, ...) to 60 seconds at maximum.
57   * <p>
58   * Once the connection attempt succeeds, the interval value is reset to
59   * its initial value (0 second) and the established connection is handled
60   * by {@link ReplicationClientProtocolHandler}.
61   * The {@link ReplicationClientProtocolHandler} actually wraps
62   * a {@link ReplicationClientContextHandler} that drives the actual
63   * replication process.
64   *
65   * @author The Apache Directory Project (dev@directory.apache.org)
66   * @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $
67   */
68  class ClientConnectionManager
69  {
70      private static final Logger LOG = LoggerFactory.getLogger( ClientConnectionManager.class );
71  
72      private final ReplicationInterceptor interceptor;
73      private final IoConnector connector = new SocketConnector();
74      private final IoConnectorConfig connectorConfig = new SocketConnectorConfig();
75      private final Map<String,Connection> sessions = new HashMap<String,Connection>();
76      private ReplicationConfiguration configuration;
77      private ConnectionMonitor monitor;
78  
79  
80      ClientConnectionManager( ReplicationInterceptor interceptor )
81      {
82          this.interceptor = interceptor;
83  
84          ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance( "mitosis" );
85          threadModel.setExecutor( new ThreadPoolExecutor( 16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ) );
86          connectorConfig.setThreadModel( threadModel );
87  
88          //// add codec
89          connectorConfig.getFilterChain().addLast( "protocol",
90              new ProtocolCodecFilter( new ReplicationClientProtocolCodecFactory() ) );
91  
92          //// add logger
93          connectorConfig.getFilterChain().addLast( "logger", new LoggingFilter() );
94      }
95  
96  
97      public void start( ReplicationConfiguration cfg ) throws Exception
98      {
99          this.configuration = cfg;
100         monitor = new ConnectionMonitor();
101         monitor.start();
102     }
103 
104 
105     public void stop() throws Exception
106     {
107         // close all connections
108         monitor.shutdown();
109 
110         // remove all filters
111         connector.getFilterChain().clear();
112 
113         ( ( ExecutorService ) ( ( ExecutorThreadModel ) connectorConfig.getThreadModel() ).getExecutor() ).shutdown();
114         
115         // Remove all status values.
116         sessions.clear();
117     }
118 
119 
120     public void replicate()
121     {
122         // FIXME Can get ConcurrentModificationException.
123         for ( Connection connection : sessions.values() )
124         {
125             synchronized ( connection )
126             {
127                 // Begin replication for the connected replicas.
128                 if ( connection.session != null )
129                 {
130                     ( ( ReplicationProtocolHandler ) connection.session.getHandler() )
131                             .getContext( connection.session ).replicate();
132                 }
133             }
134         }
135     }
136 
137 
138     /**
139      * Interrupt the unconnected connections to make them attempt to connect immediately.
140      *
141      */
142     public void interruptConnectors()
143     {
144         for ( Connection connection : sessions.values() )
145         {
146             synchronized ( connection )
147             {
148                 // Wake up the replicas that are sleeping.
149                 if ( connection.inProgress && connection.connector != null )
150                 {
151                     connection.connector.interrupt();
152                 }
153             }
154         }
155         
156     }
157 
158 
159     private class ConnectionMonitor extends Thread
160     {
161         private boolean timeToShutdown;
162 
163 
164         public ConnectionMonitor()
165         {
166             super( "ClientConnectionManager" );
167             
168             // Initialize the status map.
169             for ( Replica replica : configuration.getPeerReplicas() )
170             {
171                 Connection con = sessions.get( replica.getId() );
172                 
173                 if ( con == null )
174                 {
175                     con = new Connection();
176                     con.replicaId = replica.getId();
177                     sessions.put( replica.getId(), con );
178                 }
179             }
180         }
181 
182 
183         public void shutdown()
184         {
185             timeToShutdown = true;
186             while ( isAlive() )
187             {
188                 try
189                 {
190                     join();
191                 }
192                 catch ( InterruptedException e )
193                 {
194                     LOG.warn( "[Replica-{}] Unexpected exception.", configuration.getReplicaId(), e );
195                 }
196             }
197         }
198 
199 
200         public void run()
201         {
202             while ( !timeToShutdown )
203             {
204                 connectUnconnected();
205                 try
206                 {
207                     Thread.sleep( 1000 );
208                 }
209                 catch ( InterruptedException e )
210                 {
211                     LOG.warn( "[Replica-{}] Unexpected exception.", configuration.getReplicaId(), e );
212                 }
213             }
214 
215             disconnectConnected();
216         }
217 
218 
219         private void connectUnconnected()
220         {
221             for ( Replica replica : configuration.getPeerReplicas() )
222             {
223                 // Someone might have modified the configuration,
224                 // and therefore we try to detect newly added replicas.
225                 Connection con = sessions.get( replica.getId() );
226                 if ( con == null )
227                 {
228                     con = new Connection();
229                     con.replicaId = replica.getId();
230                     sessions.put( replica.getId(), con );
231                 }
232 
233                 synchronized ( con )
234                 {
235                     if ( con.inProgress )
236                     {
237                         // connection is in progress
238                         continue;
239                     }
240 
241                     if ( con.session != null )
242                     {
243                         if ( con.session.isConnected() )
244                         {
245                             continue;
246                         }
247                         con.session = null;
248                     }
249 
250                     // put to connectingSession with dummy value to mark
251                     // that connection is in progress
252                     con.inProgress = true;
253 
254                     if ( con.delay < 0 )
255                     {
256                         con.delay = 0;
257                     }
258                     else if ( con.delay == 0 )
259                     {
260                         con.delay = 2;
261                     }
262                     else
263                     {
264                         con.delay *= 2;
265                         if ( con.delay > 60 )
266                         {
267                             con.delay = 60;
268                         }
269                     }
270                 }
271 
272                 Connector connector = new Connector( replica, con );
273                 synchronized ( con )
274                 {
275                     con.connector = connector;
276                 }
277                 connector.start();
278             }
279         }
280 
281 
282         private void disconnectConnected()
283         {
284             LOG.info( "[Replica-{}] Closing all connections...", configuration.getReplicaId() );
285             for ( ;; )
286             {
287                 Iterator<Connection> connections = sessions.values().iterator();
288                 
289                 while ( connections.hasNext() )
290                 {
291                     Connection con =  connections.next();
292                     
293                     synchronized ( con )
294                     {
295                         if ( con.inProgress )
296                         {
297                             if ( con.connector != null )
298                             {
299                                 con.connector.interrupt();
300                             }
301                             
302                             continue;
303                         }
304 
305                         connections.remove();
306 
307                         if ( con.session != null )
308                         {
309                             LOG.info( "[Replica-{}] Closed connection to Replica-{}", configuration.getReplicaId(),
310                                     con.replicaId );
311                             con.session.close();
312                         }
313                     }
314                 }
315 
316                 if ( sessions.isEmpty() )
317                 {
318                     break;
319                 }
320 
321                 // Sleep 1 second and try again waiting for Connector threads.
322                 try
323                 {
324                     Thread.sleep( 1000 );
325                 }
326                 catch ( InterruptedException e )
327                 {
328                 }
329             }
330         }
331     }
332 
333 
334     private class Connector extends Thread
335     {
336         private final Replica replica;
337         private final Connection con;
338 
339 
340         public Connector( Replica replica, Connection con )
341         {
342             super( "ClientConnectionManager-" + replica );
343             this.replica = replica;
344             this.con = con;
345         }
346 
347 
348         public void run()
349         {
350             if ( con.delay > 0 )
351             {
352                 if ( LOG.isInfoEnabled() )
353                 {
354                     LOG.info( "[Replica-{}] Waiting for {} seconds to reconnect to replica-" + con.replicaId,
355                             ClientConnectionManager.this.configuration.getReplicaId(), con.delay );
356                 }
357                 
358                 try
359                 {
360                     Thread.sleep( con.delay * 1000L );
361                 }
362                 catch ( InterruptedException e )
363                 {
364                 }
365             }
366 
367             LOG.info( "[Replica-{}] Connecting to replica-{}",
368                     ClientConnectionManager.this.configuration.getReplicaId(), replica.getId() );
369 
370             IoSession session;
371             try
372             {
373                 connectorConfig.setConnectTimeout( configuration.getResponseTimeout() );
374                 ConnectFuture future = connector.connect( replica.getAddress(), new ReplicationClientProtocolHandler(
375                         interceptor ), connectorConfig );
376 
377                 future.join();
378                 session = future.getSession();
379 
380                 synchronized ( con )
381                 {
382                     con.session = session;
383                     con.delay = -1; // reset delay
384                     con.inProgress = false;
385                     con.replicaId = replica.getId();
386                 }
387             }
388             catch ( RuntimeIOException e )
389             {
390                 LOG.warn( "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId()
391                         + "] Failed to connect to replica-" + replica.getId(), e );
392             }
393             finally
394             {
395                 synchronized ( con )
396                 {
397                     con.inProgress = false;
398                     con.connector = null;
399                 }
400             }
401         }
402     }
403 
404     private static class Connection
405     {
406         private IoSession session;
407         private int delay = -1;
408         private boolean inProgress;
409         private Connector connector;
410         private String replicaId;
411 
412 
413         public Connection()
414         {
415         }
416     }
417 }