001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.simple;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    
027    import org.apache.activemq.command.DiscoveryEvent;
028    import org.apache.activemq.transport.discovery.DiscoveryAgent;
029    import org.apache.activemq.transport.discovery.DiscoveryListener;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    /**
034     * A simple DiscoveryAgent that allows static configuration of the discovered
035     * services.
036     * 
037     * @version $Revision$
038     */
039    public class SimpleDiscoveryAgent implements DiscoveryAgent {
040    
041        private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); 
042        private static final ThreadPoolExecutor ASYNC_TASKS;
043        private long initialReconnectDelay = 1000;
044        private long maxReconnectDelay = 1000 * 30;
045        private long backOffMultiplier = 2;
046        private boolean useExponentialBackOff=true;
047        private int maxReconnectAttempts;
048        private final Object sleepMutex = new Object();
049        private long minConnectTime = 5000;
050        private DiscoveryListener listener;
051        private String services[] = new String[] {};
052        private final AtomicBoolean running = new AtomicBoolean(false);
053    
054        class SimpleDiscoveryEvent extends DiscoveryEvent {
055    
056            private int connectFailures;
057            private long reconnectDelay = initialReconnectDelay;
058            private long connectTime = System.currentTimeMillis();
059            private AtomicBoolean failed = new AtomicBoolean(false);
060    
061            public SimpleDiscoveryEvent(String service) {
062                super(service);
063            }
064    
065        }
066    
067        public void setDiscoveryListener(DiscoveryListener listener) {
068            this.listener = listener;
069        }
070    
071        public void registerService(String name) throws IOException {
072        }
073    
074        public void start() throws Exception {
075            running.set(true);
076            for (int i = 0; i < services.length; i++) {
077                listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
078            }
079        }
080    
081        public void stop() throws Exception {
082            running.set(false);
083            synchronized (sleepMutex) {
084                sleepMutex.notifyAll();
085            }
086        }
087    
088        public String[] getServices() {
089            return services;
090        }
091    
092        public void setServices(String services) {
093            this.services = services.split(",");
094        }
095    
096        public void setServices(String services[]) {
097            this.services = services;
098        }
099    
100        public void setServices(URI services[]) {
101            this.services = new String[services.length];
102            for (int i = 0; i < services.length; i++) {
103                this.services[i] = services[i].toString();
104            }
105        }
106    
107        public void serviceFailed(DiscoveryEvent devent) throws IOException {
108    
109            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
110            if (event.failed.compareAndSet(false, true)) {
111    
112                listener.onServiceRemove(event);
113                ASYNC_TASKS.execute(new Runnable() {
114                    public void run() {
115    
116                        // We detect a failed connection attempt because the service
117                        // fails right
118                        // away.
119                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
120                            LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: "+event);
121    
122                            event.connectFailures++;
123    
124                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
125                                LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
126                                return;
127                            }
128    
129                            synchronized (sleepMutex) {
130                                try {
131                                    if (!running.get()) {
132                                        return;
133                                    }
134    
135                                    LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
136                                    sleepMutex.wait(event.reconnectDelay);
137                                } catch (InterruptedException ie) {
138                                    Thread.currentThread().interrupt();
139                                    return;
140                                }
141                            }
142    
143                            if (!useExponentialBackOff) {
144                                event.reconnectDelay = initialReconnectDelay;
145                            } else {
146                                // Exponential increment of reconnect delay.
147                                event.reconnectDelay *= backOffMultiplier;
148                                if (event.reconnectDelay > maxReconnectDelay) {
149                                    event.reconnectDelay = maxReconnectDelay;
150                                }
151                            }
152    
153                        } else {
154                            event.connectFailures = 0;
155                            event.reconnectDelay = initialReconnectDelay;
156                        }
157    
158                        if (!running.get()) {
159                            return;
160                        }
161    
162                        event.connectTime = System.currentTimeMillis();
163                        event.failed.set(false);
164                        listener.onServiceAdd(event);
165                    }
166                });
167            }
168        }
169    
170        public long getBackOffMultiplier() {
171            return backOffMultiplier;
172        }
173    
174        public void setBackOffMultiplier(long backOffMultiplier) {
175            this.backOffMultiplier = backOffMultiplier;
176        }
177    
178        public long getInitialReconnectDelay() {
179            return initialReconnectDelay;
180        }
181    
182        public void setInitialReconnectDelay(long initialReconnectDelay) {
183            this.initialReconnectDelay = initialReconnectDelay;
184        }
185    
186        public int getMaxReconnectAttempts() {
187            return maxReconnectAttempts;
188        }
189    
190        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
191            this.maxReconnectAttempts = maxReconnectAttempts;
192        }
193    
194        public long getMaxReconnectDelay() {
195            return maxReconnectDelay;
196        }
197    
198        public void setMaxReconnectDelay(long maxReconnectDelay) {
199            this.maxReconnectDelay = maxReconnectDelay;
200        }
201    
202        public long getMinConnectTime() {
203            return minConnectTime;
204        }
205    
206        public void setMinConnectTime(long minConnectTime) {
207            this.minConnectTime = minConnectTime;
208        }
209    
210        public boolean isUseExponentialBackOff() {
211            return useExponentialBackOff;
212        }
213    
214        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
215            this.useExponentialBackOff = useExponentialBackOff;
216        }
217        
218        static {
219            ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
220                public Thread newThread(Runnable runnable) {
221                    Thread thread = new Thread(runnable, "Simple Discovery Agent: "+runnable);
222                    thread.setDaemon(true);
223                    return thread;
224                }
225            });
226        }
227    
228    
229    }