001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.InterruptedIOException;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.Socket;
026    import java.net.SocketException;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.net.UnknownHostException;
030    import java.util.HashMap;
031    import java.util.Map;
032    import java.util.concurrent.CountDownLatch;
033    import java.util.concurrent.SynchronousQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    import javax.net.SocketFactory;
040    
041    import org.apache.activemq.Service;
042    import org.apache.activemq.transport.Transport;
043    import org.apache.activemq.transport.TransportLoggerFactory;
044    import org.apache.activemq.transport.TransportThreadSupport;
045    import org.apache.activemq.util.IntrospectionSupport;
046    import org.apache.activemq.util.ServiceStopper;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.apache.commons.logging.Log;
049    import org.apache.commons.logging.LogFactory;
050    
051    /**
052     * An implementation of the {@link Transport} interface using raw tcp/ip
053     * 
054     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055     * @version $Revision$
056     */
057    public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058        private static final Log LOG = LogFactory.getLog(TcpTransport.class);
059        private static final ThreadPoolExecutor SOCKET_CLOSE;
060        protected final URI remoteLocation;
061        protected final URI localLocation;
062        protected final WireFormat wireFormat;
063    
064        protected int connectionTimeout = 30000;
065        protected int soTimeout;
066        protected int socketBufferSize = 64 * 1024;
067        protected int ioBufferSize = 8 * 1024;
068        protected boolean closeAsync=true;
069        protected Socket socket;
070        protected DataOutputStream dataOut;
071        protected DataInputStream dataIn;
072        protected TcpBufferedOutputStream buffOut = null;
073        /**
074         * trace=true -> the Transport stack where this TcpTransport
075         * object will be, will have a TransportLogger layer
076         * trace=false -> the Transport stack where this TcpTransport
077         * object will be, will NOT have a TransportLogger layer, and therefore
078         * will never be able to print logging messages.
079         * This parameter is most probably set in Connection or TransportConnector URIs.
080         */
081        protected boolean trace = false;
082        /**
083         * Name of the LogWriter implementation to use.
084         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
085         * This parameter is most probably set in Connection or TransportConnector URIs.
086         */
087        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
088        /**
089         * Specifies if the TransportLogger will be manageable by JMX or not.
090         * Also, as long as there is at least 1 TransportLogger which is manageable,
091         * a TransportLoggerControl MBean will me created.
092         */
093        protected boolean dynamicManagement = false;
094        /**
095         * startLogging=true -> the TransportLogger object of the Transport stack
096         * will initially write messages to the log.
097         * startLogging=false -> the TransportLogger object of the Transport stack
098         * will initially NOT write messages to the log.
099         * This parameter only has an effect if trace == true.
100         * This parameter is most probably set in Connection or TransportConnector URIs.
101         */
102        protected boolean startLogging = true;
103        /**
104         * Specifies the port that will be used by the JMX server to manage
105         * the TransportLoggers.
106         * This should only be set in an URI by a client (producer or consumer) since
107         * a broker will already create a JMX server.
108         * It is useful for people who test a broker and clients in the same machine
109         * and want to control both via JMX; a different port will be needed.
110         */
111        protected int jmxPort = 1099;
112        protected boolean useLocalHost = true;
113        protected int minmumWireFormatVersion;
114        protected SocketFactory socketFactory;
115        protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
116    
117        private Map<String, Object> socketOptions;
118        private Boolean keepAlive;
119        private Boolean tcpNoDelay;
120        private Thread runnerThread;
121        private volatile int receiveCounter;
122    
123        /**
124         * Connect to a remote Node - e.g. a Broker
125         * 
126         * @param wireFormat
127         * @param socketFactory
128         * @param remoteLocation
129         * @param localLocation - e.g. local InetAddress and local port
130         * @throws IOException
131         * @throws UnknownHostException
132         */
133        public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
134                            URI localLocation) throws UnknownHostException, IOException {
135            this.wireFormat = wireFormat;
136            this.socketFactory = socketFactory;
137            try {
138                this.socket = socketFactory.createSocket();
139            } catch (SocketException e) {
140                this.socket = null;
141            }
142            this.remoteLocation = remoteLocation;
143            this.localLocation = localLocation;
144            setDaemon(false);
145        }
146    
147        /**
148         * Initialize from a server Socket
149         * 
150         * @param wireFormat
151         * @param socket
152         * @throws IOException
153         */
154        public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
155            this.wireFormat = wireFormat;
156            this.socket = socket;
157            this.remoteLocation = null;
158            this.localLocation = null;
159            setDaemon(true);
160        }
161    
162        /**
163         * A one way asynchronous send
164         */
165        public void oneway(Object command) throws IOException {
166            checkStarted();
167            wireFormat.marshal(command, dataOut);
168            dataOut.flush();
169        }
170    
171        /**
172         * @return pretty print of 'this'
173         */
174        public String toString() {
175            return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
176        }
177    
178        /**
179         * reads packets from a Socket
180         */
181        public void run() {
182            LOG.trace("TCP consumer thread for " + this + " starting");
183            this.runnerThread=Thread.currentThread();
184            try {
185                while (!isStopped()) {
186                    doRun();
187                }
188            } catch (IOException e) {
189                stoppedLatch.get().countDown();
190                onException(e);
191            } catch (Throwable e){
192                stoppedLatch.get().countDown();
193                IOException ioe=new IOException("Unexpected error occured");
194                ioe.initCause(e);
195                onException(ioe);
196            }finally {
197                stoppedLatch.get().countDown();
198            }
199        }
200    
201        protected void doRun() throws IOException {
202            try {
203                Object command = readCommand();
204                doConsume(command);
205            } catch (SocketTimeoutException e) {
206            } catch (InterruptedIOException e) {
207            }
208        }
209    
210        protected Object readCommand() throws IOException {
211            return wireFormat.unmarshal(dataIn);
212        }
213    
214        // Properties
215        // -------------------------------------------------------------------------
216    
217        public boolean isTrace() {
218            return trace;
219        }
220    
221        public void setTrace(boolean trace) {
222            this.trace = trace;
223        }
224        
225        public String getLogWriterName() {
226            return logWriterName;
227        }
228    
229        public void setLogWriterName(String logFormat) {
230            this.logWriterName = logFormat;
231        }
232    
233        public boolean isDynamicManagement() {
234            return dynamicManagement;
235        }
236    
237        public void setDynamicManagement(boolean useJmx) {
238            this.dynamicManagement = useJmx;
239        }
240    
241        public boolean isStartLogging() {
242            return startLogging;
243        }
244    
245        public void setStartLogging(boolean startLogging) {
246            this.startLogging = startLogging;
247        }
248    
249        public int getJmxPort() {
250            return jmxPort;
251        }
252    
253        public void setJmxPort(int jmxPort) {
254            this.jmxPort = jmxPort;
255        }
256        
257        public int getMinmumWireFormatVersion() {
258            return minmumWireFormatVersion;
259        }
260    
261        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
262            this.minmumWireFormatVersion = minmumWireFormatVersion;
263        }
264    
265        public boolean isUseLocalHost() {
266            return useLocalHost;
267        }
268    
269        /**
270         * Sets whether 'localhost' or the actual local host name should be used to
271         * make local connections. On some operating systems such as Macs its not
272         * possible to connect as the local host name so localhost is better.
273         */
274        public void setUseLocalHost(boolean useLocalHost) {
275            this.useLocalHost = useLocalHost;
276        }
277    
278        public int getSocketBufferSize() {
279            return socketBufferSize;
280        }
281    
282        /**
283         * Sets the buffer size to use on the socket
284         */
285        public void setSocketBufferSize(int socketBufferSize) {
286            this.socketBufferSize = socketBufferSize;
287        }
288    
289        public int getSoTimeout() {
290            return soTimeout;
291        }
292    
293        /**
294         * Sets the socket timeout
295         */
296        public void setSoTimeout(int soTimeout) {
297            this.soTimeout = soTimeout;
298        }
299    
300        public int getConnectionTimeout() {
301            return connectionTimeout;
302        }
303    
304        /**
305         * Sets the timeout used to connect to the socket
306         */
307        public void setConnectionTimeout(int connectionTimeout) {
308            this.connectionTimeout = connectionTimeout;
309        }
310    
311        public Boolean getKeepAlive() {
312            return keepAlive;
313        }
314    
315        /**
316         * Enable/disable TCP KEEP_ALIVE mode
317         */
318        public void setKeepAlive(Boolean keepAlive) {
319            this.keepAlive = keepAlive;
320        }
321    
322        public Boolean getTcpNoDelay() {
323            return tcpNoDelay;
324        }
325    
326        /**
327         * Enable/disable the TCP_NODELAY option on the socket
328         */
329        public void setTcpNoDelay(Boolean tcpNoDelay) {
330            this.tcpNoDelay = tcpNoDelay;
331        }
332    
333        /**
334         * @return the ioBufferSize
335         */
336        public int getIoBufferSize() {
337            return this.ioBufferSize;
338        }
339    
340        /**
341         * @param ioBufferSize the ioBufferSize to set
342         */
343        public void setIoBufferSize(int ioBufferSize) {
344            this.ioBufferSize = ioBufferSize;
345        }
346        
347        /**
348         * @return the closeAsync
349         */
350        public boolean isCloseAsync() {
351            return closeAsync;
352        }
353    
354        /**
355         * @param closeAsync the closeAsync to set
356         */
357        public void setCloseAsync(boolean closeAsync) {
358            this.closeAsync = closeAsync;
359        }
360    
361        // Implementation methods
362        // -------------------------------------------------------------------------
363        protected String resolveHostName(String host) throws UnknownHostException {
364            String localName = InetAddress.getLocalHost().getHostName();
365            if (localName != null && isUseLocalHost()) {
366                if (localName.equals(host)) {
367                    return "localhost";
368                }
369            }
370            return host;
371        }
372    
373        /**
374         * Configures the socket for use
375         * 
376         * @param sock
377         * @throws SocketException
378         */
379        protected void initialiseSocket(Socket sock) throws SocketException {
380            if (socketOptions != null) {
381                IntrospectionSupport.setProperties(socket, socketOptions);
382            }
383    
384            try {
385                sock.setReceiveBufferSize(socketBufferSize);
386                sock.setSendBufferSize(socketBufferSize);
387            } catch (SocketException se) {
388                LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
389                LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
390            }
391            sock.setSoTimeout(soTimeout);
392    
393            if (keepAlive != null) {
394                sock.setKeepAlive(keepAlive.booleanValue());
395            }
396            if (tcpNoDelay != null) {
397                sock.setTcpNoDelay(tcpNoDelay.booleanValue());
398            }
399        }
400    
401        protected void doStart() throws Exception {
402            connect();
403            stoppedLatch.set(new CountDownLatch(1));
404            super.doStart();
405        }
406    
407        protected void connect() throws Exception {
408    
409            if (socket == null && socketFactory == null) {
410                throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
411            }
412    
413            InetSocketAddress localAddress = null;
414            InetSocketAddress remoteAddress = null;
415    
416            if (localLocation != null) {
417                localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
418                                                     localLocation.getPort());
419            }
420    
421            if (remoteLocation != null) {
422                String host = resolveHostName(remoteLocation.getHost());
423                remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
424            }
425    
426            if (socket != null) {
427    
428                if (localAddress != null) {
429                    socket.bind(localAddress);
430                }
431    
432                // If it's a server accepted socket.. we don't need to connect it
433                // to a remote address.
434                if (remoteAddress != null) {
435                    if (connectionTimeout >= 0) {
436                        socket.connect(remoteAddress, connectionTimeout);
437                    } else {
438                        socket.connect(remoteAddress);
439                    }
440                }
441    
442            } else {
443                // For SSL sockets.. you can't create an unconnected socket :(
444                // This means the timout option are not supported either.
445                if (localAddress != null) {
446                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
447                                                        localAddress.getAddress(), localAddress.getPort());
448                } else {
449                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
450                }
451            }
452    
453            initialiseSocket(socket);
454            initializeStreams();
455        }
456    
457        protected void doStop(ServiceStopper stopper) throws Exception {
458            if (LOG.isDebugEnabled()) {
459                LOG.debug("Stopping transport " + this);
460            }
461    
462            // Closing the streams flush the sockets before closing.. if the socket
463            // is hung.. then this hangs the close.
464            // closeStreams();
465            if (socket != null) {
466                if (closeAsync) {
467                    //closing the socket can hang also 
468                    final CountDownLatch latch = new CountDownLatch(1);
469                    
470                    SOCKET_CLOSE.execute(new Runnable() {
471        
472                        public void run() {
473                            try {
474                                socket.close();
475                            } catch (IOException e) {
476                                LOG.debug("Caught exception closing socket",e);
477                            }finally {
478                                latch.countDown();
479                            }
480                        }
481                        
482                    });
483                    latch.await(1,TimeUnit.SECONDS);
484                }else {
485                    try {
486                        socket.close();
487                    } catch (IOException e) {
488                        LOG.debug("Caught exception closing socket",e);
489                    }
490                }
491               
492            }
493        }
494    
495        /**
496         * Override so that stop() blocks until the run thread is no longer running.
497         */
498        @Override
499        public void stop() throws Exception {
500            super.stop();
501            CountDownLatch countDownLatch = stoppedLatch.get();
502            if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
503                countDownLatch.await(1,TimeUnit.SECONDS);
504            }
505        }
506    
507        protected void initializeStreams() throws Exception {
508            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
509                @Override
510                public int read() throws IOException {
511                    receiveCounter++;
512                    return super.read();
513                }
514                @Override
515                public int read(byte[] b, int off, int len) throws IOException {
516                    receiveCounter++;
517                    return super.read(b, off, len);
518                }
519                @Override
520                public long skip(long n) throws IOException {
521                    receiveCounter++;
522                    return super.skip(n);
523                }
524                @Override
525                protected void fill() throws IOException {
526                    receiveCounter++;
527                    super.fill();
528                }
529            };
530            this.dataIn = new DataInputStream(buffIn);
531            buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
532            this.dataOut = new DataOutputStream(buffOut);
533        }
534    
535        protected void closeStreams() throws IOException {
536            if (dataOut != null) {
537                dataOut.close();
538            }
539            if (dataIn != null) {
540                dataIn.close();
541            }
542        }
543    
544        public void setSocketOptions(Map<String, Object> socketOptions) {
545            this.socketOptions = new HashMap<String, Object>(socketOptions);
546        }
547    
548        public String getRemoteAddress() {
549            if (socket != null) {
550                return "" + socket.getRemoteSocketAddress();
551            }
552            return null;
553        }
554        
555        @Override
556        public <T> T narrow(Class<T> target) {
557            if (target == Socket.class) {
558                return target.cast(socket);
559            } else if ( target == TcpBufferedOutputStream.class) {
560                return target.cast(buffOut);
561            }
562            return super.narrow(target);
563        }
564        
565    
566        static {
567            SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
568                public Thread newThread(Runnable runnable) {
569                    Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
570                    thread.setPriority(Thread.MAX_PRIORITY);
571                    thread.setDaemon(true);
572                    return thread;
573                }
574            });
575        }
576    
577    
578        public int getReceiveCounter() {
579            return receiveCounter;
580        }
581    }