1 /***************************************************************************************
2 * Copyright (c) Jonas Bonér, Alexandre Vasseur. All rights reserved. *
3 * http://aspectwerkz.codehaus.org *
4 * ---------------------------------------------------------------------------------- *
5 * The software in this package is published under the terms of the LGPL license *
6 * a copy of which has been included with this distribution in the license.txt file. *
7 **************************************************************************************/
8 package org.codehaus.aspectwerkz.connectivity;
9
10 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
12 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
13 import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
14
15 import java.io.FileInputStream;
16 import java.io.IOException;
17 import java.net.InetAddress;
18 import java.net.ServerSocket;
19 import java.net.Socket;
20 import java.util.Properties;
21
22 /***
23 * Server that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
24 * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
25 * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
26 *
27 * @author <a href="mailto:jboner@codehaus.org">Jonas Bonér </a>
28 */
29 public class RemoteProxyServer implements Runnable {
30 private static String HOST_NAME;
31
32 private static int PORT;
33
34 private static boolean BOUNDED_THREAD_POOL;
35
36 private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
37
38 private static int BACKLOG;
39
40 private static int NUM_LISTENER_THREADS;
41
42 private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
43
44 private static int CLIENT_THREAD_TIMEOUT;
45
46 private static int THREAD_POOL_MAX_SIZE;
47
48 private static int THREAD_POOL_MIN_SIZE;
49
50 private static int THREAD_POOL_INIT_SIZE;
51
52 private static int THREAD_POOL_KEEP_ALIVE_TIME;
53
54 private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
55
56 /***
57 * Initalize the server properties.
58 */
59 static {
60 Properties properties = new Properties();
61 try {
62 properties.load(new FileInputStream(System.getProperty("aspectwerkz.resource.bundle")));
63 } catch (Exception e) {
64 System.out.println("no aspectwerkz resource bundle found on classpath, using defaults");
65
66
67 }
68 String property = properties.getProperty("remote.server.hostname");
69 if (property == null) {
70 HOST_NAME = property;
71 } else {
72 HOST_NAME = property;
73 }
74 property = properties.getProperty("remote.server.port");
75 if (property == null) {
76 PORT = 7777;
77 } else {
78 PORT = Integer.parseInt(property);
79 }
80 property = properties.getProperty("remote.server.listener.threads.backlog");
81 if (property == null) {
82 BACKLOG = 200;
83 } else {
84 BACKLOG = Integer.parseInt(property);
85 }
86 property = properties.getProperty("remote.server.listener.threads.nr");
87 if (property == null) {
88 NUM_LISTENER_THREADS = 10;
89 } else {
90 NUM_LISTENER_THREADS = Integer.parseInt(property);
91 }
92 property = properties.getProperty("remote.server.client.threads.timeout");
93 if (property == null) {
94 CLIENT_THREAD_TIMEOUT = 60000;
95 } else {
96 CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
97 }
98 property = properties.getProperty("remote.server.thread.pool.max.size");
99 if (property == null) {
100 THREAD_POOL_MAX_SIZE = 100;
101 } else {
102 THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
103 }
104 property = properties.getProperty("remote.server.thread.pool.min.size");
105 if (property == null) {
106 THREAD_POOL_MIN_SIZE = 10;
107 } else {
108 THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
109 }
110 property = properties.getProperty("remote.server.thread.pool.init.size");
111 if (property == null) {
112 THREAD_POOL_INIT_SIZE = 10;
113 } else {
114 THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
115 }
116 property = properties.getProperty("remote.server.thread.pool.keep.alive.time");
117 if (property == null) {
118 THREAD_POOL_KEEP_ALIVE_TIME = 300000;
119 } else {
120 THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
121 }
122 property = properties.getProperty("remote.server.thread.pool.type");
123 if ((property != null) && property.equals("dynamic")) {
124 BOUNDED_THREAD_POOL = false;
125 } else {
126 BOUNDED_THREAD_POOL = true;
127 }
128 property = properties.getProperty("remote.server.listener.threads.run.as.daemon");
129 if ((property != null) && property.equals("true")) {
130 LISTENER_THREAD_RUN_AS_DAEMON = true;
131 } else {
132 LISTENER_THREAD_RUN_AS_DAEMON = false;
133 }
134 property = properties.getProperty("remote.server.thread.pool.wait.when.blocked");
135 if ((property != null) && property.equals("true")) {
136 THREAD_POOL_WAIT_WHEN_BLOCKED = true;
137 } else {
138 THREAD_POOL_WAIT_WHEN_BLOCKED = false;
139 }
140 }
141
142 /***
143 * The server socket.
144 */
145 private ServerSocket m_serverSocket = null;
146
147 /***
148 * All listener threads.
149 */
150 private Thread[] m_listenerThreads = null;
151
152 /***
153 * The thread pool.
154 */
155 private PooledExecutor m_threadPool = null;
156
157 /***
158 * The class loader to use.
159 */
160 private ClassLoader m_loader = null;
161
162 /***
163 * The invoker instance.
164 */
165 private Invoker m_invoker = null;
166
167 /***
168 * Marks the server as running.
169 */
170 private boolean m_running = true;
171
172 /***
173 * Starts a server object and starts listening for client access.
174 *
175 * @param loader the classloader to use
176 * @param invoker the invoker that makes the method invocation in the client thread
177 */
178 public RemoteProxyServer(final ClassLoader loader, final Invoker invoker) {
179 m_invoker = invoker;
180 m_loader = loader;
181 }
182
183 /***
184 * Starts up the proxy server.
185 */
186 public void start() {
187 m_running = true;
188 try {
189 InetAddress bindAddress = InetAddress.getByName(HOST_NAME);
190 m_serverSocket = new ServerSocket(PORT, BACKLOG, bindAddress);
191 if (BOUNDED_THREAD_POOL) {
192 createBoundedThreadPool(
193 THREAD_POOL_MAX_SIZE,
194 THREAD_POOL_MIN_SIZE,
195 THREAD_POOL_INIT_SIZE,
196 THREAD_POOL_KEEP_ALIVE_TIME,
197 THREAD_POOL_WAIT_WHEN_BLOCKED);
198 } else {
199 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME);
200 }
201 m_listenerThreads = new Thread[NUM_LISTENER_THREADS];
202 for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
203 m_listenerThreads[i] = new Thread(this);
204 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1));
205 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
206 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY);
207 m_listenerThreads[i].start();
208 }
209 } catch (IOException e) {
210 throw new WrappedRuntimeException(e);
211 }
212 }
213
214 /***
215 * Stops the socket proxy server.
216 */
217 public void stop() {
218 m_running = false;
219 for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
220 m_listenerThreads[i].interrupt();
221 }
222 m_threadPool.shutdownNow();
223 }
224
225 /***
226 * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
227 * client.
228 */
229 public void run() {
230 try {
231 while (m_running) {
232 final Socket clientSocket = m_serverSocket.accept();
233 synchronized (m_threadPool) {
234 m_threadPool.execute(new RemoteProxyServerThread(
235 clientSocket,
236 m_loader,
237 m_invoker,
238 CLIENT_THREAD_TIMEOUT));
239 }
240 }
241 m_serverSocket.close();
242 } catch (Exception e) {
243 throw new WrappedRuntimeException(e);
244 }
245 }
246
247 /***
248 * Creates a new bounded thread pool.
249 *
250 * @param threadPoolMaxSize
251 * @param threadPoolMinSize
252 * @param threadPoolInitSize
253 * @param keepAliveTime
254 * @param waitWhenBlocked
255 */
256 private void createBoundedThreadPool(
257 final int threadPoolMaxSize,
258 final int threadPoolMinSize,
259 final int threadPoolInitSize,
260 final int keepAliveTime,
261 final boolean waitWhenBlocked) {
262 m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize);
263 m_threadPool.setKeepAliveTime(keepAliveTime);
264 m_threadPool.createThreads(threadPoolInitSize);
265 m_threadPool.setMinimumPoolSize(threadPoolMinSize);
266 if (waitWhenBlocked) {
267 m_threadPool.waitWhenBlocked();
268 }
269 }
270
271 /***
272 * Creates a new dynamic thread pool
273 *
274 * @param threadPoolMinSize
275 * @param threadPoolInitSize
276 * @param keepAliveTime
277 */
278 private void createDynamicThreadPool(
279 final int threadPoolMinSize,
280 final int threadPoolInitSize,
281 final int keepAliveTime) {
282 m_threadPool = new PooledExecutor(new LinkedQueue());
283 m_threadPool.setKeepAliveTime(keepAliveTime);
284 m_threadPool.createThreads(threadPoolInitSize);
285 m_threadPool.setMinimumPoolSize(threadPoolMinSize);
286 }
287 }