1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.service;
21
22
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Timer;
26 import java.util.TimerTask;
27
28 import org.apache.directory.mitosis.common.Replica;
29 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
30 import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
31 import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
32 import org.apache.directory.mitosis.service.protocol.handler.ReplicationContextHandler;
33 import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
34 import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
35 import org.apache.directory.server.core.DirectoryService;
36 import org.apache.mina.common.IoSession;
37 import org.apache.mina.util.SessionLog;
38
39
40
41
42
43
44 public class DefaultReplicationContext implements ReplicationContext
45 {
46 private static final Timer EXPIRATION_TIMER = new Timer( "ReplicationMessageExpirer" );
47
48 private final ReplicationInterceptor interceptor;
49 private final ReplicationConfiguration configuration;
50 private final DirectoryService directoryService;
51 private final IoSession session;
52 private final Map<Integer,ExpirationTask> expirableMessages = new HashMap<Integer,ExpirationTask>();
53 private int nextSequence;
54 private Replica peer;
55 private State state = State.INIT;
56
57
58 public DefaultReplicationContext( ReplicationInterceptor interceptor, DirectoryService directoryService,
59 ReplicationConfiguration configuration, IoSession session )
60 {
61 this.interceptor = interceptor;
62 this.configuration = configuration;
63 this.directoryService = directoryService;
64 this.session = session;
65 }
66
67
68 public ReplicationInterceptor getService()
69 {
70 return interceptor;
71 }
72
73
74 public ReplicationConfiguration getConfiguration()
75 {
76 return configuration;
77 }
78
79
80 public DirectoryService getDirectoryService()
81 {
82 return directoryService;
83 }
84
85
86 public IoSession getSession()
87 {
88 return session;
89 }
90
91
92 public int getNextSequence()
93 {
94 return nextSequence++;
95 }
96
97
98 public Replica getPeer()
99 {
100 return peer;
101 }
102
103
104 public void setPeer( Replica peer )
105 {
106 assert peer != null;
107 this.peer = peer;
108 }
109
110
111 public State getState()
112 {
113 return state;
114 }
115
116
117 public void setState( State state )
118 {
119 this.state = state;
120 }
121
122
123 public void scheduleExpiration( Object message )
124 {
125 BaseMessage bm = ( BaseMessage ) message;
126 ExpirationTask task = new ExpirationTask( bm );
127 synchronized ( expirableMessages )
128 {
129 expirableMessages.put( bm.getSequence(), task );
130 }
131
132 EXPIRATION_TIMER.schedule( task, configuration.getResponseTimeout() * 1000L );
133 }
134
135
136 public Object cancelExpiration( int sequence )
137 {
138 ExpirationTask task = removeTask( sequence );
139 if ( task == null )
140 {
141 return null;
142 }
143
144 task.cancel();
145 return task.message;
146 }
147
148
149 public boolean replicate()
150 {
151 ReplicationProtocolHandler handler =
152 ( ReplicationProtocolHandler ) this.session.getHandler();
153 if( !( handler instanceof ReplicationClientProtocolHandler ) )
154 {
155 throw new UnsupportedOperationException(
156 "Only clients can begin replication." );
157 }
158
159 ReplicationContextHandler contextHandler = handler.getContextHandler();
160 return ( ( ReplicationClientContextHandler ) contextHandler ).beginReplication( this );
161 }
162
163
164 public void cancelAllExpirations()
165 {
166 synchronized ( expirableMessages )
167 {
168 for ( ExpirationTask expirationTask : expirableMessages.values() )
169 {
170 ( expirationTask ).cancel();
171 }
172 }
173 }
174
175
176 public int getScheduledExpirations()
177 {
178 synchronized ( expirableMessages )
179 {
180 return expirableMessages.size();
181 }
182 }
183
184
185 private ExpirationTask removeTask( int sequence )
186 {
187 ExpirationTask task;
188 synchronized ( expirableMessages )
189 {
190 task = expirableMessages.remove( sequence );
191 }
192 return task;
193 }
194
195
196 private class ExpirationTask extends TimerTask
197 {
198 private final BaseMessage message;
199
200
201 private ExpirationTask( Object message )
202 {
203 this.message = ( BaseMessage ) message;
204 }
205
206
207 public void run()
208 {
209 if ( removeTask( message.getSequence() ) == this )
210 {
211 SessionLog.warn( getSession(), "No response within " + configuration.getResponseTimeout()
212 + " second(s) for message #" + message.getSequence() );
213 getSession().close();
214 }
215 }
216 }
217 }