001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.web; 019 020 import java.io.IOException; 021 import java.io.PrintWriter; 022 import java.util.HashMap; 023 import java.util.LinkedList; 024 import java.util.List; 025 026 import javax.jms.Destination; 027 import javax.jms.JMSException; 028 import javax.jms.Message; 029 import javax.jms.MessageConsumer; 030 import javax.jms.ObjectMessage; 031 import javax.jms.TextMessage; 032 import javax.servlet.ServletConfig; 033 import javax.servlet.ServletContext; 034 import javax.servlet.ServletException; 035 import javax.servlet.http.HttpServletRequest; 036 import javax.servlet.http.HttpServletResponse; 037 038 import org.apache.activemq.MessageAvailableConsumer; 039 import org.apache.activemq.MessageAvailableListener; 040 import org.apache.activemq.camel.converter.ActiveMQMessageConverter; 041 import org.apache.activemq.command.ActiveMQDestination; 042 import org.apache.activemq.command.ActiveMQTextMessage; 043 import org.apache.camel.Endpoint; 044 import org.apache.camel.Exchange; 045 import org.apache.camel.ExchangePattern; 046 import org.apache.camel.Producer; 047 import org.apache.commons.logging.Log; 048 import org.apache.commons.logging.LogFactory; 049 import org.mortbay.util.ajax.Continuation; 050 import org.mortbay.util.ajax.ContinuationSupport; 051 052 /** 053 * A servlet for sending and receiving messages to/from JMS destinations using 054 * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the 055 * destination and whether it is a topic or queue via configuration details on 056 * the servlet or as request parameters. <p/> For reading messages you can 057 * specify a readTimeout parameter to determine how long the servlet should 058 * block for. 059 * 060 * @version $Revision: 1.1.1.1 $ 061 */ 062 public class MessageServlet extends MessageServletSupport { 063 private static final Log LOG = LogFactory.getLog(MessageServlet.class); 064 065 private String readTimeoutParameter = "readTimeout"; 066 private long defaultReadTimeout = -1; 067 private long maximumReadTimeout = 20000; 068 private long requestTimeout = 1000; 069 070 private HashMap<String, WebClient> clients = new HashMap<String, WebClient>(); 071 072 public void init() throws ServletException { 073 ServletConfig servletConfig = getServletConfig(); 074 String name = servletConfig.getInitParameter("defaultReadTimeout"); 075 if (name != null) { 076 defaultReadTimeout = asLong(name); 077 } 078 name = servletConfig.getInitParameter("maximumReadTimeout"); 079 if (name != null) { 080 maximumReadTimeout = asLong(name); 081 } 082 name = servletConfig.getInitParameter("replyTimeout"); 083 if (name != null) { 084 requestTimeout = asLong(name); 085 } 086 } 087 088 /** 089 * Sends a message to a destination 090 * 091 * @param request 092 * @param response 093 * @throws ServletException 094 * @throws IOException 095 */ 096 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 097 // lets turn the HTTP post into a JMS Message 098 try { 099 WebClient client = getWebClient(request); 100 101 String text = getPostedMessageBody(request); 102 103 // lets create the destination from the URI? 104 Destination destination = getDestination(client, request); 105 if (destination == null) { 106 throw new NoDestinationSuppliedException(); 107 } 108 109 if (LOG.isDebugEnabled()) { 110 LOG.debug("Sending message to: " + destination + " with text: " + text); 111 } 112 113 boolean sync = isSync(request); 114 TextMessage message = client.getSession().createTextMessage(text); 115 116 if (sync) { 117 String point = "activemq:" 118 + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "") 119 + "?requestTimeout=" + requestTimeout; 120 try { 121 String body = (String)client.getProducerTemplate().requestBody(point, text); 122 ActiveMQTextMessage answer = new ActiveMQTextMessage(); 123 answer.setText(body); 124 writeMessageResponse(response.getWriter(), answer); 125 } catch (Exception e) { 126 IOException ex = new IOException(); 127 ex.initCause(e); 128 throw ex; 129 } 130 } else { 131 appendParametersToMessage(request, message); 132 boolean persistent = isSendPersistent(request); 133 int priority = getSendPriority(request); 134 long timeToLive = getSendTimeToLive(request); 135 client.send(destination, message, persistent, priority, timeToLive); 136 } 137 138 // lets return a unique URI for reliable messaging 139 response.setHeader("messageID", message.getJMSMessageID()); 140 response.setStatus(HttpServletResponse.SC_OK); 141 } catch (JMSException e) { 142 throw new ServletException("Could not post JMS message: " + e, e); 143 } 144 } 145 146 /** 147 * Supports a HTTP DELETE to be equivlanent of consuming a singe message 148 * from a queue 149 */ 150 protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 151 doMessages(request, response, 1); 152 } 153 154 /** 155 * Supports a HTTP DELETE to be equivlanent of consuming a singe message 156 * from a queue 157 */ 158 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 159 doMessages(request, response, -1); 160 } 161 162 /** 163 * Reads a message from a destination up to some specific timeout period 164 * 165 * @param request 166 * @param response 167 * @throws ServletException 168 * @throws IOException 169 */ 170 protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException { 171 172 int messages = 0; 173 try { 174 WebClient client = getWebClient(request); 175 Destination destination = getDestination(client, request); 176 if (destination == null) { 177 throw new NoDestinationSuppliedException(); 178 } 179 long timeout = getReadTimeout(request); 180 boolean ajax = isRicoAjax(request); 181 if (!ajax) { 182 maxMessages = 1; 183 } 184 185 if (LOG.isDebugEnabled()) { 186 LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); 187 } 188 189 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination); 190 Continuation continuation = null; 191 Listener listener = null; 192 Message message = null; 193 194 synchronized (consumer) { 195 // Fetch the listeners 196 listener = (Listener)consumer.getAvailableListener(); 197 if (listener == null) { 198 listener = new Listener(consumer); 199 consumer.setAvailableListener(listener); 200 } 201 // Look for any available messages 202 message = consumer.receiveNoWait(); 203 204 // Get an existing Continuation or create a new one if there are 205 // no events. 206 if (message == null) { 207 continuation = ContinuationSupport.getContinuation(request, consumer); 208 209 // register this continuation with our listener. 210 listener.setContinuation(continuation); 211 212 // Get the continuation object (may wait and/or retry 213 // request here). 214 continuation.suspend(timeout); 215 } 216 217 // Try again now 218 if (message == null) { 219 message = consumer.receiveNoWait(); 220 } 221 222 // write a responds 223 response.setContentType("text/xml"); 224 PrintWriter writer = response.getWriter(); 225 226 if (ajax) { 227 writer.println("<ajax-response>"); 228 } 229 230 // handle any message(s) 231 if (message == null) { 232 // No messages so OK response of for ajax else no content. 233 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 234 } else { 235 // We have at least one message so set up the response 236 response.setStatus(HttpServletResponse.SC_OK); 237 String type = getContentType(request); 238 if (type != null) { 239 response.setContentType(type); 240 } 241 242 // send a response for each available message (up to max 243 // messages) 244 while ((maxMessages < 0 || messages < maxMessages) && message != null) { 245 if (ajax) { 246 writer.print("<response type='object' id='"); 247 writer.print(request.getParameter("id")); 248 writer.println("'>"); 249 } else { 250 // only ever 1 message for non ajax! 251 setResponseHeaders(response, message); 252 } 253 254 writeMessageResponse(writer, message); 255 256 if (ajax) { 257 writer.println("</response>"); 258 } 259 260 // look for next message 261 messages++; 262 if(maxMessages < 0 || messages < maxMessages) { 263 message = consumer.receiveNoWait(); 264 } 265 } 266 } 267 268 if (ajax) { 269 writer.println("<response type='object' id='poll'><ok/></response>"); 270 writer.println("</ajax-response>"); 271 } 272 } 273 } catch (JMSException e) { 274 throw new ServletException("Could not post JMS message: " + e, e); 275 } finally { 276 if (LOG.isDebugEnabled()) { 277 LOG.debug("Received " + messages + " message(s)"); 278 } 279 } 280 } 281 282 /** 283 * Reads a message from a destination up to some specific timeout period 284 * 285 * @param request 286 * @param response 287 * @throws ServletException 288 * @throws IOException 289 */ 290 protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException { 291 292 int messages = 0; 293 try { 294 WebClient client = getWebClient(request); 295 Destination destination = getDestination(client, request); 296 long timeout = getReadTimeout(request); 297 boolean ajax = isRicoAjax(request); 298 if (!ajax) { 299 maxMessages = 1; 300 } 301 if (LOG.isDebugEnabled()) { 302 LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); 303 } 304 305 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination); 306 Message message = null; 307 308 // write a responds 309 response.setContentType("text/xml"); 310 PrintWriter writer = response.getWriter(); 311 312 if (ajax) { 313 writer.println("<ajax-response>"); 314 } 315 316 // Only one client thread at a time should poll for messages. 317 if (client.getSemaphore().tryAcquire()) { 318 try { 319 // Look for any available messages 320 message = consumer.receive(timeout); 321 322 // handle any message(s) 323 if (message == null) { 324 // No messages so OK response of for ajax else no 325 // content. 326 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 327 } else { 328 // We have at least one message so set up the 329 // response 330 response.setStatus(HttpServletResponse.SC_OK); 331 String type = getContentType(request); 332 if (type != null) { 333 response.setContentType(type); 334 } 335 336 // send a response for each available message (up to 337 // max 338 // messages) 339 while ((maxMessages < 0 || messages < maxMessages) && message != null) { 340 if (ajax) { 341 writer.print("<response type='object' id='"); 342 writer.print(request.getParameter("id")); 343 writer.println("'>"); 344 } else { 345 // only ever 1 message for non ajax! 346 setResponseHeaders(response, message); 347 } 348 349 writeMessageResponse(writer, message); 350 351 if (ajax) { 352 writer.println("</response>"); 353 } 354 355 // look for next message 356 messages++; 357 if(maxMessages < 0 || messages < maxMessages) { 358 message = consumer.receiveNoWait(); 359 } 360 361 } 362 } 363 } finally { 364 client.getSemaphore().release(); 365 } 366 } else { 367 // Client is using us in another thread. 368 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 369 } 370 371 if (ajax) { 372 writer.println("<response type='object' id='poll'><ok/></response>"); 373 writer.println("</ajax-response>"); 374 } 375 376 } catch (JMSException e) { 377 throw new ServletException("Could not post JMS message: " + e, e); 378 } finally { 379 if (LOG.isDebugEnabled()) { 380 LOG.debug("Received " + messages + " message(s)"); 381 } 382 } 383 } 384 385 protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException { 386 if (message instanceof TextMessage) { 387 TextMessage textMsg = (TextMessage)message; 388 String txt = textMsg.getText(); 389 if (txt.startsWith("<?")) { 390 txt = txt.substring(txt.indexOf("?>") + 2); 391 } 392 writer.print(txt); 393 } else if (message instanceof ObjectMessage) { 394 ObjectMessage objectMsg = (ObjectMessage)message; 395 Object object = objectMsg.getObject(); 396 writer.print(object.toString()); 397 } 398 } 399 400 protected boolean isRicoAjax(HttpServletRequest request) { 401 String rico = request.getParameter("rico"); 402 return rico != null && rico.equals("true"); 403 } 404 405 public WebClient getWebClient(HttpServletRequest request) { 406 String clientId = request.getParameter("clientId"); 407 if (clientId != null) { 408 synchronized(this) { 409 LOG.debug("Getting local client [" + clientId + "]"); 410 WebClient client = clients.get(clientId); 411 if (client == null) { 412 LOG.debug("Creating new client [" + clientId + "]"); 413 client = new WebClient(); 414 clients.put(clientId, client); 415 } 416 return client; 417 } 418 419 } else { 420 return WebClient.getWebClient(request); 421 } 422 } 423 424 protected String getContentType(HttpServletRequest request) { 425 /* 426 * log("Params: " + request.getParameterMap()); Enumeration iter = 427 * request.getHeaderNames(); while (iter.hasMoreElements()) { String 428 * name = (String) iter.nextElement(); log("Header: " + name + " = " + 429 * request.getHeader(name)); } 430 */ 431 String value = request.getParameter("xml"); 432 if (value != null && "true".equalsIgnoreCase(value)) { 433 return "text/xml"; 434 } 435 return null; 436 } 437 438 protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException { 439 response.setHeader("destination", message.getJMSDestination().toString()); 440 response.setHeader("id", message.getJMSMessageID()); 441 } 442 443 /** 444 * @return the timeout value for read requests which is always >= 0 and <= 445 * maximumReadTimeout to avoid DoS attacks 446 */ 447 protected long getReadTimeout(HttpServletRequest request) { 448 long answer = defaultReadTimeout; 449 450 String name = request.getParameter(readTimeoutParameter); 451 if (name != null) { 452 answer = asLong(name); 453 } 454 if (answer < 0 || answer > maximumReadTimeout) { 455 answer = maximumReadTimeout; 456 } 457 return answer; 458 } 459 460 /* 461 * Listen for available messages and wakeup any continuations. 462 */ 463 private static class Listener implements MessageAvailableListener { 464 MessageConsumer consumer; 465 Continuation continuation; 466 List queue = new LinkedList(); 467 468 Listener(MessageConsumer consumer) { 469 this.consumer = consumer; 470 } 471 472 public void setContinuation(Continuation continuation) { 473 synchronized (consumer) { 474 this.continuation = continuation; 475 } 476 } 477 478 public void onMessageAvailable(MessageConsumer consumer) { 479 assert this.consumer == consumer; 480 481 synchronized (this.consumer) { 482 if (continuation != null) { 483 continuation.resume(); 484 } 485 continuation = null; 486 } 487 } 488 } 489 490 }