View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.service;
21  
22  
23  import java.util.HashMap;
24  import java.util.Map;
25  import java.util.Timer;
26  import java.util.TimerTask;
27  
28  import org.apache.directory.mitosis.common.Replica;
29  import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
30  import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
31  import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
32  import org.apache.directory.mitosis.service.protocol.handler.ReplicationContextHandler;
33  import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
34  import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
35  import org.apache.directory.server.core.DirectoryService;
36  import org.apache.mina.common.IoSession;
37  import org.apache.mina.util.SessionLog;
38  
39  /**
40   * The default implementation of {@link ReplicationContext}
41   * 
42   * @author The Apache Directory Project Team
43   */
44  public class DefaultReplicationContext implements ReplicationContext
45  {
46      private static final Timer EXPIRATION_TIMER = new Timer( "ReplicationMessageExpirer" );
47  
48      private final ReplicationInterceptor interceptor;
49      private final ReplicationConfiguration configuration;
50      private final DirectoryService directoryService;
51      private final IoSession session;
52      private final Map<Integer,ExpirationTask> expirableMessages = new HashMap<Integer,ExpirationTask>();
53      private int nextSequence;
54      private Replica peer;
55      private State state = State.INIT;
56  
57  
58      public DefaultReplicationContext( ReplicationInterceptor interceptor, DirectoryService directoryService,
59          ReplicationConfiguration configuration, IoSession session )
60      {
61          this.interceptor = interceptor;
62          this.configuration = configuration;
63          this.directoryService = directoryService;
64          this.session = session;
65      }
66  
67  
68      public ReplicationInterceptor getService()
69      {
70          return interceptor;
71      }
72  
73  
74      public ReplicationConfiguration getConfiguration()
75      {
76          return configuration;
77      }
78  
79  
80      public DirectoryService getDirectoryService()
81      {
82          return directoryService;
83      }
84  
85  
86      public IoSession getSession()
87      {
88          return session;
89      }
90  
91  
92      public int getNextSequence()
93      {
94          return nextSequence++;
95      }
96  
97  
98      public Replica getPeer()
99      {
100         return peer;
101     }
102 
103 
104     public void setPeer( Replica peer )
105     {
106         assert peer != null;
107         this.peer = peer;
108     }
109 
110 
111     public State getState()
112     {
113         return state;
114     }
115 
116 
117     public void setState( State state )
118     {
119         this.state = state;
120     }
121 
122 
123     public void scheduleExpiration( Object message )
124     {
125         BaseMessage bm = ( BaseMessage ) message;
126         ExpirationTask task = new ExpirationTask( bm );
127         synchronized ( expirableMessages )
128         {
129             expirableMessages.put( bm.getSequence(), task );
130         }
131 
132         EXPIRATION_TIMER.schedule( task, configuration.getResponseTimeout() * 1000L );
133     }
134 
135 
136     public Object cancelExpiration( int sequence )
137     {
138         ExpirationTask task = removeTask( sequence );
139         if ( task == null )
140         {
141             return null;
142         }
143 
144         task.cancel();
145         return task.message;
146     }
147 
148     
149     public boolean replicate()
150     {
151         ReplicationProtocolHandler handler =
152             ( ReplicationProtocolHandler ) this.session.getHandler();
153         if( !( handler instanceof ReplicationClientProtocolHandler ) )
154         {
155             throw new UnsupportedOperationException(
156                     "Only clients can begin replication." );
157         }
158         
159         ReplicationContextHandler contextHandler = handler.getContextHandler();
160         return ( ( ReplicationClientContextHandler ) contextHandler ).beginReplication( this );
161     }
162 
163 
164     public void cancelAllExpirations()
165     {
166         synchronized ( expirableMessages )
167         {
168             for ( ExpirationTask expirationTask : expirableMessages.values() )
169             {
170                 ( expirationTask ).cancel();
171             }
172         }
173     }
174 
175 
176     public int getScheduledExpirations()
177     {
178         synchronized ( expirableMessages )
179         {
180             return expirableMessages.size();
181         }
182     }
183 
184 
185     private ExpirationTask removeTask( int sequence )
186     {
187         ExpirationTask task;
188         synchronized ( expirableMessages )
189         {
190             task = expirableMessages.remove( sequence );
191         }
192         return task;
193     }
194 
195 
196     private class ExpirationTask extends TimerTask
197     {
198         private final BaseMessage message;
199 
200 
201         private ExpirationTask( Object message )
202         {
203             this.message = ( BaseMessage ) message;
204         }
205 
206 
207         public void run()
208         {
209             if ( removeTask( message.getSequence() ) == this )
210             {
211                 SessionLog.warn( getSession(), "No response within " + configuration.getResponseTimeout()
212                     + " second(s) for message #" + message.getSequence() );
213                 getSession().close();
214             }
215         }
216     }
217 }