001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport;
020    import java.net.URI;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.ExceptionListener;
026    import javax.jms.JMSException;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.activemq.UnsupportedWireFormatException;
031    import org.activemq.broker.BrokerConnector;
032    import org.activemq.io.WireFormat;
033    import org.activemq.message.Packet;
034    import org.activemq.message.PacketListener;
035    import org.activemq.message.Receipt;
036    import org.activemq.message.ReceiptHolder;
037    import org.activemq.message.WireFormatInfo;
038    import org.activemq.util.ExecutorHelper;
039    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
040    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
041    import EDU.oswego.cs.dl.util.concurrent.Executor;
042    
043    /**
044     * Some basic functionality, common across most transport implementations of channels
045     * 
046     * @version $Revision: 1.1.1.1 $
047     */
048    public abstract class TransportChannelSupport implements TransportChannel {
049        private static final Log log = LogFactory.getLog(TransportChannelSupport.class);
050        private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
051        private ConcurrentHashMap requestMap = new ConcurrentHashMap();
052        private PacketListener packetListener;
053        private ExceptionListener exceptionListener;
054        private String clientID;
055        private TransportChannelListener transportChannelListener;
056        private long lastReceiptTimstamp = 0;
057        private boolean serverSide;
058        protected boolean pendingStop = false;
059        protected boolean transportConnected = true;
060        protected WireFormat currentWireFormat;
061        protected boolean cachingEnabled = false;
062        protected boolean noDelay = false;
063        protected boolean usedInternally = false; //denotes if transport is used by an internal Connection
064        
065        
066        
067        protected TransportChannelSupport(){
068        }
069        
070        protected TransportChannelSupport(WireFormat wf){
071            this.currentWireFormat = wf;
072        }
073    
074        /**
075         * Give the TransportChannel a hint it's about to stop
076         * 
077         * @param pendingStop
078         */
079        public void setPendingStop(boolean pendingStop) {
080            this.pendingStop = pendingStop;
081        }
082    
083        /**
084         * @return true if the channel is about to stop
085         */
086        public boolean isPendingStop() {
087            return pendingStop;
088        }
089        
090        /**
091        * set the wire format to be used by this channel
092        * @param wireformat
093        */
094       public void setWireFormat(WireFormat wireformat){
095           currentWireFormat = wireformat;
096       }
097       
098       /**
099        * Get the current wireformat used by this channel
100        * @return the current wire format - or null if not set
101        */
102       public WireFormat getWireFormat(){
103           return currentWireFormat;
104       }
105    
106        /**
107         * close the channel
108         */
109        public void stop() {
110            transportConnected = false;
111            Map map = new HashMap(this.requestMap);
112            for (Iterator i = map.values().iterator();i.hasNext();) {
113                ReceiptHolder rh = (ReceiptHolder) i.next();
114                rh.close();
115            }
116            map.clear();
117            requestMap.clear();
118            if (transportChannelListener != null) {
119                transportChannelListener.removeClient(this);
120            }
121            exceptionListener = null;
122            packetListener = null;
123        }
124    
125        /**
126         * synchronously send a Packet
127         * 
128         * @param packet
129         * @return a Receipt
130         * @throws JMSException
131         */
132        public Receipt send(Packet packet) throws JMSException {
133            return send(packet, 0);
134        }
135    
136        /**
137         * Synchronously send a Packet
138         * 
139         * @param packet packet to send
140         * @param timeout amount of time to wait for a receipt
141         * @return the Receipt
142         * @throws JMSException
143         */
144        public Receipt send(Packet packet, int timeout) throws JMSException {
145            ReceiptHolder rh = asyncSendWithReceipt(packet);
146            Receipt result = rh.getReceipt(timeout);
147            return result;
148        }
149        
150        /**
151         * Asynchronously send a Packet with receipt.
152         * 
153         * @param packet the packet to send
154         * @return a ReceiptHolder for the packet
155         * @throws JMSException
156         */
157            public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException {
158            ReceiptHolder rh = new ReceiptHolder();
159            requestMap.put(new Short(packet.getId()), rh);
160            Packet response = doAsyncSend(packet);
161            if (response != null && response instanceof Receipt){
162                rh.setReceipt((Receipt)response);
163            }
164            return rh;
165            }
166    
167        // Properties
168        //-------------------------------------------------------------------------
169        /**
170         * @return the transportChannelListener
171         */
172        public TransportChannelListener getTransportChannelListener() {
173            return transportChannelListener;
174        }
175    
176        /**
177         * @param transportChannelListener
178         */
179        public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
180            this.transportChannelListener = transportChannelListener;
181        }
182    
183        /**
184         * Add a listener for changes in a channels status
185         * 
186         * @param listener
187         */
188        public void addTransportStatusEventListener(TransportStatusEventListener listener) {
189            listeners.add(listener);
190        }
191    
192        /**
193         * Remove a listener for changes in a channels status
194         * 
195         * @param listener
196         */
197        public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
198            listeners.remove(listener);
199        }
200    
201        /**
202         * @return the clientID
203         */
204        public String getClientID() {
205            return clientID;
206        }
207    
208        /**
209         * @param clientID set the clientID
210         */
211        public void setClientID(String clientID) {
212            this.clientID = clientID;
213        }
214    
215        /**
216         * @return the exception listener
217         */
218        public ExceptionListener getExceptionListener() {
219            return exceptionListener;
220        }
221    
222        /**
223         * @return the packet listener
224         */
225        public PacketListener getPacketListener() {
226            return packetListener;
227        }
228    
229        /**
230         * Set a listener for Packets
231         * 
232         * @param l
233         */
234        public void setPacketListener(PacketListener l) {
235            this.packetListener = l;
236        }
237    
238        /**
239         * Set an exception listener to listen for asynchronously generated exceptions
240         * 
241         * @param listener
242         */
243        public void setExceptionListener(ExceptionListener listener) {
244            this.exceptionListener = listener;
245        }
246    
247        /**
248         * @return true if server side
249         */
250        public boolean isServerSide() {
251            return serverSide;
252        }
253    
254        /**
255         * @param serverSide
256         */
257        public void setServerSide(boolean serverSide) {
258            this.serverSide = serverSide;
259        }
260        
261        /**
262         * @return true if the transport channel is active,
263         * this value will be false through reconnecting
264         */
265        public boolean isTransportConnected(){
266            return transportConnected;
267        }
268        
269        protected void setTransportConnected(boolean value){
270            transportConnected = value;
271        }
272        
273        /**
274         * Some transports rely on an embedded broker (beer based protocols)
275         * @return true if an embedded broker required
276         */
277        public boolean requiresEmbeddedBroker(){
278            return false;
279        }
280        
281        /**
282         * Some transports that rely on an embedded broker need to
283         * create the connector used by the broker
284         * @return the BrokerConnector or null if not applicable
285         * @throws JMSException
286         */
287        public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{
288            return null;
289        }
290        
291        
292        /**
293         * @return true if this transport is multicast based (i.e. broadcasts to multiple nodes)
294         */
295        public boolean isMulticast(){
296            return false;
297        }
298        
299        /**
300         * Can this wireformat process packets of this version
301         * @param version the version number to test
302         * @return true if can accept the version
303         */
304        public boolean canProcessWireFormatVersion(int version){
305            return true;
306        }
307        
308            public long getLastReceiptTimestamp() {
309                    return lastReceiptTimstamp;
310            }
311        
312        /**
313         * @return Returns the usedInternally.
314         */
315        public boolean isUsedInternally() {
316            return usedInternally;
317        }
318        /**
319         * @param usedInternally The usedInternally to set.
320         */
321        public void setUsedInternally(boolean usedInternally) {
322            this.usedInternally = usedInternally;
323        }
324        
325        /**
326         * Does the transport support wire format version info
327         * @return
328         */
329        public boolean doesSupportWireFormatVersioning(){
330            return true;
331        }
332        
333        /**
334         * @return the current version of this wire format
335         */
336        public int getCurrentWireFormatVersion(){
337            return -1;
338        }
339        
340    
341           
342        /**
343         * some transports/wire formats will implement their own fragementation
344         * @return true unless a transport/wire format supports it's own fragmentation
345         */
346        public boolean doesSupportMessageFragmentation(){
347            return getWireFormat() != null && getWireFormat().doesSupportMessageFragmentation();
348        }
349        
350        
351        /**
352         * Some transports/wireformats will not be able to understand compressed messages
353         * @return true unless a transport/wire format cannot understand compression
354         */
355        public boolean doesSupportMessageCompression(){
356            return getWireFormat() != null && getWireFormat().doesSupportMessageCompression();
357        }
358        
359        // Implementation methods
360        //-------------------------------------------------------------------------
361        /**
362         * consume a packet from the channel
363         * 
364         * @param packet
365         * @throws UnsupportedWireFormatException
366         */
367        protected void doConsumePacket(Packet packet) {
368            doConsumePacket(packet, packetListener);
369        }
370    
371        protected void doConsumePacket(Packet packet, PacketListener listener) {
372            if (!doHandleReceipt(packet) && !doHandleWireFormat(packet)) {
373                if (listener != null) {
374                    listener.consume(packet);
375                }
376                else {
377                    log.warn("No packet listener set to receive packets");
378                }
379            }
380        }
381    
382        protected boolean doHandleReceipt(Packet packet) {
383            boolean result = false;
384            if (packet != null) {
385                if (packet.isReceipt()) {
386                    lastReceiptTimstamp = System.currentTimeMillis();
387                    result = true;
388                    Receipt receipt = (Receipt) packet;
389                    ReceiptHolder rh = (ReceiptHolder) requestMap.remove(new Short(receipt.getCorrelationId()));
390                    if (rh != null) {
391                        rh.setReceipt(receipt);
392                    }
393                    else {
394                        log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
395                    }
396                }
397            }
398            return result;
399        }
400    
401        protected boolean doHandleWireFormat(Packet packet) {
402            boolean handled = false;
403            if (packet.getPacketType() == Packet.WIRE_FORMAT_INFO) {
404                handled = true;
405                WireFormatInfo info = (WireFormatInfo) packet;
406                if (!canProcessWireFormatVersion(info.getVersion())) {
407                    setPendingStop(true);
408                    String errorStr = "Cannot process wire format of version: " + info.getVersion();
409                    TransportStatusEvent event = new TransportStatusEvent();
410                    event.setChannelStatus(TransportStatusEvent.FAILED);
411                    fireStatusEvent(event);
412                    onAsyncException(new UnsupportedWireFormatException(errorStr));
413                    stop();
414                }
415                else {
416                    if (log.isDebugEnabled()) {
417                        log.debug(this + " using wire format version: " + info.getVersion());
418                    }
419                }
420            }
421            return handled;
422        }
423    
424        /**
425         * send a Packet to the raw underlying transport This method is here to allow specific implementations to override
426         * this method
427         * 
428         * @param packet
429         * @return a response or null
430         * @throws JMSException
431         */
432        protected Packet doAsyncSend(Packet packet) throws JMSException {
433            asyncSend(packet);
434            return null;
435        }
436    
437        /**
438         * Handles an exception thrown while performing async dispatch of messages
439         * 
440         * @param e
441         */
442        protected void onAsyncException(JMSException e) {
443            if (exceptionListener != null) {
444                transportConnected = false;
445                exceptionListener.onException(e);
446            }
447            else {
448                log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
449            }
450        }
451    
452        /**
453         * Fire status event to any status event listeners
454         * 
455         * @param remoteURI
456         * @param status
457         */
458        protected void fireStatusEvent(URI remoteURI, int status) {
459            TransportStatusEvent event = new TransportStatusEvent();
460            event.setChannelStatus(status);
461            event.setRemoteURI(remoteURI);
462            fireStatusEvent(event);
463        }
464    
465        /**
466         * Fire status event to any status event listeners
467         * 
468         * @param event
469         */
470        protected void fireStatusEvent(TransportStatusEvent event) {
471            if (event != null) {
472                for (Iterator i = listeners.iterator();i.hasNext();) {
473                    TransportStatusEventListener l = (TransportStatusEventListener) i.next();
474                    l.statusChanged(event);
475                }
476            }
477        }
478    
479        /**
480         * A helper method to stop the execution of an executor
481         * 
482         * @param executor the executor or null if one is not created yet
483         * @throws InterruptedException
484         * @throws JMSException
485         */
486        protected void stopExecutor(Executor executor) throws InterruptedException, JMSException {
487            ExecutorHelper.stopExecutor(executor);
488        }
489        /**
490         * @return Returns the cachingEnabled.
491         */
492        public boolean isCachingEnabled() {
493            return cachingEnabled;
494        }
495        /**
496         * @param cachingEnabled The cachingEnabled to set.
497         */
498        public void setCachingEnabled(boolean cachingEnabled) {
499            this.cachingEnabled = cachingEnabled;
500        }
501        
502        /**
503         * Inform Transport to send messages as quickly
504         * as possible - for Tcp - this means disabling Nagles,
505         * which on OSX may provide better performance for sync
506         * sends
507         * @return Returns the noDelay.
508         */
509        public boolean isNoDelay() {
510            return noDelay;
511        }
512        /**
513         * @param noDelay The noDelay to set.
514         */
515        public void setNoDelay(boolean noDelay) {
516            this.noDelay = noDelay;
517        }
518    }