001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.udp; 018 019 import java.io.EOFException; 020 import java.io.IOException; 021 import java.net.BindException; 022 import java.net.DatagramSocket; 023 import java.net.InetAddress; 024 import java.net.InetSocketAddress; 025 import java.net.SocketAddress; 026 import java.net.SocketException; 027 import java.net.URI; 028 import java.net.UnknownHostException; 029 import java.nio.channels.AsynchronousCloseException; 030 import java.nio.channels.DatagramChannel; 031 032 import org.apache.activemq.Service; 033 import org.apache.activemq.command.Command; 034 import org.apache.activemq.command.Endpoint; 035 import org.apache.activemq.openwire.OpenWireFormat; 036 import org.apache.activemq.transport.Transport; 037 import org.apache.activemq.transport.TransportThreadSupport; 038 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; 039 import org.apache.activemq.transport.reliable.ReplayBuffer; 040 import org.apache.activemq.transport.reliable.ReplayStrategy; 041 import org.apache.activemq.transport.reliable.Replayer; 042 import org.apache.activemq.util.IntSequenceGenerator; 043 import org.apache.activemq.util.ServiceStopper; 044 import org.apache.commons.logging.Log; 045 import org.apache.commons.logging.LogFactory; 046 047 /** 048 * An implementation of the {@link Transport} interface using raw UDP 049 * 050 * @version $Revision: 891816 $ 051 */ 052 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable { 053 private static final Log LOG = LogFactory.getLog(UdpTransport.class); 054 055 private static final int MAX_BIND_ATTEMPTS = 50; 056 private static final long BIND_ATTEMPT_DELAY = 100; 057 058 private CommandChannel commandChannel; 059 private OpenWireFormat wireFormat; 060 private ByteBufferPool bufferPool; 061 private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy(); 062 private ReplayBuffer replayBuffer; 063 private int datagramSize = 4 * 1024; 064 private SocketAddress targetAddress; 065 private SocketAddress originalTargetAddress; 066 private DatagramChannel channel; 067 private boolean trace; 068 private boolean useLocalHost = true; 069 private int port; 070 private int minmumWireFormatVersion; 071 private String description; 072 private IntSequenceGenerator sequenceGenerator; 073 private boolean replayEnabled = true; 074 075 protected UdpTransport(OpenWireFormat wireFormat) throws IOException { 076 this.wireFormat = wireFormat; 077 } 078 079 public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { 080 this(wireFormat); 081 this.targetAddress = createAddress(remoteLocation); 082 description = remoteLocation.toString() + "@"; 083 } 084 085 public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { 086 this(wireFormat); 087 this.targetAddress = socketAddress; 088 this.description = getProtocolName() + "ServerConnection@"; 089 } 090 091 /** 092 * Used by the server transport 093 */ 094 public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException { 095 this(wireFormat); 096 this.port = port; 097 this.targetAddress = null; 098 this.description = getProtocolName() + "Server@"; 099 } 100 101 /** 102 * Creates a replayer for working with the reliable transport 103 */ 104 public Replayer createReplayer() throws IOException { 105 if (replayEnabled) { 106 return getCommandChannel(); 107 } 108 return null; 109 } 110 111 /** 112 * A one way asynchronous send 113 */ 114 public void oneway(Object command) throws IOException { 115 oneway(command, targetAddress); 116 } 117 118 /** 119 * A one way asynchronous send to a given address 120 */ 121 public void oneway(Object command, SocketAddress address) throws IOException { 122 if (LOG.isDebugEnabled()) { 123 LOG.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command); 124 } 125 checkStarted(); 126 commandChannel.write((Command)command, address); 127 } 128 129 /** 130 * @return pretty print of 'this' 131 */ 132 public String toString() { 133 if (description != null) { 134 return description + port; 135 } else { 136 return getProtocolUriScheme() + targetAddress + "@" + port; 137 } 138 } 139 140 /** 141 * reads packets from a Socket 142 */ 143 public void run() { 144 LOG.trace("Consumer thread starting for: " + toString()); 145 while (!isStopped()) { 146 try { 147 Command command = commandChannel.read(); 148 doConsume(command); 149 } catch (AsynchronousCloseException e) { 150 // DatagramChannel closed 151 try { 152 stop(); 153 } catch (Exception e2) { 154 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 155 } 156 } catch (SocketException e) { 157 // DatagramSocket closed 158 LOG.debug("Socket closed: " + e, e); 159 try { 160 stop(); 161 } catch (Exception e2) { 162 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 163 } 164 } catch (EOFException e) { 165 // DataInputStream closed 166 LOG.debug("Socket closed: " + e, e); 167 try { 168 stop(); 169 } catch (Exception e2) { 170 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 171 } 172 } catch (Exception e) { 173 try { 174 stop(); 175 } catch (Exception e2) { 176 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 177 } 178 if (e instanceof IOException) { 179 onException((IOException)e); 180 } else { 181 LOG.error("Caught: " + e, e); 182 e.printStackTrace(); 183 } 184 } 185 } 186 } 187 188 /** 189 * We have received the WireFormatInfo from the server on the actual channel 190 * we should use for all future communication with the server, so lets set 191 * the target to be the actual channel that the server has chosen for us to 192 * talk on. 193 */ 194 public void setTargetEndpoint(Endpoint newTarget) { 195 if (newTarget instanceof DatagramEndpoint) { 196 DatagramEndpoint endpoint = (DatagramEndpoint)newTarget; 197 SocketAddress address = endpoint.getAddress(); 198 if (address != null) { 199 if (originalTargetAddress == null) { 200 originalTargetAddress = targetAddress; 201 } 202 targetAddress = address; 203 commandChannel.setTargetAddress(address); 204 } 205 } 206 } 207 208 // Properties 209 // ------------------------------------------------------------------------- 210 public boolean isTrace() { 211 return trace; 212 } 213 214 public void setTrace(boolean trace) { 215 this.trace = trace; 216 } 217 218 public int getDatagramSize() { 219 return datagramSize; 220 } 221 222 public void setDatagramSize(int datagramSize) { 223 this.datagramSize = datagramSize; 224 } 225 226 public boolean isUseLocalHost() { 227 return useLocalHost; 228 } 229 230 /** 231 * Sets whether 'localhost' or the actual local host name should be used to 232 * make local connections. On some operating systems such as Macs its not 233 * possible to connect as the local host name so localhost is better. 234 */ 235 public void setUseLocalHost(boolean useLocalHost) { 236 this.useLocalHost = useLocalHost; 237 } 238 239 public CommandChannel getCommandChannel() throws IOException { 240 if (commandChannel == null) { 241 commandChannel = createCommandChannel(); 242 } 243 return commandChannel; 244 } 245 246 /** 247 * Sets the implementation of the command channel to use. 248 */ 249 public void setCommandChannel(CommandDatagramChannel commandChannel) { 250 this.commandChannel = commandChannel; 251 } 252 253 public ReplayStrategy getReplayStrategy() { 254 return replayStrategy; 255 } 256 257 /** 258 * Sets the strategy used to replay missed datagrams 259 */ 260 public void setReplayStrategy(ReplayStrategy replayStrategy) { 261 this.replayStrategy = replayStrategy; 262 } 263 264 public int getPort() { 265 return port; 266 } 267 268 /** 269 * Sets the port to connect on 270 */ 271 public void setPort(int port) { 272 this.port = port; 273 } 274 275 public int getMinmumWireFormatVersion() { 276 return minmumWireFormatVersion; 277 } 278 279 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 280 this.minmumWireFormatVersion = minmumWireFormatVersion; 281 } 282 283 public OpenWireFormat getWireFormat() { 284 return wireFormat; 285 } 286 287 public IntSequenceGenerator getSequenceGenerator() { 288 if (sequenceGenerator == null) { 289 sequenceGenerator = new IntSequenceGenerator(); 290 } 291 return sequenceGenerator; 292 } 293 294 public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) { 295 this.sequenceGenerator = sequenceGenerator; 296 } 297 298 public boolean isReplayEnabled() { 299 return replayEnabled; 300 } 301 302 /** 303 * Sets whether or not replay should be enabled when using the reliable 304 * transport. i.e. should we maintain a buffer of messages that can be 305 * replayed? 306 */ 307 public void setReplayEnabled(boolean replayEnabled) { 308 this.replayEnabled = replayEnabled; 309 } 310 311 public ByteBufferPool getBufferPool() { 312 if (bufferPool == null) { 313 bufferPool = new DefaultBufferPool(); 314 } 315 return bufferPool; 316 } 317 318 public void setBufferPool(ByteBufferPool bufferPool) { 319 this.bufferPool = bufferPool; 320 } 321 322 public ReplayBuffer getReplayBuffer() { 323 return replayBuffer; 324 } 325 326 public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException { 327 this.replayBuffer = replayBuffer; 328 getCommandChannel().setReplayBuffer(replayBuffer); 329 } 330 331 // Implementation methods 332 // ------------------------------------------------------------------------- 333 334 /** 335 * Creates an address from the given URI 336 */ 337 protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException { 338 String host = resolveHostName(remoteLocation.getHost()); 339 return new InetSocketAddress(host, remoteLocation.getPort()); 340 } 341 342 protected String resolveHostName(String host) throws UnknownHostException { 343 String localName = InetAddress.getLocalHost().getHostName(); 344 if (localName != null && isUseLocalHost()) { 345 if (localName.equals(host)) { 346 return "localhost"; 347 } 348 } 349 return host; 350 } 351 352 protected void doStart() throws Exception { 353 getCommandChannel().start(); 354 355 super.doStart(); 356 } 357 358 protected CommandChannel createCommandChannel() throws IOException { 359 SocketAddress localAddress = createLocalAddress(); 360 channel = DatagramChannel.open(); 361 362 channel = connect(channel, targetAddress); 363 364 DatagramSocket socket = channel.socket(); 365 bind(socket, localAddress); 366 if (port == 0) { 367 port = socket.getLocalPort(); 368 } 369 370 return createCommandDatagramChannel(); 371 } 372 373 protected CommandChannel createCommandDatagramChannel() { 374 return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool()); 375 } 376 377 protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException { 378 channel.configureBlocking(true); 379 380 if (LOG.isDebugEnabled()) { 381 LOG.debug("Binding to address: " + localAddress); 382 } 383 384 // 385 // We have noticed that on some platfoms like linux, after you close 386 // down 387 // a previously bound socket, it can take a little while before we can 388 // bind it again. 389 // 390 for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) { 391 try { 392 socket.bind(localAddress); 393 return; 394 } catch (BindException e) { 395 if (i + 1 == MAX_BIND_ATTEMPTS) { 396 throw e; 397 } 398 try { 399 Thread.sleep(BIND_ATTEMPT_DELAY); 400 } catch (InterruptedException e1) { 401 Thread.currentThread().interrupt(); 402 throw e; 403 } 404 } 405 } 406 407 } 408 409 protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException { 410 // TODO 411 // connect to default target address to avoid security checks each time 412 // channel = channel.connect(targetAddress); 413 414 return channel; 415 } 416 417 protected SocketAddress createLocalAddress() { 418 return new InetSocketAddress(port); 419 } 420 421 protected void doStop(ServiceStopper stopper) throws Exception { 422 if (channel != null) { 423 channel.close(); 424 } 425 } 426 427 protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() { 428 return new DatagramHeaderMarshaller(); 429 } 430 431 protected String getProtocolName() { 432 return "Udp"; 433 } 434 435 protected String getProtocolUriScheme() { 436 return "udp://"; 437 } 438 439 protected SocketAddress getTargetAddress() { 440 return targetAddress; 441 } 442 443 protected DatagramChannel getChannel() { 444 return channel; 445 } 446 447 protected void setChannel(DatagramChannel channel) { 448 this.channel = channel; 449 } 450 451 public InetSocketAddress getLocalSocketAddress() { 452 if (channel == null) { 453 return null; 454 } else { 455 return (InetSocketAddress)channel.socket().getLocalSocketAddress(); 456 } 457 } 458 459 public String getRemoteAddress() { 460 if (targetAddress != null) { 461 return "" + targetAddress; 462 } 463 return null; 464 } 465 466 public int getReceiveCounter() { 467 if (commandChannel == null) { 468 return 0; 469 } 470 return commandChannel.getReceiveCounter(); 471 } 472 }