View Javadoc

1   /***************************************************************************************
2    * Copyright (c) Jonas Bonér, Alexandre Vasseur. All rights reserved.                 *
3    * http://aspectwerkz.codehaus.org                                                    *
4    * ---------------------------------------------------------------------------------- *
5    * The software in this package is published under the terms of the LGPL license      *
6    * a copy of which has been included with this distribution in the license.txt file.  *
7    **************************************************************************************/
8   package org.codehaus.aspectwerkz.connectivity;
9   
10  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
11  import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
12  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
13  import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
14  
15  import java.io.FileInputStream;
16  import java.io.IOException;
17  import java.net.InetAddress;
18  import java.net.ServerSocket;
19  import java.net.Socket;
20  import java.util.Properties;
21  
22  /***
23   * Server that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
24   * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
25   * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
26   * 
27   * @author <a href="mailto:jboner@codehaus.org">Jonas Bonér </a>
28   */
29  public class RemoteProxyServer implements Runnable {
30      private static String HOST_NAME;
31  
32      private static int PORT;
33  
34      private static boolean BOUNDED_THREAD_POOL;
35  
36      private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
37  
38      private static int BACKLOG;
39  
40      private static int NUM_LISTENER_THREADS;
41  
42      private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
43  
44      private static int CLIENT_THREAD_TIMEOUT;
45  
46      private static int THREAD_POOL_MAX_SIZE;
47  
48      private static int THREAD_POOL_MIN_SIZE;
49  
50      private static int THREAD_POOL_INIT_SIZE;
51  
52      private static int THREAD_POOL_KEEP_ALIVE_TIME;
53  
54      private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
55  
56      /***
57       * Initalize the server properties.
58       */
59      static {
60          Properties properties = new Properties();
61          try {
62              properties.load(new FileInputStream(System.getProperty("aspectwerkz.resource.bundle")));
63          } catch (Exception e) {
64              System.out.println("no aspectwerkz resource bundle found on classpath, using defaults");
65  
66              // ignore, use defaults
67          }
68          String property = properties.getProperty("remote.server.hostname");
69          if (property == null) {
70              HOST_NAME = property;
71          } else {
72              HOST_NAME = property;
73          }
74          property = properties.getProperty("remote.server.port");
75          if (property == null) {
76              PORT = 7777;
77          } else {
78              PORT = Integer.parseInt(property);
79          }
80          property = properties.getProperty("remote.server.listener.threads.backlog");
81          if (property == null) {
82              BACKLOG = 200;
83          } else {
84              BACKLOG = Integer.parseInt(property);
85          }
86          property = properties.getProperty("remote.server.listener.threads.nr");
87          if (property == null) {
88              NUM_LISTENER_THREADS = 10;
89          } else {
90              NUM_LISTENER_THREADS = Integer.parseInt(property);
91          }
92          property = properties.getProperty("remote.server.client.threads.timeout");
93          if (property == null) {
94              CLIENT_THREAD_TIMEOUT = 60000;
95          } else {
96              CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
97          }
98          property = properties.getProperty("remote.server.thread.pool.max.size");
99          if (property == null) {
100             THREAD_POOL_MAX_SIZE = 100;
101         } else {
102             THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
103         }
104         property = properties.getProperty("remote.server.thread.pool.min.size");
105         if (property == null) {
106             THREAD_POOL_MIN_SIZE = 10;
107         } else {
108             THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
109         }
110         property = properties.getProperty("remote.server.thread.pool.init.size");
111         if (property == null) {
112             THREAD_POOL_INIT_SIZE = 10;
113         } else {
114             THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
115         }
116         property = properties.getProperty("remote.server.thread.pool.keep.alive.time");
117         if (property == null) {
118             THREAD_POOL_KEEP_ALIVE_TIME = 300000;
119         } else {
120             THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
121         }
122         property = properties.getProperty("remote.server.thread.pool.type");
123         if ((property != null) && property.equals("dynamic")) {
124             BOUNDED_THREAD_POOL = false;
125         } else {
126             BOUNDED_THREAD_POOL = true;
127         }
128         property = properties.getProperty("remote.server.listener.threads.run.as.daemon");
129         if ((property != null) && property.equals("true")) {
130             LISTENER_THREAD_RUN_AS_DAEMON = true;
131         } else {
132             LISTENER_THREAD_RUN_AS_DAEMON = false;
133         }
134         property = properties.getProperty("remote.server.thread.pool.wait.when.blocked");
135         if ((property != null) && property.equals("true")) {
136             THREAD_POOL_WAIT_WHEN_BLOCKED = true;
137         } else {
138             THREAD_POOL_WAIT_WHEN_BLOCKED = false;
139         }
140     }
141 
142     /***
143      * The server socket.
144      */
145     private ServerSocket m_serverSocket = null;
146 
147     /***
148      * All listener threads.
149      */
150     private Thread[] m_listenerThreads = null;
151 
152     /***
153      * The thread pool.
154      */
155     private PooledExecutor m_threadPool = null;
156 
157     /***
158      * The class loader to use.
159      */
160     private ClassLoader m_loader = null;
161 
162     /***
163      * The invoker instance.
164      */
165     private Invoker m_invoker = null;
166 
167     /***
168      * Marks the server as running.
169      */
170     private boolean m_running = true;
171 
172     /***
173      * Starts a server object and starts listening for client access.
174      * 
175      * @param loader the classloader to use
176      * @param invoker the invoker that makes the method invocation in the client thread
177      */
178     public RemoteProxyServer(final ClassLoader loader, final Invoker invoker) {
179         m_invoker = invoker;
180         m_loader = loader;
181     }
182 
183     /***
184      * Starts up the proxy server.
185      */
186     public void start() {
187         m_running = true;
188         try {
189             InetAddress bindAddress = InetAddress.getByName(HOST_NAME);
190             m_serverSocket = new ServerSocket(PORT, BACKLOG, bindAddress);
191             if (BOUNDED_THREAD_POOL) {
192                 createBoundedThreadPool(
193                     THREAD_POOL_MAX_SIZE,
194                     THREAD_POOL_MIN_SIZE,
195                     THREAD_POOL_INIT_SIZE,
196                     THREAD_POOL_KEEP_ALIVE_TIME,
197                     THREAD_POOL_WAIT_WHEN_BLOCKED);
198             } else {
199                 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME);
200             }
201             m_listenerThreads = new Thread[NUM_LISTENER_THREADS];
202             for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
203                 m_listenerThreads[i] = new Thread(this);
204                 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1));
205                 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
206                 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY);
207                 m_listenerThreads[i].start();
208             }
209         } catch (IOException e) {
210             throw new WrappedRuntimeException(e);
211         }
212     }
213 
214     /***
215      * Stops the socket proxy server.
216      */
217     public void stop() {
218         m_running = false;
219         for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
220             m_listenerThreads[i].interrupt();
221         }
222         m_threadPool.shutdownNow();
223     }
224 
225     /***
226      * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
227      * client.
228      */
229     public void run() {
230         try {
231             while (m_running) {
232                 final Socket clientSocket = m_serverSocket.accept();
233                 synchronized (m_threadPool) {
234                     m_threadPool.execute(new RemoteProxyServerThread(
235                         clientSocket,
236                         m_loader,
237                         m_invoker,
238                         CLIENT_THREAD_TIMEOUT));
239                 }
240             }
241             m_serverSocket.close();
242         } catch (Exception e) {
243             throw new WrappedRuntimeException(e);
244         }
245     }
246 
247     /***
248      * Creates a new bounded thread pool.
249      * 
250      * @param threadPoolMaxSize
251      * @param threadPoolMinSize
252      * @param threadPoolInitSize
253      * @param keepAliveTime
254      * @param waitWhenBlocked
255      */
256     private void createBoundedThreadPool(
257         final int threadPoolMaxSize,
258         final int threadPoolMinSize,
259         final int threadPoolInitSize,
260         final int keepAliveTime,
261         final boolean waitWhenBlocked) {
262         m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize);
263         m_threadPool.setKeepAliveTime(keepAliveTime);
264         m_threadPool.createThreads(threadPoolInitSize);
265         m_threadPool.setMinimumPoolSize(threadPoolMinSize);
266         if (waitWhenBlocked) {
267             m_threadPool.waitWhenBlocked();
268         }
269     }
270 
271     /***
272      * Creates a new dynamic thread pool
273      * 
274      * @param threadPoolMinSize
275      * @param threadPoolInitSize
276      * @param keepAliveTime
277      */
278     private void createDynamicThreadPool(
279         final int threadPoolMinSize,
280         final int threadPoolInitSize,
281         final int keepAliveTime) {
282         m_threadPool = new PooledExecutor(new LinkedQueue());
283         m_threadPool.setKeepAliveTime(keepAliveTime);
284         m_threadPool.createThreads(threadPoolInitSize);
285         m_threadPool.setMinimumPoolSize(threadPoolMinSize);
286     }
287 }