001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.http; 018 019 import java.io.BufferedReader; 020 import java.io.DataOutputStream; 021 import java.io.IOException; 022 import java.util.HashMap; 023 import java.util.Map; 024 import java.util.concurrent.LinkedBlockingQueue; 025 import java.util.concurrent.TimeUnit; 026 027 import javax.servlet.ServletException; 028 import javax.servlet.http.HttpServlet; 029 import javax.servlet.http.HttpServletRequest; 030 import javax.servlet.http.HttpServletResponse; 031 032 import org.apache.activemq.command.Command; 033 import org.apache.activemq.command.WireFormatInfo; 034 import org.apache.activemq.transport.TransportAcceptListener; 035 import org.apache.activemq.transport.util.TextWireFormat; 036 import org.apache.activemq.transport.xstream.XStreamWireFormat; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * A servlet which handles server side HTTP transport, delegating to the 042 * ActiveMQ broker. This servlet is designed for being embedded inside an 043 * ActiveMQ Broker using an embedded Jetty or Tomcat instance. 044 * 045 * @version $Revision$ 046 */ 047 public class HttpTunnelServlet extends HttpServlet { 048 private static final long serialVersionUID = -3826714430767484333L; 049 private static final Log LOG = LogFactory.getLog(HttpTunnelServlet.class); 050 051 private TransportAcceptListener listener; 052 private TextWireFormat wireFormat; 053 private Map<String, BlockingQueueTransport> clients = new HashMap<String, BlockingQueueTransport>(); 054 private long requestTimeout = 30000L; 055 056 public void init() throws ServletException { 057 super.init(); 058 listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); 059 if (listener == null) { 060 throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); 061 } 062 wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat"); 063 if (wireFormat == null) { 064 wireFormat = createWireFormat(); 065 } 066 } 067 068 protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 069 createTransportChannel(request, response); 070 } 071 072 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 073 // lets return the next response 074 Command packet = null; 075 int count = 0; 076 try { 077 BlockingQueueTransport transportChannel = getTransportChannel(request, response); 078 if (transportChannel == null) { 079 return; 080 } 081 082 packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); 083 084 DataOutputStream stream = new DataOutputStream(response.getOutputStream()); 085 // while( packet !=null ) { 086 wireFormat.marshal(packet, stream); 087 count++; 088 // packet = (Command) transportChannel.getQueue().poll(0, 089 // TimeUnit.MILLISECONDS); 090 // } 091 092 } catch (InterruptedException ignore) { 093 } 094 if (count == 0) { 095 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); 096 } 097 } 098 099 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 100 101 // Read the command directly from the reader 102 Command command = (Command)wireFormat.unmarshalText(request.getReader()); 103 104 if (command instanceof WireFormatInfo) { 105 WireFormatInfo info = (WireFormatInfo)command; 106 if (!canProcessWireFormatVersion(info.getVersion())) { 107 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion()); 108 } 109 110 } else { 111 112 BlockingQueueTransport transport = getTransportChannel(request, response); 113 if (transport == null) { 114 return; 115 } 116 117 transport.doConsume(command); 118 } 119 } 120 121 private boolean canProcessWireFormatVersion(int version) { 122 // TODO: 123 return true; 124 } 125 126 protected String readRequestBody(HttpServletRequest request) throws IOException { 127 StringBuffer buffer = new StringBuffer(); 128 BufferedReader reader = request.getReader(); 129 while (true) { 130 String line = reader.readLine(); 131 if (line == null) { 132 break; 133 } else { 134 buffer.append(line); 135 buffer.append("\n"); 136 } 137 } 138 return buffer.toString(); 139 } 140 141 protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 142 String clientID = request.getHeader("clientID"); 143 if (clientID == null) { 144 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 145 LOG.warn("No clientID header specified"); 146 return null; 147 } 148 synchronized (this) { 149 BlockingQueueTransport answer = clients.get(clientID); 150 if (answer == null) { 151 LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID); 152 return null; 153 } 154 return answer; 155 } 156 } 157 158 protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 159 String clientID = request.getHeader("clientID"); 160 161 if (clientID == null) { 162 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 163 LOG.warn("No clientID header specified"); 164 return null; 165 } 166 167 synchronized (this) { 168 BlockingQueueTransport answer = clients.get(clientID); 169 if (answer != null) { 170 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has allready been established"); 171 LOG.warn("A session for clientID '" + clientID + "' has allready been established"); 172 return null; 173 } 174 175 answer = createTransportChannel(); 176 clients.put(clientID, answer); 177 listener.onAccept(answer); 178 //wait for the transport to connect 179 while (!answer.isConnected()) { 180 try { 181 Thread.sleep(100); 182 } catch (InterruptedException ignore) { 183 } 184 } 185 return answer; 186 } 187 } 188 189 protected BlockingQueueTransport createTransportChannel() { 190 return new BlockingQueueTransport(new LinkedBlockingQueue<Object>()); 191 } 192 193 protected TextWireFormat createWireFormat() { 194 return new XStreamWireFormat(); 195 } 196 }