001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.http;
018    
019    import java.io.IOException;
020    import java.util.HashMap;
021    import java.util.HashSet;
022    import java.util.Map;
023    import java.util.Scanner;
024    import java.util.Set;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.apache.activemq.Service;
030    import org.apache.activemq.command.DiscoveryEvent;
031    import org.apache.activemq.transport.discovery.DiscoveryAgent;
032    import org.apache.activemq.transport.discovery.DiscoveryListener;
033    import org.apache.activemq.util.IntrospectionSupport;
034    import org.apache.commons.httpclient.HttpClient;
035    import org.apache.commons.httpclient.methods.DeleteMethod;
036    import org.apache.commons.httpclient.methods.GetMethod;
037    import org.apache.commons.httpclient.methods.PutMethod;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    public class HTTPDiscoveryAgent implements DiscoveryAgent {
042        
043        private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
044        
045        private String registryURL = "http://localhost:8080/discovery-registry/default";
046        private HttpClient httpClient = new HttpClient();
047        private AtomicBoolean running=new AtomicBoolean();
048        private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
049        private final HashSet<String> registeredServices = new HashSet<String>();
050        private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();    
051        private Thread thread;   
052        private long updateInterval = 1000*10;
053        private String brokerName;
054        private boolean startEmbeddRegistry=false;
055        private Service jetty;
056        private AtomicInteger startCounter=new AtomicInteger(0);
057    
058        
059        private long initialReconnectDelay = 1000;
060        private long maxReconnectDelay = 1000 * 30;
061        private long backOffMultiplier = 2;
062        private boolean useExponentialBackOff=true;    
063        private int maxReconnectAttempts;
064        private final Object sleepMutex = new Object();
065        private long minConnectTime = 5000;
066        
067        class SimpleDiscoveryEvent extends DiscoveryEvent {
068    
069            private int connectFailures;
070            private long reconnectDelay = initialReconnectDelay;
071            private long connectTime = System.currentTimeMillis();
072            private AtomicBoolean failed = new AtomicBoolean(false);
073            private AtomicBoolean removed = new AtomicBoolean(false);
074    
075            public SimpleDiscoveryEvent(String service) {
076                super(service);
077            }
078    
079        }
080    
081        
082        public String getGroup() {
083            return null;
084        }
085    
086        public void registerService(String service) throws IOException {
087            synchronized(registeredServices) {
088                registeredServices.add(service);
089            }
090            doRegister(service);
091        }
092    
093        synchronized private void doRegister(String service) {
094            String url = registryURL;
095            try {
096                PutMethod method = new PutMethod(url);
097    //            method.setParams(createParams());
098                method.setRequestHeader("service", service);
099                int responseCode = httpClient.executeMethod(method);
100                LOG.debug("PUT to "+url+" got a "+responseCode);
101            } catch (Exception e) {
102                LOG.debug("PUT to "+url+" failed with: "+e);
103            }
104        }
105        
106        synchronized private void doUnRegister(String service) {
107            String url = registryURL;
108            try {
109                DeleteMethod method = new DeleteMethod(url);
110    //            method.setParams(createParams());
111                method.setRequestHeader("service", service);
112                int responseCode = httpClient.executeMethod(method);
113                LOG.debug("DELETE to "+url+" got a "+responseCode);
114            } catch (Exception e) {
115                LOG.debug("DELETE to "+url+" failed with: "+e);
116            }
117        }
118    
119    //    private HttpMethodParams createParams() {
120    //        HttpMethodParams params = new HttpMethodParams();
121    //        params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
122    //        return params;
123    //    }
124        
125        synchronized private Set<String> doLookup(long freshness) {
126            String url = registryURL+"?freshness="+freshness;
127            try {
128                GetMethod method = new GetMethod(url);
129    //            method.setParams(createParams());
130                int responseCode = httpClient.executeMethod(method);
131                LOG.debug("GET to "+url+" got a "+responseCode);
132                if( responseCode == 200 ) {
133                    Set<String> rc = new HashSet<String>();
134                    Scanner scanner = new Scanner(method.getResponseBodyAsStream());
135                    while( scanner.hasNextLine() ) {
136                        String service = scanner.nextLine();
137                        if( service.trim().length() != 0 ) {
138                            rc.add(service);
139                        }
140                    }
141                    return rc;
142                } else {
143                    LOG.debug("GET to "+url+" failed with response code: "+responseCode);
144                    return null;
145                }
146            } catch (Exception e) {
147                LOG.debug("GET to "+url+" failed with: "+e);
148                return null;
149            }
150        }
151    
152        public void serviceFailed(DiscoveryEvent devent) throws IOException {
153    
154            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
155            if (event.failed.compareAndSet(false, true)) {
156                    discoveryListener.get().onServiceRemove(event);
157                    if(!event.removed.get()) {
158                            // Setup a thread to re-raise the event...
159                        Thread thread = new Thread() {
160                            public void run() {
161            
162                                // We detect a failed connection attempt because the service
163                                // fails right away.
164                                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
165                                    LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: "+event);
166            
167                                    event.connectFailures++;
168            
169                                    if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
170                                        LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
171                                        return;
172                                    }
173            
174                                    synchronized (sleepMutex) {
175                                        try {
176                                            if (!running.get() || event.removed.get()) {
177                                                return;
178                                            }
179                                            LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
180                                            sleepMutex.wait(event.reconnectDelay);
181                                        } catch (InterruptedException ie) {
182                                            Thread.currentThread().interrupt();
183                                            return;
184                                        }
185                                    }
186            
187                                    if (!useExponentialBackOff) {
188                                        event.reconnectDelay = initialReconnectDelay;
189                                    } else {
190                                        // Exponential increment of reconnect delay.
191                                        event.reconnectDelay *= backOffMultiplier;
192                                        if (event.reconnectDelay > maxReconnectDelay) {
193                                            event.reconnectDelay = maxReconnectDelay;
194                                        }
195                                    }
196            
197                                } else {
198                                    event.connectFailures = 0;
199                                    event.reconnectDelay = initialReconnectDelay;
200                                }
201            
202                                if (!running.get() || event.removed.get()) {
203                                    return;
204                                }
205            
206                                event.connectTime = System.currentTimeMillis();
207                                event.failed.set(false);
208                                discoveryListener.get().onServiceAdd(event);
209                            }
210                        };
211                        thread.setDaemon(true);
212                        thread.start();
213                    }
214            }
215        }
216    
217    
218        public void setBrokerName(String brokerName) {
219            this.brokerName = brokerName;
220        }
221    
222        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
223            this.discoveryListener.set(discoveryListener);
224        }
225    
226        public void setGroup(String group) {
227        }
228    
229        public void start() throws Exception {
230            if( startCounter.addAndGet(1)==1 ) {
231                if( startEmbeddRegistry ) {
232                    jetty = createEmbeddedJettyServer();
233                    Map props = new HashMap();
234                    props.put("agent", this);
235                    IntrospectionSupport.setProperties(jetty, props);
236                    jetty.start();
237                }
238                
239                running.set(true);
240                thread = new Thread("HTTPDiscovery Agent") {
241                    @Override
242                    public void run() {
243                        while(running.get()) {
244                            try {
245                                update();
246                                Thread.sleep(updateInterval);
247                            } catch (InterruptedException e) {
248                                return;
249                            }
250                        }
251                    }
252                };
253                thread.setDaemon(true);
254                thread.start();
255            }
256        }
257    
258        /**
259         * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on 
260         * jetty.
261         * 
262         * @return
263         * @throws Exception
264         */
265        private Service createEmbeddedJettyServer()  throws Exception {
266            Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
267            return (Service)clazz.newInstance();
268        }
269    
270        private void update() {
271            // Register all our services...
272            synchronized(registeredServices) {
273                for (String service : registeredServices) {
274                    doRegister(service);
275                }
276            }
277            
278            // Find new registered services...
279            DiscoveryListener discoveryListener = this.discoveryListener.get();
280            if(discoveryListener!=null) {
281                Set<String> activeServices = doLookup(updateInterval*3);
282                // If there is error talking the the central server, then activeServices == null
283                if( activeServices !=null ) {
284                    synchronized(discoveredServices) {
285                        
286                        HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
287                        removedServices.removeAll(activeServices);
288                        
289                        HashSet<String> addedServices = new HashSet<String>(activeServices);
290                        addedServices.removeAll(discoveredServices.keySet());
291                        addedServices.removeAll(removedServices);
292                        
293                        for (String service : addedServices) {
294                            SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
295                            discoveredServices.put(service, e);
296                            discoveryListener.onServiceAdd(e);
297                        }
298                        
299                        for (String service : removedServices) {
300                            SimpleDiscoveryEvent e = discoveredServices.remove(service);
301                            if( e !=null ) {
302                                    e.removed.set(true);
303                            }
304                            discoveryListener.onServiceRemove(e);
305                        }
306                    }
307                }
308            }
309        }
310    
311        public void stop() throws Exception {
312            if( startCounter.decrementAndGet()==0 ) {
313                running.set(false);
314                if( thread!=null ) {
315                    thread.join(updateInterval*3);
316                    thread=null;
317                }
318                if( jetty!=null ) {
319                    jetty.stop();
320                    jetty = null;
321                }
322            }
323        }
324    
325        public String getRegistryURL() {
326            return registryURL;
327        }
328    
329        public void setRegistryURL(String discoveryRegistryURL) {
330            this.registryURL = discoveryRegistryURL;
331        }
332    
333        public long getUpdateInterval() {
334            return updateInterval;
335        }
336    
337        public void setUpdateInterval(long updateInterval) {
338            this.updateInterval = updateInterval;
339        }
340    
341        public boolean isStartEmbeddRegistry() {
342            return startEmbeddRegistry;
343        }
344    
345        public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
346            this.startEmbeddRegistry = startEmbeddRegistry;
347        }
348    
349    }