001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.http;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.net.URI;
023    
024    import org.apache.activemq.command.ShutdownInfo;
025    import org.apache.activemq.transport.FutureResponse;
026    import org.apache.activemq.transport.util.TextWireFormat;
027    import org.apache.activemq.util.ByteArrayInputStream;
028    import org.apache.activemq.util.IOExceptionSupport;
029    import org.apache.activemq.util.IdGenerator;
030    import org.apache.activemq.util.ServiceStopper;
031    import org.apache.commons.httpclient.HttpClient;
032    import org.apache.commons.httpclient.HttpMethod;
033    import org.apache.commons.httpclient.HttpStatus;
034    import org.apache.commons.httpclient.methods.GetMethod;
035    import org.apache.commons.httpclient.methods.HeadMethod;
036    import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
037    import org.apache.commons.httpclient.methods.PostMethod;
038    import org.apache.commons.httpclient.params.HttpClientParams;
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    
042    /**
043     * A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the
044     * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a>
045     * library
046     * 
047     * @version $Revision$
048     */
049    public class HttpClientTransport extends HttpTransportSupport {
050    
051        public static final int MAX_CLIENT_TIMEOUT = 30000;
052        private static final Log LOG = LogFactory.getLog(HttpClientTransport.class);
053        private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
054    
055        private HttpClient sendHttpClient;
056        private HttpClient receiveHttpClient;
057    
058        private final String clientID = CLIENT_ID_GENERATOR.generateId();
059        private boolean trace;
060        private GetMethod httpMethod;
061        private volatile int receiveCounter;
062        
063        public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
064            super(wireFormat, remoteUrl);
065        }
066    
067        public FutureResponse asyncRequest(Object command) throws IOException {
068            return null;
069        }
070    
071        public void oneway(Object command) throws IOException {
072    
073            if (isStopped()) {
074                throw new IOException("stopped.");
075            }
076            PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
077            configureMethod(httpMethod);
078            String data = getTextWireFormat().marshalText(command);
079            byte[] bytes = data.getBytes("UTF-8");
080            InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes));
081            httpMethod.setRequestEntity(entity);
082    
083            try {
084    
085                HttpClient client = getSendHttpClient();
086                HttpClientParams params = new HttpClientParams();
087                params.setSoTimeout(MAX_CLIENT_TIMEOUT);
088                client.setParams(params);
089                int answer = client.executeMethod(httpMethod);
090                if (answer != HttpStatus.SC_OK) {
091                    throw new IOException("Failed to post command: " + command + " as response was: " + answer);
092                }
093                if (command instanceof ShutdownInfo) {
094                    try {
095                            stop();
096                    } catch (Exception e) {
097                            LOG.warn("Error trying to stop HTTP client: "+ e, e);
098                    }
099                }
100            } catch (IOException e) {
101                throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
102            } finally {
103                httpMethod.getResponseBody();
104                httpMethod.releaseConnection();
105            }
106        }
107    
108        public Object request(Object command) throws IOException {
109            return null;
110        }
111    
112        public void run() {
113    
114            LOG.trace("HTTP GET consumer thread starting: " + this);
115            HttpClient httpClient = getReceiveHttpClient();
116            URI remoteUrl = getRemoteUrl();
117    
118            while (!isStopped() && !isStopping()) {
119    
120                httpMethod = new GetMethod(remoteUrl.toString());
121                configureMethod(httpMethod);
122    
123                try {
124                    int answer = httpClient.executeMethod(httpMethod);
125                    if (answer != HttpStatus.SC_OK) {
126                        if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
127                            LOG.debug("GET timed out");
128                            try {
129                                Thread.sleep(1000);
130                            } catch (InterruptedException e) {
131                                onException(new InterruptedIOException());
132                                break;
133                            }
134                        } else {
135                            onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
136                            break;
137                        }
138                    } else {
139                        receiveCounter++;
140                        DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
141                        Object command = (Object)getTextWireFormat().unmarshal(stream);
142                        if (command == null) {
143                            LOG.debug("Received null command from url: " + remoteUrl);
144                        } else {
145                            doConsume(command);
146                        }
147                    }
148                } catch (IOException e) {
149                    onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
150                    break;
151                } finally {
152                    httpMethod.releaseConnection();
153                }
154            }
155        }
156    
157        // Properties
158        // -------------------------------------------------------------------------
159        public HttpClient getSendHttpClient() {
160            if (sendHttpClient == null) {
161                sendHttpClient = createHttpClient();
162            }
163            return sendHttpClient;
164        }
165    
166        public void setSendHttpClient(HttpClient sendHttpClient) {
167            this.sendHttpClient = sendHttpClient;
168        }
169    
170        public HttpClient getReceiveHttpClient() {
171            if (receiveHttpClient == null) {
172                receiveHttpClient = createHttpClient();
173            }
174            return receiveHttpClient;
175        }
176    
177        public void setReceiveHttpClient(HttpClient receiveHttpClient) {
178            this.receiveHttpClient = receiveHttpClient;
179        }
180    
181        // Implementation methods
182        // -------------------------------------------------------------------------
183        protected void doStart() throws Exception {
184    
185            LOG.trace("HTTP GET consumer thread starting: " + this);
186            HttpClient httpClient = getReceiveHttpClient();
187            URI remoteUrl = getRemoteUrl();
188    
189            HeadMethod httpMethod = new HeadMethod(remoteUrl.toString());
190            configureMethod(httpMethod);
191    
192            int answer = httpClient.executeMethod(httpMethod);
193            if (answer != HttpStatus.SC_OK) {
194                throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
195            }
196    
197            super.doStart();
198        }
199    
200        protected void doStop(ServiceStopper stopper) throws Exception {
201            if (httpMethod != null) {
202                httpMethod.abort();
203            }
204        }
205    
206        protected HttpClient createHttpClient() {
207            HttpClient client = new HttpClient();
208            if (getProxyHost() != null) {
209                client.getHostConfiguration().setProxy(getProxyHost(), getProxyPort());
210            }
211            return client;
212        }
213    
214        protected void configureMethod(HttpMethod method) {
215            method.setRequestHeader("clientID", clientID);
216        }
217    
218        public boolean isTrace() {
219            return trace;
220        }
221    
222        public void setTrace(boolean trace) {
223            this.trace = trace;
224        }
225    
226        public int getReceiveCounter() {
227            return receiveCounter;
228        }
229    
230    }