001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.http;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStreamWriter;
022    import java.io.Writer;
023    import java.net.HttpURLConnection;
024    import java.net.MalformedURLException;
025    import java.net.URI;
026    import java.net.URL;
027    
028    import org.apache.activemq.command.Command;
029    import org.apache.activemq.command.ConnectionInfo;
030    import org.apache.activemq.transport.util.TextWireFormat;
031    import org.apache.activemq.util.ByteArrayOutputStream;
032    import org.apache.activemq.util.ByteSequence;
033    import org.apache.activemq.util.Callback;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.activemq.util.ServiceStopper;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * @version $Revision$
041     */
042    public class HttpTransport extends HttpTransportSupport {
043        
044        private static final Log LOG = LogFactory.getLog(HttpTransport.class);
045        
046        private HttpURLConnection sendConnection;
047        private HttpURLConnection receiveConnection;
048        private URL url;
049        private String clientID;
050        private volatile int receiveCounter;
051        
052        // private String sessionID;
053    
054        public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
055            super(wireFormat, remoteUrl);
056            url = new URL(remoteUrl.toString());
057        }
058    
059        public void oneway(Object o) throws IOException {
060            final Command command = (Command)o;
061            try {
062                if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
063                    boolean startGetThread = clientID == null;
064                    clientID = ((ConnectionInfo)command).getClientId();
065                    if (startGetThread && isStarted()) {
066                        try {
067                            super.doStart();
068                        } catch (Exception e) {
069                            throw IOExceptionSupport.create(e);
070                        }
071                    }
072                }
073    
074                HttpURLConnection connection = getSendConnection();
075                String text = getTextWireFormat().marshalText(command);
076                Writer writer = new OutputStreamWriter(connection.getOutputStream());
077                writer.write(text);
078                writer.flush();
079                int answer = connection.getResponseCode();
080                if (answer != HttpURLConnection.HTTP_OK) {
081                    throw new IOException("Failed to post command: " + command + " as response was: " + answer);
082                }
083                // checkSession(connection);
084            } catch (IOException e) {
085                throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
086            }
087        }
088    
089        public void run() {
090            LOG.trace("HTTP GET consumer thread starting for transport: " + this);
091            URI remoteUrl = getRemoteUrl();
092            while (!isStopped()) {
093                try {
094                    HttpURLConnection connection = getReceiveConnection();
095                    int answer = connection.getResponseCode();
096                    if (answer != HttpURLConnection.HTTP_OK) {
097                        if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
098                            LOG.trace("GET timed out");
099                        } else {
100                            LOG.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
101                        }
102                    } else {
103                        // checkSession(connection);
104    
105                        // Create a String for the UTF content
106                        receiveCounter++;
107                        InputStream is = connection.getInputStream();
108                        ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024);
109                        int c = 0;
110                        while ((c = is.read()) >= 0) {
111                            baos.write(c);
112                        }
113                        ByteSequence sequence = baos.toByteSequence();
114                        String data = new String(sequence.data, sequence.offset, sequence.length, "UTF-8");
115    
116                        Command command = (Command)getTextWireFormat().unmarshalText(data);
117    
118                        if (command == null) {
119                            LOG.warn("Received null packet from url: " + remoteUrl);
120                        } else {
121                            doConsume(command);
122                        }
123                    }
124                } catch (Throwable e) {
125                    if (!isStopped()) {
126                        LOG.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
127                    } else {
128                        LOG.trace("Caught error after closed: " + e, e);
129                    }
130                } finally {
131                    safeClose(receiveConnection);
132                    receiveConnection = null;
133                }
134            }
135        }
136    
137        // Implementation methods
138        // -------------------------------------------------------------------------
139        protected HttpURLConnection createSendConnection() throws IOException {
140            HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection();
141            conn.setDoOutput(true);
142            conn.setRequestMethod("POST");
143            configureConnection(conn);
144            conn.connect();
145            return conn;
146        }
147    
148        protected HttpURLConnection createReceiveConnection() throws IOException {
149            HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection();
150            conn.setDoOutput(false);
151            conn.setDoInput(true);
152            conn.setRequestMethod("GET");
153            configureConnection(conn);
154            conn.connect();
155            return conn;
156        }
157    
158        // protected void checkSession(HttpURLConnection connection)
159        // {
160        // String set_cookie=connection.getHeaderField("Set-Cookie");
161        // if (set_cookie!=null && set_cookie.startsWith("JSESSIONID="))
162        // {
163        // String[] bits=set_cookie.split("[=;]");
164        // sessionID=bits[1];
165        // }
166        // }
167    
168        protected void configureConnection(HttpURLConnection connection) {
169            // if (sessionID !=null) {
170            // connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID);
171            // }
172            // else
173            if (clientID != null) {
174                connection.setRequestProperty("clientID", clientID);
175            }
176        }
177    
178        protected URL getRemoteURL() {
179            return url;
180        }
181    
182        protected HttpURLConnection getSendConnection() throws IOException {
183            setSendConnection(createSendConnection());
184            return sendConnection;
185        }
186    
187        protected HttpURLConnection getReceiveConnection() throws IOException {
188            setReceiveConnection(createReceiveConnection());
189            return receiveConnection;
190        }
191    
192        protected void setSendConnection(HttpURLConnection conn) {
193            safeClose(sendConnection);
194            sendConnection = conn;
195        }
196    
197        protected void setReceiveConnection(HttpURLConnection conn) {
198            safeClose(receiveConnection);
199            receiveConnection = conn;
200        }
201    
202        protected void doStart() throws Exception {
203            // Don't start the background thread until the clientId has been
204            // established.
205            if (clientID != null) {
206                super.doStart();
207            }
208        }
209    
210        protected void doStop(ServiceStopper stopper) throws Exception {
211            stopper.run(new Callback() {
212                public void execute() throws Exception {
213                    safeClose(sendConnection);
214                }
215            });
216            sendConnection = null;
217            stopper.run(new Callback() {
218                public void execute() {
219                    safeClose(receiveConnection);
220                }
221            });
222        }
223    
224        /**
225         * @param connection TODO
226         */
227        private void safeClose(HttpURLConnection connection) {
228            if (connection != null) {
229                connection.disconnect();
230            }
231        }
232    
233        public int getReceiveCounter() {
234            return receiveCounter;
235        }
236    
237    }