001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.composite;
019    
020    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021    import org.apache.commons.logging.Log;
022    import org.apache.commons.logging.LogFactory;
023    import org.activemq.TimeoutExpiredException;
024    import org.activemq.io.WireFormat;
025    import org.activemq.message.Packet;
026    import org.activemq.message.PacketListener;
027    import org.activemq.message.Receipt;
028    import org.activemq.message.ReceiptHolder;
029    import org.activemq.transport.TransportChannel;
030    import org.activemq.transport.TransportChannelProvider;
031    import org.activemq.transport.TransportChannelSupport;
032    import org.activemq.transport.TransportStatusEvent;
033    import org.activemq.transport.TransportStatusEventListener;
034    
035    import javax.jms.ExceptionListener;
036    import javax.jms.JMSException;
037    import java.net.URI;
038    import java.util.ArrayList;
039    import java.util.Collections;
040    import java.util.List;
041    
042    /**
043     * A Compsite implementation of a TransportChannel
044     *
045     * @version $Revision: 1.2 $
046     */
047    public class CompositeTransportChannel extends TransportChannelSupport implements TransportStatusEventListener {
048        private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);
049    
050        protected List uris;
051        protected URI currentURI;
052        protected TransportChannel channel;
053        protected SynchronizedBoolean closed;
054        protected SynchronizedBoolean started;
055        protected int maximumRetries = 10;
056        protected long failureSleepTime = 500L;
057        protected long establishConnectionTimeout = 30000L;
058        protected long maximumTimeout = 30000L;
059        protected boolean incrementTimeout = true;
060    
061    
062        public CompositeTransportChannel(WireFormat wireFormat) {
063            super(wireFormat);
064            this.uris = Collections.synchronizedList(new ArrayList());
065            closed = new SynchronizedBoolean(false);
066            started = new SynchronizedBoolean(false);
067        }
068    
069        public CompositeTransportChannel(WireFormat wireFormat, List uris) {
070            this(wireFormat);
071            this.uris.addAll(uris);
072        }
073    
074        public String toString() {
075            return "CompositeTransportChannel: " + channel;
076        }
077    
078        public void start() throws JMSException {
079            if (started.commit(false, true)) {
080    //            // Since we could take a LONG time to connect to one of the servers in the connection list,
081    //            // do the connect async so that the client has a chance to stop() the channel if he needs to shutdown
082    //            // before a connection is established.
083    //            new Thread() { 
084    //                public void run() {
085    //                    try {
086                            establishConnection(establishConnectionTimeout);
087    //                    } catch (JMSException e) {
088    //                        if(getExceptionListener()!=null) {
089    //                            getExceptionListener().onException(e);
090    //                        }
091    //                    }
092                        fireStatusEvent(new TransportStatusEvent(CompositeTransportChannel.this,TransportStatusEvent.CONNECTED));
093    //                }
094    //            }.start();
095            }
096        }
097    
098        /**
099         * close the channel
100         */
101        public void stop() {
102            if (closed.commit(false, true)) {
103                if (channel != null) {
104                    try {
105                        channel.stop();
106                    }
107                    catch (Exception e) {
108                        log.warn("Caught while closing: " + e + ". Now Closed", e);
109                    }
110                    finally {
111                        channel = null;
112                        super.stop();
113                    }
114                }
115            }
116        }
117        
118        /**
119         * Forces disconnect by delegating to the child channel
120         */
121        public void forceDisconnect() {
122            if (channel != null) channel.forceDisconnect();
123        }
124        
125        public Receipt send(Packet packet) throws JMSException {
126            return getChannel().send(packet);
127        }
128    
129    
130        public Receipt send(Packet packet, int timeout) throws JMSException {
131            return getChannel().send(packet, timeout);
132        }
133    
134    
135        public void asyncSend(Packet packet) throws JMSException {
136            getChannel().asyncSend(packet);
137        }
138        
139            public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException {
140                    return getChannel().asyncSendWithReceipt(packet);
141            }
142    
143        public void setPacketListener(PacketListener listener) {
144            super.setPacketListener(listener);
145            if (channel != null) {
146                channel.setPacketListener(listener);
147            }
148        }
149    
150        public void setExceptionListener(ExceptionListener listener) {
151            super.setExceptionListener(listener);
152            if (channel != null) {
153                channel.setExceptionListener(listener);
154            }
155        }
156    
157    
158        public boolean isMulticast() {
159            return false;
160        }
161    
162        // Properties
163        //-------------------------------------------------------------------------
164    
165    
166        /**
167         * Return the maximum amount of time spent trying to establish a connection
168         * or a negative number to keep going forever
169         *
170         * @return
171         */
172        public long getEstablishConnectionTimeout() {
173            return establishConnectionTimeout;
174        }
175    
176        public void setEstablishConnectionTimeout(long establishConnectionTimeout) {
177            this.establishConnectionTimeout = establishConnectionTimeout;
178        }
179    
180        public int getMaximumRetries() {
181            return maximumRetries;
182        }
183    
184        public void setMaximumRetries(int maximumRetries) {
185            this.maximumRetries = maximumRetries;
186        }
187    
188        public long getFailureSleepTime() {
189            return failureSleepTime;
190        }
191    
192        public void setFailureSleepTime(long failureSleepTime) {
193            this.failureSleepTime = failureSleepTime;
194        }
195    
196        public List getUris() {
197            return uris;
198        }
199    
200        public void setUris(List list) {
201            synchronized (uris) {
202                uris.clear();
203                uris.addAll(list);
204            }
205        }
206        
207    
208        /**
209         * @return Returns the incrementTimeout.
210         */
211        public boolean isIncrementTimeout() {
212            return incrementTimeout;
213        }
214        /**
215         * @param incrementTimeout The incrementTimeout to set.
216         */
217        public void setIncrementTimeout(boolean incrementTimeout) {
218            this.incrementTimeout = incrementTimeout;
219        }
220        /**
221         * @return Returns the maximumTimeout.
222         */
223        public long getMaximumTimeout() {
224            return maximumTimeout;
225        }
226        /**
227         * @param maximumTimeout The maximumTimeout to set.
228         */
229        public void setMaximumTimeout(long maximumTimeout) {
230            this.maximumTimeout = maximumTimeout;
231        }
232    
233        /**
234         * @param clientID set the clientID
235         */
236        public void setClientID(String clientID) {
237            super.setClientID(clientID);
238    
239            /* If there is an existing channel, reflect the new clientID to the channel */
240            if (channel != null) {
241                channel.setClientID(clientID);
242            }
243        }
244    
245        /**
246         * Can this wireformat process packets of this version
247         * @param version the version number to test
248         * @return true if can accept the version
249         */
250        public boolean canProcessWireFormatVersion(int version){
251            return channel != null ? channel.canProcessWireFormatVersion(version) : true;
252        }
253        
254        /**
255         * @return the current version of this wire format
256         */
257        public int getCurrentWireFormatVersion(){
258            return channel != null ? channel.getCurrentWireFormatVersion() : 1;
259        }
260    
261        /**
262         * Access to the current channel if one is active
263         * @throws JMSException if no channel is available
264         */
265        public TransportChannel getChannel() throws JMSException {
266            if (channel == null) {
267                throw new JMSException("No TransportChannel connection available");
268            }
269            return channel;
270        }
271    
272        // Implementation methods
273        //-------------------------------------------------------------------------
274        
275        protected void establishConnection(long timeout) throws JMSException {
276            // lets try connect
277            boolean connected = false;
278            long time = failureSleepTime;
279            long startTime = System.currentTimeMillis();
280            for (int i = 0;!connected && (i < maximumRetries || maximumRetries <= 0) && !closed.get() && !isPendingStop();i++) {
281                List list = new ArrayList(getUris());
282                if (i > 0) {
283                    if (maximumRetries > 0 || timeout > 0) {
284                        long current = System.currentTimeMillis();
285                        if (timeout >= 0) {
286                            if (current + time > startTime + timeout) {
287                                time = startTime + timeout - current;
288                            }
289                        }
290                        if (current > startTime + timeout || time <= 0) {
291                            throw new TimeoutExpiredException("Could not connect to any of the URIs: " + list);
292                        }
293                    }
294                    log.info("Could not connect; sleeping for: " + time + " millis and trying again");
295                    try {
296                        Thread.sleep(time);
297                    }
298                    catch (InterruptedException e) {
299                        log.warn("Sleep interupted: " + e, e);
300                    }
301                    if (incrementTimeout && time < maximumTimeout) {
302                        time *= 2;
303                        time = time > maximumTimeout ? maximumTimeout : time;
304                    }
305                }
306                while (!connected && !list.isEmpty() && !closed.get() && !isPendingStop()) {
307                    URI uri = extractURI(list);
308                    try {
309                        attemptToConnect(uri);
310                        configureChannel();
311                        connected = true;
312                        currentURI = uri;
313                    }
314                    catch (JMSException e) {
315                        log.info("Could not connect to: " + uri + ". Reason: " + e);
316                    }
317                }
318            }
319            if (!connected && !closed.get()) {
320                StringBuffer buffer = new StringBuffer("");
321                Object[] uriArray = getUris().toArray();
322                for (int i = 0;i < uriArray.length;i++) {
323                    buffer.append(uriArray[i]);
324                    if (i < (uriArray.length - 1)) {
325                        buffer.append(",");
326                    }
327                }
328                JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
329                throw jmsEx;
330            }
331        }
332    
333    
334        protected void configureChannel() {
335            ExceptionListener exceptionListener = getExceptionListener();
336            if (exceptionListener != null) {
337                channel.setExceptionListener(exceptionListener);
338            }
339            PacketListener packetListener = getPacketListener();
340            if (packetListener != null) {
341                channel.setPacketListener(packetListener);
342            }
343            
344            channel.addTransportStatusEventListener(this);
345            channel.setCachingEnabled(isCachingEnabled());
346            channel.setNoDelay(isNoDelay());
347            channel.setUsedInternally(isUsedInternally());
348        }
349    
350        protected URI extractURI(List list) throws JMSException {
351            int idx = 0;
352            if (list.size() > 1) {
353                do {
354                    idx = (int) (Math.random() * list.size());
355                }
356                while (idx < 0 || idx >= list.size());
357            }
358            return (URI) list.remove(idx);
359        }
360    
361        protected void attemptToConnect(URI uri) throws JMSException {
362            WireFormat wf = currentWireFormat.copy();
363            channel = TransportChannelProvider.create(wf, uri);
364            if (started.get()) {
365                channel.start();
366            }
367        }
368        
369            public void statusChanged(TransportStatusEvent event) {
370                    // Delegate to own listeners
371            //set the transport to 'this'
372            event.setTransportChannel(this);
373                    fireStatusEvent(event);
374            }
375            
376            public boolean isTransportConnected() {
377                    return channel == null ? false : channel.isTransportConnected();
378            }
379            
380            public long getLastReceiptTimestamp() {
381                    return channel == null ? System.currentTimeMillis() : channel.getLastReceiptTimestamp();
382            }
383    }