001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.fanout;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.command.ConsumerInfo;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.Response;
031    import org.apache.activemq.state.ConnectionStateTracker;
032    import org.apache.activemq.thread.DefaultThreadPools;
033    import org.apache.activemq.thread.Task;
034    import org.apache.activemq.thread.TaskRunner;
035    import org.apache.activemq.transport.CompositeTransport;
036    import org.apache.activemq.transport.DefaultTransportListener;
037    import org.apache.activemq.transport.FutureResponse;
038    import org.apache.activemq.transport.ResponseCallback;
039    import org.apache.activemq.transport.Transport;
040    import org.apache.activemq.transport.TransportFactory;
041    import org.apache.activemq.transport.TransportListener;
042    import org.apache.activemq.util.IOExceptionSupport;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.util.ServiceSupport;
045    import org.apache.commons.logging.Log;
046    import org.apache.commons.logging.LogFactory;
047    
048    /**
049     * A Transport that fans out a connection to multiple brokers.
050     * 
051     * @version $Revision$
052     */
053    public class FanoutTransport implements CompositeTransport {
054    
055        private static final Log LOG = LogFactory.getLog(FanoutTransport.class);
056    
057        private TransportListener transportListener;
058        private boolean disposed;
059        private boolean connected;
060    
061        private final Object reconnectMutex = new Object();
062        private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
063        private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
064    
065        private final TaskRunner reconnectTask;
066        private boolean started;
067    
068        private ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
069        private int connectedCount;
070    
071        private int minAckCount = 2;
072    
073        private long initialReconnectDelay = 10;
074        private long maxReconnectDelay = 1000 * 30;
075        private long backOffMultiplier = 2;
076        private boolean useExponentialBackOff = true;
077        private int maxReconnectAttempts;
078        private Exception connectionFailure;
079        private FanoutTransportHandler primary;
080        private boolean fanOutQueues = false;
081    
082        static class RequestCounter {
083    
084            final Command command;
085            final AtomicInteger ackCount;
086    
087            RequestCounter(Command command, int count) {
088                this.command = command;
089                this.ackCount = new AtomicInteger(count);
090            }
091    
092            public String toString() {
093                return command.getCommandId() + "=" + ackCount.get();
094            }
095        }
096    
097        class FanoutTransportHandler extends DefaultTransportListener {
098    
099            private final URI uri;
100            private Transport transport;
101    
102            private int connectFailures;
103            private long reconnectDelay = initialReconnectDelay;
104            private long reconnectDate;
105    
106            public FanoutTransportHandler(URI uri) {
107                this.uri = uri;
108            }
109    
110            public void onCommand(Object o) {
111                Command command = (Command)o;
112                if (command.isResponse()) {
113                    Integer id = new Integer(((Response)command).getCorrelationId());
114                    RequestCounter rc = requestMap.get(id);
115                    if (rc != null) {
116                        if (rc.ackCount.decrementAndGet() <= 0) {
117                            requestMap.remove(id);
118                            transportListenerOnCommand(command);
119                        }
120                    } else {
121                        transportListenerOnCommand(command);
122                    }
123                } else {
124                    transportListenerOnCommand(command);
125                }
126            }
127    
128            public void onException(IOException error) {
129                try {
130                    synchronized (reconnectMutex) {
131                        if (transport == null) {
132                            return;
133                        }
134    
135                        LOG.debug("Transport failed, starting up reconnect task", error);
136    
137                        ServiceSupport.dispose(transport);
138                        transport = null;
139                        connectedCount--;
140                        if (primary == this) {
141                            primary = null;
142                        }
143                        reconnectTask.wakeup();
144                    }
145                } catch (InterruptedException e) {
146                    Thread.currentThread().interrupt();
147                    if (transportListener != null) {
148                        transportListener.onException(new InterruptedIOException());
149                    }
150                }
151            }
152        }
153    
154        public FanoutTransport() throws InterruptedIOException {
155            // Setup a task that is used to reconnect the a connection async.
156            reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
157                public boolean iterate() {
158                    return doConnect();
159                }
160            }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
161        }
162    
163        /**
164         * @return
165         */
166        private boolean doConnect() {
167            long closestReconnectDate = 0;
168            synchronized (reconnectMutex) {
169    
170                if (disposed || connectionFailure != null) {
171                    reconnectMutex.notifyAll();
172                }
173    
174                if (transports.size() == connectedCount || disposed || connectionFailure != null) {
175                    return false;
176                } else {
177    
178                    if (transports.isEmpty()) {
179                        // connectionFailure = new IOException("No uris available to
180                        // connect to.");
181                    } else {
182    
183                        // Try to connect them up.
184                        Iterator<FanoutTransportHandler> iter = transports.iterator();
185                        for (int i = 0; iter.hasNext() && !disposed; i++) {
186    
187                            long now = System.currentTimeMillis();
188    
189                            FanoutTransportHandler fanoutHandler = iter.next();
190                            if (fanoutHandler.transport != null) {
191                                continue;
192                            }
193    
194                            // Are we waiting a little to try to reconnect this one?
195                            if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
196                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
197                                    closestReconnectDate = fanoutHandler.reconnectDate;
198                                }
199                                continue;
200                            }
201    
202                            URI uri = fanoutHandler.uri;
203                            try {
204                                LOG.debug("Stopped: " + this);
205                                LOG.debug("Attempting connect to: " + uri);
206                                Transport t = TransportFactory.compositeConnect(uri);
207                                fanoutHandler.transport = t;
208                                t.setTransportListener(fanoutHandler);
209                                if (started) {
210                                    restoreTransport(fanoutHandler);
211                                }
212                                LOG.debug("Connection established");
213                                fanoutHandler.reconnectDelay = initialReconnectDelay;
214                                fanoutHandler.connectFailures = 0;
215                                if (primary == null) {
216                                    primary = fanoutHandler;
217                                }
218                                connectedCount++;
219                            } catch (Exception e) {
220                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
221    
222                                if( fanoutHandler.transport !=null ) {
223                                    ServiceSupport.dispose(fanoutHandler.transport);
224                                    fanoutHandler.transport=null;
225                                }
226                                
227                                if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
228                                    LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
229                                    connectionFailure = e;
230                                    reconnectMutex.notifyAll();
231                                    return false;
232                                } else {
233    
234                                    if (useExponentialBackOff) {
235                                        // Exponential increment of reconnect delay.
236                                        fanoutHandler.reconnectDelay *= backOffMultiplier;
237                                        if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
238                                            fanoutHandler.reconnectDelay = maxReconnectDelay;
239                                        }
240                                    }
241    
242                                    fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
243    
244                                    if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
245                                        closestReconnectDate = fanoutHandler.reconnectDate;
246                                    }
247                                }
248                            }
249                        }
250                        if (transports.size() == connectedCount || disposed) {
251                            reconnectMutex.notifyAll();
252                            return false;
253                        }
254    
255                    }
256                }
257    
258            }
259    
260            try {
261                long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
262                if (reconnectDelay > 0) {
263                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
264                    Thread.sleep(reconnectDelay);
265                }
266            } catch (InterruptedException e1) {
267                Thread.currentThread().interrupt();
268            }
269            return true;
270        }
271    
272        public void start() throws Exception {
273            synchronized (reconnectMutex) {
274                LOG.debug("Started.");
275                if (started) {
276                    return;
277                }
278                started = true;
279                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
280                    FanoutTransportHandler th = iter.next();
281                    if (th.transport != null) {
282                        restoreTransport(th);
283                    }
284                }
285                connected=true;
286            }
287        }
288    
289        public void stop() throws Exception {
290            synchronized (reconnectMutex) {
291                ServiceStopper ss = new ServiceStopper();
292    
293                if (!started) {
294                    return;
295                }
296                started = false;
297                disposed = true;
298                connected=false;
299    
300                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
301                    FanoutTransportHandler th = iter.next();
302                    if (th.transport != null) {
303                        ss.stop(th.transport);
304                    }
305                }
306    
307                LOG.debug("Stopped: " + this);
308                ss.throwFirstException();
309            }
310            reconnectTask.shutdown();
311        }
312    
313            public int getMinAckCount() {
314                    return minAckCount;
315            }
316    
317            public void setMinAckCount(int minAckCount) {
318                    this.minAckCount = minAckCount;
319            }    
320        
321        public long getInitialReconnectDelay() {
322            return initialReconnectDelay;
323        }
324    
325        public void setInitialReconnectDelay(long initialReconnectDelay) {
326            this.initialReconnectDelay = initialReconnectDelay;
327        }
328    
329        public long getMaxReconnectDelay() {
330            return maxReconnectDelay;
331        }
332    
333        public void setMaxReconnectDelay(long maxReconnectDelay) {
334            this.maxReconnectDelay = maxReconnectDelay;
335        }
336    
337        public long getReconnectDelayExponent() {
338            return backOffMultiplier;
339        }
340    
341        public void setReconnectDelayExponent(long reconnectDelayExponent) {
342            this.backOffMultiplier = reconnectDelayExponent;
343        }
344    
345        public int getMaxReconnectAttempts() {
346            return maxReconnectAttempts;
347        }
348    
349        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
350            this.maxReconnectAttempts = maxReconnectAttempts;
351        }
352    
353        public void oneway(Object o) throws IOException {
354            final Command command = (Command)o;
355            try {
356                synchronized (reconnectMutex) {
357    
358                    // Wait for transport to be connected.
359                    while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
360                        LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
361                        reconnectMutex.wait(1000);
362                    }
363    
364                    // Still not fully connected.
365                    if (connectedCount < minAckCount) {
366    
367                        Exception error;
368    
369                        // Throw the right kind of error..
370                        if (disposed) {
371                            error = new IOException("Transport disposed.");
372                        } else if (connectionFailure != null) {
373                            error = connectionFailure;
374                        } else {
375                            error = new IOException("Unexpected failure.");
376                        }
377    
378                        if (error instanceof IOException) {
379                            throw (IOException)error;
380                        }
381                        throw IOExceptionSupport.create(error);
382                    }
383    
384                    // If it was a request and it was not being tracked by
385                    // the state tracker,
386                    // then hold it in the requestMap so that we can replay
387                    // it later.
388                    boolean fanout = isFanoutCommand(command);
389                    if (stateTracker.track(command) == null && command.isResponseRequired()) {
390                        int size = fanout ? minAckCount : 1;
391                        requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
392                    }
393                    
394                    // Send the message.
395                    if (fanout) {
396                        for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
397                            FanoutTransportHandler th = iter.next();
398                            if (th.transport != null) {
399                                try {
400                                    th.transport.oneway(command);
401                                } catch (IOException e) {
402                                    LOG.debug("Send attempt: failed.");
403                                    th.onException(e);
404                                }
405                            }
406                        }
407                    } else {
408                        try {
409                            primary.transport.oneway(command);
410                        } catch (IOException e) {
411                            LOG.debug("Send attempt: failed.");
412                            primary.onException(e);
413                        }
414                    }
415    
416                }
417            } catch (InterruptedException e) {
418                // Some one may be trying to stop our thread.
419                Thread.currentThread().interrupt();
420                throw new InterruptedIOException();
421            }
422        }
423    
424        /**
425         * @param command
426         * @return
427         */
428        private boolean isFanoutCommand(Command command) {
429            if (command.isMessage()) {
430                if( fanOutQueues ) {
431                    return true;
432                }
433                return ((Message)command).getDestination().isTopic();
434            }
435            if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
436                return false;
437            }
438            return true;
439        }
440    
441        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
442            throw new AssertionError("Unsupported Method");
443        }
444    
445        public Object request(Object command) throws IOException {
446            throw new AssertionError("Unsupported Method");
447        }
448    
449        public Object request(Object command, int timeout) throws IOException {
450            throw new AssertionError("Unsupported Method");
451        }
452    
453        public void reconnect() {
454            LOG.debug("Waking up reconnect task");
455            try {
456                reconnectTask.wakeup();
457            } catch (InterruptedException e) {
458                Thread.currentThread().interrupt();
459            }
460        }
461    
462        public TransportListener getTransportListener() {
463            return transportListener;
464        }
465    
466        public void setTransportListener(TransportListener commandListener) {
467            this.transportListener = commandListener;
468        }
469    
470        public <T> T narrow(Class<T> target) {
471    
472            if (target.isAssignableFrom(getClass())) {
473                return target.cast(this);
474            }
475    
476            synchronized (reconnectMutex) {
477                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
478                    FanoutTransportHandler th = iter.next();
479                    if (th.transport != null) {
480                        T rc = th.transport.narrow(target);
481                        if (rc != null) {
482                            return rc;
483                        }
484                    }
485                }
486            }
487    
488            return null;
489    
490        }
491    
492        protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
493            th.transport.start();
494            stateTracker.setRestoreConsumers(th.transport == primary);
495            stateTracker.restore(th.transport);
496            for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
497                RequestCounter rc = iter2.next();
498                th.transport.oneway(rc.command);
499            }
500        }
501    
502        public void add(URI uris[]) {
503    
504            synchronized (reconnectMutex) {
505                for (int i = 0; i < uris.length; i++) {
506                    URI uri = uris[i];
507    
508                    boolean match = false;
509                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
510                        FanoutTransportHandler th = iter.next();
511                        if (th.uri.equals(uri)) {
512                            match = true;
513                            break;
514                        }
515                    }
516                    if (!match) {
517                        FanoutTransportHandler th = new FanoutTransportHandler(uri);
518                        transports.add(th);
519                        reconnect();
520                    }
521                }
522            }
523    
524        }
525    
526        public void remove(URI uris[]) {
527    
528            synchronized (reconnectMutex) {
529                for (int i = 0; i < uris.length; i++) {
530                    URI uri = uris[i];
531    
532                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
533                        FanoutTransportHandler th = iter.next();
534                        if (th.uri.equals(uri)) {
535                            if (th.transport != null) {
536                                ServiceSupport.dispose(th.transport);
537                                connectedCount--;
538                            }
539                            iter.remove();
540                            break;
541                        }
542                    }
543                }
544            }
545    
546        }
547        
548        public void reconnect(URI uri) throws IOException {
549                    add(new URI[]{uri});
550                    
551            }
552    
553    
554        public String getRemoteAddress() {
555            if (primary != null) {
556                if (primary.transport != null) {
557                    return primary.transport.getRemoteAddress();
558                }
559            }
560            return null;
561        }
562    
563        protected void transportListenerOnCommand(Command command) {
564            if (transportListener != null) {
565                transportListener.onCommand(command);
566            }
567        }
568    
569        public boolean isFaultTolerant() {
570            return true;
571        }
572    
573        public boolean isFanOutQueues() {
574            return fanOutQueues;
575        }
576    
577        public void setFanOutQueues(boolean fanOutQueues) {
578            this.fanOutQueues = fanOutQueues;
579        }
580    
581            public boolean isDisposed() {
582                    return disposed;
583            }
584            
585    
586        public boolean isConnected() {
587            return connected;
588        }
589    
590        public int getReceiveCounter() {
591            int rc = 0;
592            synchronized (reconnectMutex) {
593                for (FanoutTransportHandler th : transports) {
594                    if (th.transport != null) {
595                        rc += th.transport.getReceiveCounter();
596                    }
597                }
598            }
599            return rc;
600        }
601    }