001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.http;
018    
019    import java.io.BufferedReader;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.concurrent.LinkedBlockingQueue;
025    import java.util.concurrent.TimeUnit;
026    
027    import javax.servlet.ServletException;
028    import javax.servlet.http.HttpServlet;
029    import javax.servlet.http.HttpServletRequest;
030    import javax.servlet.http.HttpServletResponse;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.command.WireFormatInfo;
034    import org.apache.activemq.transport.TransportAcceptListener;
035    import org.apache.activemq.transport.util.TextWireFormat;
036    import org.apache.activemq.transport.xstream.XStreamWireFormat;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A servlet which handles server side HTTP transport, delegating to the
042     * ActiveMQ broker. This servlet is designed for being embedded inside an
043     * ActiveMQ Broker using an embedded Jetty or Tomcat instance.
044     * 
045     * @version $Revision$
046     */
047    public class HttpTunnelServlet extends HttpServlet {
048        private static final long serialVersionUID = -3826714430767484333L;
049        private static final Log LOG = LogFactory.getLog(HttpTunnelServlet.class);
050    
051        private TransportAcceptListener listener;
052        private TextWireFormat wireFormat;
053        private Map<String, BlockingQueueTransport> clients = new HashMap<String, BlockingQueueTransport>();
054        private long requestTimeout = 30000L;
055    
056        public void init() throws ServletException {
057            super.init();
058            listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
059            if (listener == null) {
060                throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
061            }
062            wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
063            if (wireFormat == null) {
064                wireFormat = createWireFormat();
065            }
066        }
067    
068        protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
069            createTransportChannel(request, response);
070        }
071    
072        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
073            // lets return the next response
074            Command packet = null;
075            int count = 0;
076            try {
077                BlockingQueueTransport transportChannel = getTransportChannel(request, response);
078                if (transportChannel == null) {
079                    return;
080                }
081    
082                packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
083    
084                DataOutputStream stream = new DataOutputStream(response.getOutputStream());
085                // while( packet !=null ) {
086                wireFormat.marshal(packet, stream);
087                count++;
088                // packet = (Command) transportChannel.getQueue().poll(0,
089                // TimeUnit.MILLISECONDS);
090                // }
091    
092            } catch (InterruptedException ignore) {
093            }
094            if (count == 0) {
095                response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
096            }
097        }
098    
099        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
100    
101            // Read the command directly from the reader
102            Command command = (Command)wireFormat.unmarshalText(request.getReader());
103    
104            if (command instanceof WireFormatInfo) {
105                WireFormatInfo info = (WireFormatInfo)command;
106                if (!canProcessWireFormatVersion(info.getVersion())) {
107                    response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
108                }
109    
110            } else {
111    
112                BlockingQueueTransport transport = getTransportChannel(request, response);
113                if (transport == null) {
114                    return;
115                }
116    
117                transport.doConsume(command);
118            }
119        }
120    
121        private boolean canProcessWireFormatVersion(int version) {
122            // TODO:
123            return true;
124        }
125    
126        protected String readRequestBody(HttpServletRequest request) throws IOException {
127            StringBuffer buffer = new StringBuffer();
128            BufferedReader reader = request.getReader();
129            while (true) {
130                String line = reader.readLine();
131                if (line == null) {
132                    break;
133                } else {
134                    buffer.append(line);
135                    buffer.append("\n");
136                }
137            }
138            return buffer.toString();
139        }
140    
141        protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
142            String clientID = request.getHeader("clientID");
143            if (clientID == null) {
144                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
145                LOG.warn("No clientID header specified");
146                return null;
147            }
148            synchronized (this) {
149                BlockingQueueTransport answer = clients.get(clientID);
150                if (answer == null) {
151                    LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
152                    return null;
153                }
154                return answer;
155            }
156        }
157    
158        protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
159            String clientID = request.getHeader("clientID");
160    
161            if (clientID == null) {
162                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
163                LOG.warn("No clientID header specified");
164                return null;
165            }
166    
167            synchronized (this) {
168                BlockingQueueTransport answer = clients.get(clientID);
169                if (answer != null) {
170                    response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has allready been established");
171                    LOG.warn("A session for clientID '" + clientID + "' has allready been established");
172                    return null;
173                }
174    
175                answer = createTransportChannel();
176                clients.put(clientID, answer);
177                listener.onAccept(answer);
178                //wait for the transport to connect
179                while (!answer.isConnected()) {
180                    try {
181                            Thread.sleep(100);
182                    } catch (InterruptedException ignore) {
183                    }
184                }
185                return answer;
186            }
187        }
188    
189        protected BlockingQueueTransport createTransportChannel() {
190           return new BlockingQueueTransport(new LinkedBlockingQueue<Object>());
191        }
192    
193        protected TextWireFormat createWireFormat() {
194            return new XStreamWireFormat();
195        }
196    }