001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.tcp; 020 021 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 022 import EDU.oswego.cs.dl.util.concurrent.BoundedChannel; 023 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 024 import EDU.oswego.cs.dl.util.concurrent.Executor; 025 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 026 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.io.WireFormat; 030 import org.activemq.io.WireFormatLoader; 031 import org.activemq.message.Packet; 032 import org.activemq.transport.TransportChannelSupport; 033 import org.activemq.transport.TransportStatusEvent; 034 import org.activemq.util.JMSExceptionHelper; 035 036 import javax.jms.JMSException; 037 import java.io.BufferedInputStream; 038 import java.io.DataInputStream; 039 import java.io.DataOutputStream; 040 import java.io.EOFException; 041 import java.io.IOException; 042 import java.io.InterruptedIOException; 043 import java.net.InetAddress; 044 import java.net.InetSocketAddress; 045 import java.net.Socket; 046 import java.net.SocketAddress; 047 import java.net.SocketException; 048 import java.net.SocketTimeoutException; 049 import java.net.URI; 050 import java.net.UnknownHostException; 051 052 /** 053 * A tcp implementation of a TransportChannel 054 * 055 * @version $Revision: 1.2 $ 056 */ 057 public class TcpTransportChannel extends TransportChannelSupport implements Runnable { 058 private static final int DEFAULT_SOCKET_BUFFER_SIZE = 64 * 1024; 059 private static final Log log = LogFactory.getLog(TcpTransportChannel.class); 060 protected Socket socket; 061 protected DataOutputStream dataOut; 062 protected DataInputStream dataIn; 063 064 private WireFormatLoader wireFormatLoader; 065 private SynchronizedBoolean closed; 066 private SynchronizedBoolean started; 067 private Object outboundLock; 068 private Executor executor; 069 private Thread thread; 070 private boolean useAsyncSend = false; 071 private int soTimeout = 10000; 072 private int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE; 073 private BoundedChannel exceptionsList; 074 private TcpTransportServerChannel serverChannel; 075 076 /** 077 * Construct basic helpers 078 * 079 * @param wireFormat 080 */ 081 protected TcpTransportChannel(WireFormat wireFormat) { 082 super(wireFormat); 083 this.wireFormatLoader = new WireFormatLoader(wireFormat); 084 closed = new SynchronizedBoolean(false); 085 started = new SynchronizedBoolean(false); 086 // there's not much point logging all exceptions, lets just keep a few around 087 exceptionsList = new BoundedLinkedQueue(10); 088 outboundLock = new Object(); 089 setUseAsyncSend(useAsyncSend); 090 super.setCachingEnabled(true); 091 } 092 093 /** 094 * Connect to a remote Node - e.g. a Broker 095 * 096 * @param wireFormat 097 * @param remoteLocation 098 * @throws JMSException 099 */ 100 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException { 101 this(wireFormat); 102 try { 103 this.socket = createSocket(remoteLocation); 104 initializeStreams(); 105 } 106 catch (Exception ioe) { 107 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed. " + "URI was: " 108 + remoteLocation + " Reason: " + ioe, ioe); 109 } 110 } 111 112 /** 113 * Connect to a remote Node - e.g. a Broker 114 * 115 * @param wireFormat 116 * @param remoteLocation 117 * @param localLocation - e.g. local InetAddress and local port 118 * @throws JMSException 119 */ 120 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException { 121 this(wireFormat); 122 try { 123 this.socket = createSocket(remoteLocation, localLocation); 124 initializeStreams(); 125 } 126 catch (Exception ioe) { 127 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 128 } 129 } 130 131 /** 132 * Initialize from a ServerSocket 133 * @param serverChannel 134 * @param wireFormat 135 * @param socket 136 * @param executor 137 * @throws JMSException 138 */ 139 public TcpTransportChannel(TcpTransportServerChannel serverChannel,WireFormat wireFormat, Socket socket, Executor executor) throws JMSException { 140 this(wireFormat); 141 this.socket = socket; 142 this.executor = executor; 143 this.serverChannel = serverChannel; 144 setServerSide(true); 145 try { 146 initialiseSocket(socket); 147 initializeStreams(); 148 } 149 catch (IOException ioe) { 150 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 151 } 152 } 153 154 public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException { 155 this(wireFormat); 156 this.socket = socket; 157 this.executor = executor; 158 try { 159 initialiseSocket(socket); 160 initializeStreams(); 161 } 162 catch (IOException ioe) { 163 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 164 } 165 } 166 167 /** 168 * start listeneing for events 169 * 170 * @throws JMSException if an error occurs 171 */ 172 public void start() throws JMSException { 173 if (started.commit(false, true)) { 174 thread = new Thread(this, toString()); 175 try { 176 if (isServerSide()) { 177 thread.setDaemon(true); 178 readWireFormat(); 179 getWireFormat().registerTransportStreams(dataOut, dataIn); 180 getWireFormat().initiateServerSideProtocol(); 181 } 182 else { 183 getWireFormat().registerTransportStreams(dataOut, dataIn); 184 thread.setPriority(Thread.NORM_PRIORITY + 2); 185 } 186 //enable caching on the wire format 187 currentWireFormat.setCachingEnabled(isCachingEnabled()); 188 thread.start(); 189 //send the wire format 190 if (!isServerSide()) { 191 getWireFormat().initiateClientSideProtocol(); 192 } 193 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.CONNECTED)); 194 } 195 catch (EOFException e) { 196 doClose(e); 197 } 198 catch (IOException e) { 199 JMSException jmsEx = new JMSException("start failed: " + e.getMessage()); 200 jmsEx.initCause(e); 201 jmsEx.setLinkedException(e); 202 throw jmsEx; 203 } 204 } 205 } 206 207 protected void readWireFormat() throws JMSException, IOException { 208 WireFormat wf = wireFormatLoader.getWireFormat(dataIn); 209 if (wf != null) { 210 setWireFormat(wf); 211 } 212 } 213 214 /** 215 * close the channel 216 */ 217 public void stop() { 218 if (closed.commit(false, true)) { 219 super.stop(); 220 try { 221 if (executor != null) { 222 stopExecutor(executor); 223 } 224 closeStreams(); 225 socket.close(); 226 } 227 catch (Exception e) { 228 log.warn("Caught while closing: " + e + ". Now Closed", e); 229 } 230 } 231 closed.set(true); 232 if (this.serverChannel != null){ 233 this.serverChannel.removeClient(this); 234 } 235 } 236 237 public void forceDisconnect() { 238 log.debug("Forcing disconnect"); 239 if (socket != null && socket.isConnected()) { 240 try { 241 socket.close(); 242 } 243 catch (IOException e) { 244 // Ignore 245 } 246 } 247 } 248 249 /** 250 * Asynchronously send a Packet 251 * 252 * @param packet 253 * @throws JMSException 254 */ 255 public void asyncSend(final Packet packet) throws JMSException { 256 if (executor != null) { 257 try { 258 executor.execute(new Runnable() { 259 public void run() { 260 try { 261 if (!isClosed()) { 262 doAsyncSend(packet); 263 } 264 } 265 catch (JMSException e) { 266 try { 267 exceptionsList.put(e); 268 } 269 catch (InterruptedException e1) { 270 log.warn("Failed to add element to exception list: " + e1); 271 } 272 } 273 } 274 }); 275 } 276 catch (InterruptedException e) { 277 log.info("Caught: " + e, e); 278 } 279 try { 280 JMSException e = (JMSException) exceptionsList.poll(0); 281 if (e != null) { 282 throw e; 283 } 284 } 285 catch (InterruptedException e1) { 286 log.warn("Failed to remove element to exception list: " + e1); 287 } 288 } 289 else { 290 doAsyncSend(packet); 291 } 292 } 293 294 /** 295 * @return false 296 */ 297 public boolean isMulticast() { 298 return false; 299 } 300 301 /** 302 * reads packets from a Socket 303 */ 304 public void run() { 305 log.trace("TCP consumer thread starting"); 306 int count = 0; 307 while (!isClosed()) { 308 if (isServerSide() && ++count > 500) { 309 count = 0; 310 Thread.yield(); 311 } 312 try { 313 Packet packet = getWireFormat().readPacket(dataIn); 314 if (packet != null) { 315 doConsumePacket(packet); 316 } 317 } 318 catch (SocketTimeoutException e) { 319 //onAsyncException(JMSExceptionHelper.newJMSException(e)); 320 } 321 catch (InterruptedIOException e) { 322 // TODO confirm that this really is a bug in the AS/400 JVM 323 // Patch for AS/400 JVM 324 // lets ignore these exceptions 325 // as they typically just indicate the thread was interupted 326 // while waiting for input, not that the socket is in error 327 //onAsyncException(JMSExceptionHelper.newJMSException(e)); 328 } 329 catch (IOException e) { 330 doClose(e); 331 } 332 } 333 } 334 335 public boolean isClosed() { 336 return closed.get(); 337 } 338 339 /** 340 * pretty print for object 341 * 342 * @return String representation of this object 343 */ 344 public String toString() { 345 return "TcpTransportChannel: " + socket; 346 } 347 348 /** 349 * @return the socket used by the TcpTransportChannel 350 */ 351 public Socket getSocket() { 352 return socket; 353 } 354 355 /** 356 * Can this wireformat process packets of this version 357 * 358 * @param version the version number to test 359 * @return true if can accept the version 360 */ 361 public boolean canProcessWireFormatVersion(int version) { 362 return getWireFormat().canProcessWireFormatVersion(version); 363 } 364 365 /** 366 * @return the current version of this wire format 367 */ 368 public int getCurrentWireFormatVersion() { 369 return getWireFormat().getCurrentWireFormatVersion(); 370 } 371 372 // Properties 373 //------------------------------------------------------------------------- 374 375 /** 376 * @return true if packets are enqueued to a separate queue before dispatching 377 */ 378 public boolean isUseAsyncSend() { 379 return useAsyncSend; 380 } 381 382 /** 383 * set the useAsync flag 384 * 385 * @param useAsyncSend 386 */ 387 public void setUseAsyncSend(boolean useAsyncSend) { 388 this.useAsyncSend = useAsyncSend; 389 try { 390 if (useAsyncSend && executor==null ) { 391 PooledExecutor pe = new PooledExecutor(new BoundedBuffer(10), 1); 392 pe.waitWhenBlocked(); 393 pe.setKeepAliveTime(1000); 394 executor = pe; 395 } 396 else if (!useAsyncSend && executor != null) { 397 stopExecutor(executor); 398 } 399 } 400 catch (Exception e) { 401 log.warn("problem closing executor", e); 402 } 403 } 404 405 406 407 /** 408 * @return the current so timeout used on the socket 409 */ 410 public int getSoTimeout() { 411 return soTimeout; 412 } 413 414 /** 415 * set the socket so timeout 416 * 417 * @param soTimeout 418 * @throws JMSException 419 */ 420 public void setSoTimeout(int soTimeout) throws JMSException { 421 this.soTimeout = soTimeout; 422 if (this.socket != null){ 423 try { 424 socket.setSoTimeout(soTimeout); 425 } 426 catch (SocketException e) { 427 JMSException jmsEx = new JMSException("Failed to set soTimeout: ", e.getMessage()); 428 jmsEx.setLinkedException(e); 429 throw jmsEx; 430 } 431 } 432 } 433 434 /** 435 * @param noDelay The noDelay to set. 436 */ 437 public void setNoDelay(boolean noDelay) { 438 super.setNoDelay(noDelay); 439 if (socket != null){ 440 try { 441 socket.setTcpNoDelay(noDelay); 442 } 443 catch (SocketException e) { 444 log.warn("failed to set noDelay on the socket");//should never happen 445 } 446 } 447 } 448 449 /** 450 * @return Returns the socketBufferSize. 451 */ 452 public int getSocketBufferSize() { 453 return socketBufferSize; 454 } 455 /** 456 * @param socketBufferSize The socketBufferSize to set. 457 */ 458 public void setSocketBufferSize(int socketBufferSize) { 459 this.socketBufferSize = socketBufferSize; 460 } 461 // Implementation methods 462 //------------------------------------------------------------------------- 463 /** 464 * Actually performs the async send of a packet 465 * 466 * @param packet 467 * @return a response or null 468 * @throws JMSException 469 */ 470 protected Packet doAsyncSend(Packet packet) throws JMSException { 471 Packet response = null; 472 try { 473 synchronized (outboundLock) { 474 response = getWireFormat().writePacket(packet, dataOut); 475 dataOut.flush(); 476 } 477 } 478 catch (IOException e) { 479 // if (closed.get()) { 480 // log.trace("Caught exception while closed: " + e, e); 481 // } 482 // else { 483 JMSException exception = JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e); 484 onAsyncException(exception); 485 throw exception; 486 // } 487 } 488 catch (JMSException e) { 489 if (isClosed()) { 490 log.trace("Caught exception while closed: " + e, e); 491 } 492 else { 493 throw e; 494 } 495 } 496 return response; 497 } 498 499 protected void doClose(Exception ex) { 500 if (!isClosed()) { 501 if (!pendingStop) { 502 setPendingStop(true); 503 setTransportConnected(false); 504 if (ex instanceof EOFException) { 505 if (!isServerSide() && !isUsedInternally()){ 506 log.warn("Peer closed connection", ex); 507 } 508 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 509 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex)); 510 } 511 else { 512 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 513 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex)); 514 } 515 } 516 stop(); 517 } 518 } 519 520 /** 521 * Configures the socket for use 522 * @param sock 523 * @throws SocketException 524 */ 525 protected void initialiseSocket(Socket sock) throws SocketException { 526 try { 527 sock.setReceiveBufferSize(socketBufferSize); 528 sock.setSendBufferSize(socketBufferSize); 529 } 530 catch (SocketException se) { 531 log.debug("Cannot set socket buffer size = " + socketBufferSize, se); 532 } 533 sock.setSoTimeout(soTimeout); 534 sock.setTcpNoDelay(isNoDelay()); 535 } 536 537 protected void initializeStreams() throws IOException{ 538 BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream(),8192); 539 this.dataIn = new DataInputStream(buffIn); 540 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(),8192); 541 this.dataOut = new DataOutputStream(buffOut); 542 } 543 544 protected void closeStreams() throws IOException { 545 if (dataOut != null) { 546 dataOut.close(); 547 } 548 if (dataIn != null) { 549 dataIn.close(); 550 } 551 } 552 553 /** 554 * Factory method to create a new socket 555 * 556 * @param remoteLocation the URI to connect to 557 * @return the newly created socket 558 * @throws UnknownHostException 559 * @throws IOException 560 */ 561 protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException { 562 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort()); 563 Socket sock = new Socket(); 564 initialiseSocket(sock); 565 sock.connect(sockAddress); 566 return sock; 567 } 568 569 /** 570 * Factory method to create a new socket 571 * 572 * @param remoteLocation 573 * @param localLocation 574 * @return @throws IOException 575 * @throws IOException 576 * @throws UnknownHostException 577 */ 578 protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { 579 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort()); 580 SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); 581 Socket sock = new Socket(); 582 initialiseSocket(sock); 583 sock.bind(localAddress); 584 sock.connect(sockAddress); 585 return sock; 586 } 587 588 }