View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.configuration;
21  
22  
23  import java.net.InetAddress;
24  import java.net.UnknownHostException;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.TreeSet;
30  
31  import org.apache.directory.mitosis.common.CSN;
32  import org.apache.directory.mitosis.common.CSNFactory;
33  import org.apache.directory.mitosis.common.Replica;
34  import org.apache.directory.mitosis.common.DefaultCSNFactory;
35  import org.apache.directory.mitosis.service.ReplicationInterceptor;
36  import org.apache.directory.mitosis.store.ReplicationStore;
37  import org.apache.directory.mitosis.store.derby.DerbyReplicationStore;
38  import org.apache.directory.shared.ldap.util.StringTools;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * A configuration for {@link ReplicationInterceptor}.  This configuration can be
44   * used by calling {@link ReplicationInterceptor#setConfiguration(ReplicationConfiguration)}.
45   * 
46   * @org.apache.xbean.XBean
47   *
48   * @author The Apache Directory Project Team
49   */
50  public class ReplicationConfiguration
51  {
52      public static final int DEFAULT_LOG_MAX_AGE = 7;
53      public static final int DEFAULT_REPLICATION_INTERVAL = 5;
54      public static final int DEFAULT_RESPONSE_TIMEOUT = 60;
55      public static final int DEFAULT_SERVER_PORT = 7846;
56  
57      /** The logger */
58      private static Logger log = LoggerFactory.getLogger( ReplicationConfiguration.class );
59  
60      /** The server identifier */ 
61      private String replicaId;
62      
63      /** Default values for the communication part */
64      private int serverPort = DEFAULT_SERVER_PORT;
65      private int responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
66      private int replicationInterval = DEFAULT_REPLICATION_INTERVAL;
67  
68      /** List of connected replicas */
69      private final Set<Replica> peerReplicas = new HashSet<Replica>();
70      
71      /** Factories */
72      private CSNFactory csnFactory = new DefaultCSNFactory();
73      private ReplicationStore store = new DerbyReplicationStore();
74      
75      /** The longest period of time before a stored entry is removed from storage */
76      private int logMaxAge = DEFAULT_LOG_MAX_AGE; // a week (days)
77  
78  
79      /**
80       * Creates a new instance with default properties except for
81       * the {@link ReplicaId} of the service and the  the list of peer
82       * {@link Replica}s.  You can set these properties by calling
83       * {@link #setReplicaId(ReplicaId)} and {@link #setPeerReplicas(Set)}
84       * respectively.
85       */
86      public ReplicationConfiguration()
87      {
88      }
89  
90      /**
91       * Returns the TCP/IP port number that a {@link ReplicationInterceptor}
92       * listens to.  The default value is {@link #DEFAULT_SERVER_PORT}. 
93       */
94      public int getServerPort()
95      {
96          return serverPort;
97      }
98  
99  
100     /**
101      * Sets the TCP/IP port number that a {@link ReplicationInterceptor}
102      * listens to.  The default value is {@link #DEFAULT_SERVER_PORT}.
103      */
104     public void setServerPort( int serverPort )
105     {
106         this.serverPort = serverPort;
107     }
108 
109     /**
110      * Returns the response timeout value (seconds) for each sent message
111      * during the communication between replicas.  If any response message
112      * is not received within this timeout, the connection is closed and
113      * reestablished.  The default value is {@link #DEFAULT_RESPONSE_TIMEOUT}.
114      */
115     public int getResponseTimeout()
116     {
117         return responseTimeout;
118     }
119 
120     /**
121      * Sets the response timeout value (seconds) for each sent message
122      * during the communication between replicas.  If any response message
123      * is not received within this timeout, the connection is closed and
124      * reestablished.  The default value is {@link #DEFAULT_RESPONSE_TIMEOUT}.
125      */
126     public void setResponseTimeout( int responseTimeout )
127     {
128         this.responseTimeout = responseTimeout;
129     }
130 
131     /**
132      * Returns the replication data exchange interval (seconds) between two
133      * replicas. The default value is {@link #DEFAULT_REPLICATION_INTERVAL}.
134      * 
135      * @return <tt>0</tt> if automatic replication is disabled
136      */
137     public int getReplicationInterval() {
138         return replicationInterval;
139     }
140 
141 
142     /**
143      * Sets the replication data exchange interval (seconds) between two
144      * replicas. The default value is {@link #DEFAULT_REPLICATION_INTERVAL}.
145      * 
146      * @param replicationInterval <tt>0</tt> or below to disable automatic replication.
147      */
148     public void setReplicationInterval( int replicationInterval )
149     {
150         if( replicationInterval < 0 )
151         {
152             replicationInterval = 0;
153         }
154         
155         this.replicationInterval = replicationInterval;
156     }
157 
158 
159     /**
160      * Returns the {@link CSNFactory} for generating {@link CSN}s.
161      * The default factory is {@link DefaultCSNFactory}.
162      */
163     public CSNFactory getCsnFactory()
164     {
165         return csnFactory;
166     }
167 
168 
169     /**                +
170      * Sets the {@link CSNFactory} for generating {@link CSN}s.
171      * The default factory is {@link DefaultCSNFactory}.
172      */
173     public void setCsnFactory( CSNFactory csnFactory )
174     {
175         this.csnFactory = csnFactory;
176     }
177 
178     /**
179      * Adds the specified {@link Replica} to the remote peer replica list.
180      */
181     public void addPeerReplica( Replica peer )
182     {
183         assert peer != null;
184         peerReplicas.add( peer );
185     }
186 
187     /**
188      * Removed the specified {@link Replica} from the remote peer replica list.
189      */
190     public void removePeerReplica( Replica peer )
191     {
192         assert peer != null;
193         peerReplicas.remove( peer );
194     }
195 
196     /**
197      * Clears the remote peer replica list.
198      */
199     public void removeAllPeerReplicas()
200     {
201         peerReplicas.clear();
202     }
203 
204     /**
205      * Returns the remote peer replica list.
206      */
207     public Set<Replica> getPeerReplicas()
208     {
209         Set<Replica> result = new HashSet<Replica>();
210         result.addAll( peerReplicas );
211         return result;
212     }
213 
214 
215     /**
216      * Sets the remote peer replica list.
217      */
218     public void setPeerReplicas( Set<Object> replicas )
219     {
220         assert replicas != null;
221 
222         Set<Replica> normalizedReplicas = new HashSet<Replica>();
223         
224         for ( Object replica:replicas )
225         {
226             if ( replica instanceof Replica )
227             {
228                 normalizedReplicas.add( (Replica)replica );
229             }
230             else
231             {
232                 normalizedReplicas.add( new Replica( replica.toString() ) );
233             }
234         }
235         
236         peerReplicas.clear();
237         peerReplicas.addAll( normalizedReplicas );
238     }
239 
240     /**
241      * Returns the ID of the replica this configuration is configuring.
242      */
243     public String getReplicaId()
244     {
245         return replicaId;
246     }
247 
248     /**
249      * Sets the ID of the replica this configuration is configuring.
250      */
251     public void setReplicaId( String replicaId )
252     {
253         assert replicaId != null;
254         this.replicaId = replicaId;
255     }
256 
257     /**
258      * Returns the {@link ReplicationStore} which stores the change log
259      * of the replica this configuration is configuring.  The default
260      * implementation is {@link DerbyReplicationStore}.
261      */
262     public ReplicationStore getStore()
263     {
264         return store;
265     }
266 
267     /**
268      * Sets the {@link ReplicationStore} which stores the change log
269      * of the replica this configuration is configuring.  The default
270      * implementation is {@link DerbyReplicationStore}.
271      */
272     public void setStore( ReplicationStore store )
273     {
274         this.store = store;
275     }
276 
277     /**
278      * Returns the maximum age (days) of change logs stored in
279      * {@link ReplicationStore}.  Any change logs and deleted entries
280      * older than this value will be purged periodically.  The default value
281      * is {@link #DEFAULT_LOG_MAX_AGE}.
282      */
283     public int getLogMaxAge()
284     {
285         return logMaxAge;
286     }
287 
288     /**
289      * Sets the maximum age (days) of change logs stored in
290      * {@link ReplicationStore}.  Any change logs and deleted entries
291      * older than this value will be purged periodically.  The default value
292      * is {@link #DEFAULT_LOG_MAX_AGE}.
293      */
294     public void setLogMaxAge( int logMaxAge )
295     {
296         if ( logMaxAge <= 0 )
297         {
298             throw new ReplicationConfigurationException( "logMaxAge: " + logMaxAge );
299         }
300 
301         this.logMaxAge = logMaxAge;
302     }
303 
304 
305     /**
306      * Validate Mitosis configuration.
307      * 
308      * We check that the configuration file contains valid
309      * parameters :<br/>
310      *  - a replicaId<br/>
311      *  - a valid server port (between 0 and 65535)<br/>
312      *  - a valid response timeout ( > 0 )<br/>
313      *  - a uuidFactory<br/>
314      *  - a CSN factory<br/>
315      *  - a store (derby)<br/>
316      *  - a list of valid replica, none of them being equal
317      *  to the replicaId 
318      *
319      * @throws ReplicationConfigurationException If the configuration file is invalid
320      */
321     public void validate() throws ReplicationConfigurationException
322     {
323         if ( replicaId == null )
324         {
325             log.error( "The replicaId is missing" );
326             throw new ReplicationConfigurationException( "Replica ID is not specified." );
327         }
328 
329         if ( ( serverPort < 0 ) || ( serverPort > 65535 ) )
330         {
331             log.error( "The replica port is not between 0 and 65535" );
332             throw new ReplicationConfigurationException( "Server port is invalid: " + serverPort );
333         }
334 
335         if ( responseTimeout <= 0 )
336         {
337             log.error( "The replica responsetimeout is negative" );
338             throw new ReplicationConfigurationException( "Invalid response timeout: " + responseTimeout );
339         }
340 
341         if ( csnFactory == null )
342         {
343             log.error( "The CSN factory has not been declared" );
344             throw new ReplicationConfigurationException( "CSN factory is not specified." );
345         }
346 
347         if ( store == null )
348         {
349             log.error( "The store has not been declared" );
350             throw new ReplicationConfigurationException( "Replication store is not specified." );
351         }
352 
353         if ( peerReplicas.size() == 0 )
354         {
355             log.error( "The replicas peer list is empty" );
356             throw new ReplicationConfigurationException( "No peer replicas" );
357         }
358 
359         // Check the peer replicas.
360         // We should check that no replica has the same Id, and that we don't
361         // have two replicas on the same server with the same port
362         Set<String> ids = new TreeSet<String>();
363         Map<String, Integer> servers = new HashMap<String, Integer>();
364 
365         // Initialize the set with this server replicaId
366         ids.add( replicaId );
367 
368         // And store the local inetadress
369         servers.put( "localhost", serverPort );
370         servers.put( "127.0.0.1", serverPort );
371 
372         try
373         {
374             servers.put( StringTools.lowerCase( InetAddress.getByName( "127.0.0.1" ).getHostName() ), serverPort );
375         }
376         catch ( UnknownHostException uhe )
377         {
378             // Should never occurs with 127.0.0.1
379             throw new ReplicationConfigurationException( "Unknown host name" );
380         }
381 
382         for ( Replica peer:peerReplicas )
383         {
384             if ( ids.contains( peer.getId() ) )
385             {
386                 log.error( "Peer replica ID '{}' has already been declared.", peer.getId() );
387                 throw new ReplicationConfigurationException( "Peer replica ID '" + peer.getId()
388                     + "' has already been declared." );
389             }
390 
391             // Now check that we don't already have a replica on a server with the same port 
392             String replicaServer = StringTools.lowerCase( peer.getAddress().getHostName() );
393             int replicaPort = peer.getAddress().getPort();
394 
395             if ( servers.containsKey( replicaServer ) )
396             {
397                 int peerPort = servers.get( replicaServer );
398 
399                 if ( replicaPort == peerPort )
400                 {
401                     log.error(
402                         "The replica in the peer list has already been declared on the server {} with the port {}",
403                         replicaServer, peerPort );
404                     throw new ReplicationConfigurationException( "Replication store is not specified." );
405                 }
406             }
407 
408             servers.put( replicaServer, replicaPort );
409         }
410     }
411 }