001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.http;
019    
020    import java.io.BufferedReader;
021    import java.io.DataOutputStream;
022    import java.io.IOException;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import javax.jms.JMSException;
027    import javax.servlet.ServletException;
028    import javax.servlet.http.HttpServlet;
029    import javax.servlet.http.HttpServletRequest;
030    import javax.servlet.http.HttpServletResponse;
031    
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.activemq.io.TextWireFormat;
035    import org.activemq.message.Packet;
036    import org.activemq.message.WireFormatInfo;
037    import org.activemq.message.PacketListener;
038    import org.activemq.transport.TransportChannelListener;
039    import org.activemq.transport.xstream.XStreamWireFormat;
040    import org.activemq.util.JMSExceptionHelper;
041    
042    import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
043    
044    /**
045     * A servlet which handles server side HTTP transport, delegaging to the ActiveMQ broker.
046     * This servlet is designed for being embedded inside an ActiveMQ Broker using an embedded
047     * Jetty or Tomcat instance.
048     *
049     * @version $Revision$
050     */
051    public class HttpTunnelServlet extends HttpServlet {
052    
053        private static final Log log = LogFactory.getLog(HttpTunnelServlet.class);
054    
055        private TransportChannelListener listener;
056        private TextWireFormat wireFormat;
057        private Map clients = new HashMap();
058        private long requestTimeout = 30000L;
059    
060        public void init() throws ServletException {
061            super.init();
062            listener = (TransportChannelListener) getServletContext().getAttribute("transportChannelListener");
063            if (listener == null) {
064                throw new ServletException("No such attribute 'transportChannelListener' available in the ServletContext");
065            }
066            wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat");
067            if (wireFormat == null) {
068                wireFormat = createWireFormat();
069            }
070        }
071    
072        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
073            // lets return the next response
074            Packet packet = null;
075            try {
076                HttpServerTransportChannel transportChannel = getTransportChannel(request);
077                if (transportChannel == null) {
078                    return;
079                }
080                packet = (Packet) transportChannel.getChannel().poll(requestTimeout);
081            }
082            catch (InterruptedException e) {
083                // ignore
084            }
085            if (packet == null) {
086                response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
087            }
088            else {
089                try {
090                    wireFormat.writePacket(packet, new DataOutputStream(response.getOutputStream()));
091                }
092                catch (JMSException e) {
093                    throw JMSExceptionHelper.newIOException(e);
094                }
095            }
096        }
097    
098        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
099            try {
100                Packet packet = wireFormat.fromString(readRequestBody(request));
101                
102                if( packet.getPacketType() == Packet.WIRE_FORMAT_INFO ) {
103                    
104                    // Can we handle the requested wire format?
105                    WireFormatInfo info = (WireFormatInfo) packet;
106                    if (!canProcessWireFormatVersion(info.getVersion())) {
107                            response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
108                    }
109                    
110                } else {            
111                        HttpServerTransportChannel transportChannel = getTransportChannel(request);
112                        if (transportChannel == null) {
113                            response.setStatus(HttpServletResponse.SC_NOT_FOUND);
114                        }
115                    else {
116                        PacketListener packetListener = transportChannel.getPacketListener();
117                        if (packetListener == null) {
118                            log.error("No packetListener available to process inbound packet: " + packet);
119                        }
120                        else {
121                            packetListener.consume(packet);
122                        }
123                    }
124                }
125            }
126            catch (IOException e) {
127                log.error("Caught: " + e, e);
128            }
129            catch (JMSException e) {
130                throw JMSExceptionHelper.newIOException(e);
131            }
132        }
133    
134        /**
135         * @param version
136         * @return
137         */
138        private boolean canProcessWireFormatVersion(int version) {
139            // TODO: 
140            return true;
141        }
142    
143        protected String readRequestBody(HttpServletRequest request) throws IOException {
144            StringBuffer buffer = new StringBuffer();
145            BufferedReader reader = request.getReader();
146            while (true) {
147                String line = reader.readLine();
148                if (line == null) {
149                    break;
150                }
151                else {
152                    buffer.append(line);
153                    buffer.append("\n");
154                }
155            }
156            return buffer.toString();
157        }
158    
159        protected HttpServerTransportChannel getTransportChannel(HttpServletRequest request) {
160            String clientID = request.getHeader("clientID");
161            if (clientID == null) {
162                clientID = request.getParameter("clientID");
163            }
164            if (clientID == null) {
165                log.warn("No clientID header so ignoring request");
166                return null;
167            }
168            synchronized (this) {
169                HttpServerTransportChannel answer = (HttpServerTransportChannel) clients.get(clientID);
170                if (answer == null) {
171                    answer = createTransportChannel();
172                    clients.put(clientID, answer);
173                    listener.addClient(answer);
174                }
175                else {
176                    // this lookup should keep the client alive, otherwise we need to discard it
177                    keepAlivePing(answer);
178                }
179                return answer;
180            }
181        }
182    
183        /**
184         * Disable this channel from being auto-disconnected after a timeout period
185         */
186        protected void keepAlivePing(HttpServerTransportChannel channel) {
187            /** TODO */
188        }
189    
190        protected HttpServerTransportChannel createTransportChannel() {
191            return new HttpServerTransportChannel(new BoundedLinkedQueue(10));
192        }
193    
194        protected TextWireFormat createWireFormat() {
195            return new XStreamWireFormat();
196        }
197    }