001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.fanout; 018 019 import java.io.IOException; 020 import java.io.InterruptedIOException; 021 import java.net.URI; 022 import java.util.ArrayList; 023 import java.util.Iterator; 024 import java.util.concurrent.ConcurrentHashMap; 025 import java.util.concurrent.atomic.AtomicInteger; 026 027 import org.apache.activemq.command.Command; 028 import org.apache.activemq.command.ConsumerInfo; 029 import org.apache.activemq.command.Message; 030 import org.apache.activemq.command.Response; 031 import org.apache.activemq.state.ConnectionStateTracker; 032 import org.apache.activemq.thread.DefaultThreadPools; 033 import org.apache.activemq.thread.Task; 034 import org.apache.activemq.thread.TaskRunner; 035 import org.apache.activemq.transport.CompositeTransport; 036 import org.apache.activemq.transport.DefaultTransportListener; 037 import org.apache.activemq.transport.FutureResponse; 038 import org.apache.activemq.transport.ResponseCallback; 039 import org.apache.activemq.transport.Transport; 040 import org.apache.activemq.transport.TransportFactory; 041 import org.apache.activemq.transport.TransportListener; 042 import org.apache.activemq.util.IOExceptionSupport; 043 import org.apache.activemq.util.ServiceStopper; 044 import org.apache.activemq.util.ServiceSupport; 045 import org.apache.commons.logging.Log; 046 import org.apache.commons.logging.LogFactory; 047 048 /** 049 * A Transport that fans out a connection to multiple brokers. 050 * 051 * @version $Revision$ 052 */ 053 public class FanoutTransport implements CompositeTransport { 054 055 private static final Log LOG = LogFactory.getLog(FanoutTransport.class); 056 057 private TransportListener transportListener; 058 private boolean disposed; 059 private boolean connected; 060 061 private final Object reconnectMutex = new Object(); 062 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 063 private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); 064 065 private final TaskRunner reconnectTask; 066 private boolean started; 067 068 private ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>(); 069 private int connectedCount; 070 071 private int minAckCount = 2; 072 073 private long initialReconnectDelay = 10; 074 private long maxReconnectDelay = 1000 * 30; 075 private long backOffMultiplier = 2; 076 private boolean useExponentialBackOff = true; 077 private int maxReconnectAttempts; 078 private Exception connectionFailure; 079 private FanoutTransportHandler primary; 080 private boolean fanOutQueues = false; 081 082 static class RequestCounter { 083 084 final Command command; 085 final AtomicInteger ackCount; 086 087 RequestCounter(Command command, int count) { 088 this.command = command; 089 this.ackCount = new AtomicInteger(count); 090 } 091 092 public String toString() { 093 return command.getCommandId() + "=" + ackCount.get(); 094 } 095 } 096 097 class FanoutTransportHandler extends DefaultTransportListener { 098 099 private final URI uri; 100 private Transport transport; 101 102 private int connectFailures; 103 private long reconnectDelay = initialReconnectDelay; 104 private long reconnectDate; 105 106 public FanoutTransportHandler(URI uri) { 107 this.uri = uri; 108 } 109 110 public void onCommand(Object o) { 111 Command command = (Command)o; 112 if (command.isResponse()) { 113 Integer id = new Integer(((Response)command).getCorrelationId()); 114 RequestCounter rc = requestMap.get(id); 115 if (rc != null) { 116 if (rc.ackCount.decrementAndGet() <= 0) { 117 requestMap.remove(id); 118 transportListenerOnCommand(command); 119 } 120 } else { 121 transportListenerOnCommand(command); 122 } 123 } else { 124 transportListenerOnCommand(command); 125 } 126 } 127 128 public void onException(IOException error) { 129 try { 130 synchronized (reconnectMutex) { 131 if (transport == null) { 132 return; 133 } 134 135 LOG.debug("Transport failed, starting up reconnect task", error); 136 137 ServiceSupport.dispose(transport); 138 transport = null; 139 connectedCount--; 140 if (primary == this) { 141 primary = null; 142 } 143 reconnectTask.wakeup(); 144 } 145 } catch (InterruptedException e) { 146 Thread.currentThread().interrupt(); 147 if (transportListener != null) { 148 transportListener.onException(new InterruptedIOException()); 149 } 150 } 151 } 152 } 153 154 public FanoutTransport() throws InterruptedIOException { 155 // Setup a task that is used to reconnect the a connection async. 156 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 157 public boolean iterate() { 158 return doConnect(); 159 } 160 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this)); 161 } 162 163 /** 164 * @return 165 */ 166 private boolean doConnect() { 167 long closestReconnectDate = 0; 168 synchronized (reconnectMutex) { 169 170 if (disposed || connectionFailure != null) { 171 reconnectMutex.notifyAll(); 172 } 173 174 if (transports.size() == connectedCount || disposed || connectionFailure != null) { 175 return false; 176 } else { 177 178 if (transports.isEmpty()) { 179 // connectionFailure = new IOException("No uris available to 180 // connect to."); 181 } else { 182 183 // Try to connect them up. 184 Iterator<FanoutTransportHandler> iter = transports.iterator(); 185 for (int i = 0; iter.hasNext() && !disposed; i++) { 186 187 long now = System.currentTimeMillis(); 188 189 FanoutTransportHandler fanoutHandler = iter.next(); 190 if (fanoutHandler.transport != null) { 191 continue; 192 } 193 194 // Are we waiting a little to try to reconnect this one? 195 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) { 196 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 197 closestReconnectDate = fanoutHandler.reconnectDate; 198 } 199 continue; 200 } 201 202 URI uri = fanoutHandler.uri; 203 try { 204 LOG.debug("Stopped: " + this); 205 LOG.debug("Attempting connect to: " + uri); 206 Transport t = TransportFactory.compositeConnect(uri); 207 fanoutHandler.transport = t; 208 t.setTransportListener(fanoutHandler); 209 if (started) { 210 restoreTransport(fanoutHandler); 211 } 212 LOG.debug("Connection established"); 213 fanoutHandler.reconnectDelay = initialReconnectDelay; 214 fanoutHandler.connectFailures = 0; 215 if (primary == null) { 216 primary = fanoutHandler; 217 } 218 connectedCount++; 219 } catch (Exception e) { 220 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 221 222 if( fanoutHandler.transport !=null ) { 223 ServiceSupport.dispose(fanoutHandler.transport); 224 fanoutHandler.transport=null; 225 } 226 227 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { 228 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); 229 connectionFailure = e; 230 reconnectMutex.notifyAll(); 231 return false; 232 } else { 233 234 if (useExponentialBackOff) { 235 // Exponential increment of reconnect delay. 236 fanoutHandler.reconnectDelay *= backOffMultiplier; 237 if (fanoutHandler.reconnectDelay > maxReconnectDelay) { 238 fanoutHandler.reconnectDelay = maxReconnectDelay; 239 } 240 } 241 242 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay; 243 244 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 245 closestReconnectDate = fanoutHandler.reconnectDate; 246 } 247 } 248 } 249 } 250 if (transports.size() == connectedCount || disposed) { 251 reconnectMutex.notifyAll(); 252 return false; 253 } 254 255 } 256 } 257 258 } 259 260 try { 261 long reconnectDelay = closestReconnectDate - System.currentTimeMillis(); 262 if (reconnectDelay > 0) { 263 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 264 Thread.sleep(reconnectDelay); 265 } 266 } catch (InterruptedException e1) { 267 Thread.currentThread().interrupt(); 268 } 269 return true; 270 } 271 272 public void start() throws Exception { 273 synchronized (reconnectMutex) { 274 LOG.debug("Started."); 275 if (started) { 276 return; 277 } 278 started = true; 279 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 280 FanoutTransportHandler th = iter.next(); 281 if (th.transport != null) { 282 restoreTransport(th); 283 } 284 } 285 connected=true; 286 } 287 } 288 289 public void stop() throws Exception { 290 synchronized (reconnectMutex) { 291 ServiceStopper ss = new ServiceStopper(); 292 293 if (!started) { 294 return; 295 } 296 started = false; 297 disposed = true; 298 connected=false; 299 300 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 301 FanoutTransportHandler th = iter.next(); 302 if (th.transport != null) { 303 ss.stop(th.transport); 304 } 305 } 306 307 LOG.debug("Stopped: " + this); 308 ss.throwFirstException(); 309 } 310 reconnectTask.shutdown(); 311 } 312 313 public int getMinAckCount() { 314 return minAckCount; 315 } 316 317 public void setMinAckCount(int minAckCount) { 318 this.minAckCount = minAckCount; 319 } 320 321 public long getInitialReconnectDelay() { 322 return initialReconnectDelay; 323 } 324 325 public void setInitialReconnectDelay(long initialReconnectDelay) { 326 this.initialReconnectDelay = initialReconnectDelay; 327 } 328 329 public long getMaxReconnectDelay() { 330 return maxReconnectDelay; 331 } 332 333 public void setMaxReconnectDelay(long maxReconnectDelay) { 334 this.maxReconnectDelay = maxReconnectDelay; 335 } 336 337 public long getReconnectDelayExponent() { 338 return backOffMultiplier; 339 } 340 341 public void setReconnectDelayExponent(long reconnectDelayExponent) { 342 this.backOffMultiplier = reconnectDelayExponent; 343 } 344 345 public int getMaxReconnectAttempts() { 346 return maxReconnectAttempts; 347 } 348 349 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 350 this.maxReconnectAttempts = maxReconnectAttempts; 351 } 352 353 public void oneway(Object o) throws IOException { 354 final Command command = (Command)o; 355 try { 356 synchronized (reconnectMutex) { 357 358 // Wait for transport to be connected. 359 while (connectedCount < minAckCount && !disposed && connectionFailure == null) { 360 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); 361 reconnectMutex.wait(1000); 362 } 363 364 // Still not fully connected. 365 if (connectedCount < minAckCount) { 366 367 Exception error; 368 369 // Throw the right kind of error.. 370 if (disposed) { 371 error = new IOException("Transport disposed."); 372 } else if (connectionFailure != null) { 373 error = connectionFailure; 374 } else { 375 error = new IOException("Unexpected failure."); 376 } 377 378 if (error instanceof IOException) { 379 throw (IOException)error; 380 } 381 throw IOExceptionSupport.create(error); 382 } 383 384 // If it was a request and it was not being tracked by 385 // the state tracker, 386 // then hold it in the requestMap so that we can replay 387 // it later. 388 boolean fanout = isFanoutCommand(command); 389 if (stateTracker.track(command) == null && command.isResponseRequired()) { 390 int size = fanout ? minAckCount : 1; 391 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); 392 } 393 394 // Send the message. 395 if (fanout) { 396 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 397 FanoutTransportHandler th = iter.next(); 398 if (th.transport != null) { 399 try { 400 th.transport.oneway(command); 401 } catch (IOException e) { 402 LOG.debug("Send attempt: failed."); 403 th.onException(e); 404 } 405 } 406 } 407 } else { 408 try { 409 primary.transport.oneway(command); 410 } catch (IOException e) { 411 LOG.debug("Send attempt: failed."); 412 primary.onException(e); 413 } 414 } 415 416 } 417 } catch (InterruptedException e) { 418 // Some one may be trying to stop our thread. 419 Thread.currentThread().interrupt(); 420 throw new InterruptedIOException(); 421 } 422 } 423 424 /** 425 * @param command 426 * @return 427 */ 428 private boolean isFanoutCommand(Command command) { 429 if (command.isMessage()) { 430 if( fanOutQueues ) { 431 return true; 432 } 433 return ((Message)command).getDestination().isTopic(); 434 } 435 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) { 436 return false; 437 } 438 return true; 439 } 440 441 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 442 throw new AssertionError("Unsupported Method"); 443 } 444 445 public Object request(Object command) throws IOException { 446 throw new AssertionError("Unsupported Method"); 447 } 448 449 public Object request(Object command, int timeout) throws IOException { 450 throw new AssertionError("Unsupported Method"); 451 } 452 453 public void reconnect() { 454 LOG.debug("Waking up reconnect task"); 455 try { 456 reconnectTask.wakeup(); 457 } catch (InterruptedException e) { 458 Thread.currentThread().interrupt(); 459 } 460 } 461 462 public TransportListener getTransportListener() { 463 return transportListener; 464 } 465 466 public void setTransportListener(TransportListener commandListener) { 467 this.transportListener = commandListener; 468 } 469 470 public <T> T narrow(Class<T> target) { 471 472 if (target.isAssignableFrom(getClass())) { 473 return target.cast(this); 474 } 475 476 synchronized (reconnectMutex) { 477 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 478 FanoutTransportHandler th = iter.next(); 479 if (th.transport != null) { 480 T rc = th.transport.narrow(target); 481 if (rc != null) { 482 return rc; 483 } 484 } 485 } 486 } 487 488 return null; 489 490 } 491 492 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { 493 th.transport.start(); 494 stateTracker.setRestoreConsumers(th.transport == primary); 495 stateTracker.restore(th.transport); 496 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) { 497 RequestCounter rc = iter2.next(); 498 th.transport.oneway(rc.command); 499 } 500 } 501 502 public void add(URI uris[]) { 503 504 synchronized (reconnectMutex) { 505 for (int i = 0; i < uris.length; i++) { 506 URI uri = uris[i]; 507 508 boolean match = false; 509 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 510 FanoutTransportHandler th = iter.next(); 511 if (th.uri.equals(uri)) { 512 match = true; 513 break; 514 } 515 } 516 if (!match) { 517 FanoutTransportHandler th = new FanoutTransportHandler(uri); 518 transports.add(th); 519 reconnect(); 520 } 521 } 522 } 523 524 } 525 526 public void remove(URI uris[]) { 527 528 synchronized (reconnectMutex) { 529 for (int i = 0; i < uris.length; i++) { 530 URI uri = uris[i]; 531 532 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 533 FanoutTransportHandler th = iter.next(); 534 if (th.uri.equals(uri)) { 535 if (th.transport != null) { 536 ServiceSupport.dispose(th.transport); 537 connectedCount--; 538 } 539 iter.remove(); 540 break; 541 } 542 } 543 } 544 } 545 546 } 547 548 public void reconnect(URI uri) throws IOException { 549 add(new URI[]{uri}); 550 551 } 552 553 554 public String getRemoteAddress() { 555 if (primary != null) { 556 if (primary.transport != null) { 557 return primary.transport.getRemoteAddress(); 558 } 559 } 560 return null; 561 } 562 563 protected void transportListenerOnCommand(Command command) { 564 if (transportListener != null) { 565 transportListener.onCommand(command); 566 } 567 } 568 569 public boolean isFaultTolerant() { 570 return true; 571 } 572 573 public boolean isFanOutQueues() { 574 return fanOutQueues; 575 } 576 577 public void setFanOutQueues(boolean fanOutQueues) { 578 this.fanOutQueues = fanOutQueues; 579 } 580 581 public boolean isDisposed() { 582 return disposed; 583 } 584 585 586 public boolean isConnected() { 587 return connected; 588 } 589 590 public int getReceiveCounter() { 591 int rc = 0; 592 synchronized (reconnectMutex) { 593 for (FanoutTransportHandler th : transports) { 594 if (th.transport != null) { 595 rc += th.transport.getReceiveCounter(); 596 } 597 } 598 } 599 return rc; 600 } 601 }