/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.transport;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQPrefetchPolicy;
import org.activemq.advisories.ConnectionAdvisor;
import org.activemq.advisories.ConnectionAdvisoryEvent;
import org.activemq.advisories.ConnectionAdvisoryEventListener;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.BrokerContainer;
import org.activemq.broker.ConsumerInfoListener;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.Receipt;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.Service;
import org.activemq.transport.composite.CompositeTransportChannel;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network.
 * <p/>
 * The NetworkChannel contains a JMS connection with the remote broker.
 * <p/>
 * New subscriptions on the local broker are multiplexed into the JMS connection so that messages published on the
 * remote broker can be replayed onto the local broker.
 * <p/>
 * One {@link NetworkMessageBridge} is kept per destination physical name, in {@link #topicConsumerMap} for topics
 * and in {@link #queueConsumerMap} for queues. Bridges are reference counted: each started consumer increments the
 * count and each stopped consumer decrements it.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class NetworkChannel
        implements
            Service,
            ConsumerInfoListener,
            ConnectionAdvisoryEventListener,
            TransportStatusEventListener {
    private static final Log log = LogFactory.getLog(NetworkChannel.class);
    protected String uri;
    protected BrokerContainer brokerContainer;
    protected ActiveMQConnection localConnection;
    protected ActiveMQConnection remoteConnection;
    /** bridges for topic destinations, keyed by physical destination name */
    protected ConcurrentHashMap topicConsumerMap;
    /** bridges for queue destinations, keyed by physical destination name */
    protected ConcurrentHashMap queueConsumerMap;
    protected String remoteUserName;
    protected String remotePassword;
    protected String remoteBrokerName;
    protected String remoteClusterName;
    protected int maximumRetries = 0;
    protected long reconnectSleepTime = 2000L;
    protected PooledExecutor threadPool;
    /** true when this channel was created from an already established TransportChannel */
    private boolean remote = false;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** also used as the monitor on which deferred consumer notifications wait for the connection */
    private SynchronizedBoolean connected = new SynchronizedBoolean(false);
    private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
    private ConnectionAdvisor connectionAdvisor;
    private ActiveMQPrefetchPolicy localPrefetchPolicy;
    private ActiveMQPrefetchPolicy remotePrefetchPolicy;
    private boolean demandBasedForwarding = true;

    /**
     * Default constructor. Creates the (empty) bridge maps; no thread pool, broker container or uri is set.
     */
    public NetworkChannel() {
        this.topicConsumerMap = new ConcurrentHashMap();
        this.queueConsumerMap = new ConcurrentHashMap();
    }

    /**
     * Constructor
     *
     * @param tp the thread pool used to run the start-up and deferred tasks of this channel
     */
    public NetworkChannel(PooledExecutor tp) {
        this();
        this.threadPool = tp;
    }

    /**
     * Constructor
     *
     * @param connector the connector whose thread pool is shared by this channel
     * @param brokerContainer the container of the local broker
     * @param uri the uri of the remote broker(s)
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = uri;
    }

    /**
     * Create a NetworkChannel from an existing TransportChannel. The remote connection is created and started
     * immediately over the supplied channel and the local broker's BrokerInfo is sent across it.
     *
     * @param connector the connector whose thread pool is shared by this channel
     * @param brokerContainer the container of the local broker
     * @param channel the transport channel to the remote broker
     * @param remoteBrokerName the name of the remote broker
     * @param remoteclusterName the cluster name of the remote broker
     * @throws JMSException if the remote connection cannot be created or started
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel,
            String remoteBrokerName, String remoteclusterName) throws JMSException {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = "";
        this.remoteBrokerName = remoteBrokerName;
        this.remoteClusterName = remoteclusterName;
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
        fac.setJ2EEcompliant(false);
        fac.setTurboBoost(true);
        remoteConnection = new ActiveMQConnection(fac, remoteUserName, remotePassword, channel);
        remoteConnection.setClientID("Boondocks:" + remoteClusterName + ":" + remoteBrokerName);
        remoteConnection.setQuickClose(true);
        remoteConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(brokerContainer.getBroker().getBrokerName());
        info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
        channel.asyncSend(info);
        remote = true;
    }

    /**
     * Tracks the connected state of the remote transport. A CONNECTED or RECONNECTED status marks the channel as
     * connected and wakes any deferred consumer notifications; any other status (or a null event) marks it as
     * disconnected.
     *
     * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
     */
    public void statusChanged(TransportStatusEvent event) {
        boolean channelUp = event != null
                && (event.getChannelStatus() == TransportStatusEvent.CONNECTED
                        || event.getChannelStatus() == TransportStatusEvent.RECONNECTED);
        if (channelUp) {
            // go through doSetConnected() so that threads blocked in onConsumerInfo() are notified
            doSetConnected();
        }
        else {
            connected.set(false);
        }
    }

    /**
     * Marks the channel as connected and notifies every thread waiting on the <code>connected</code> monitor.
     */
    private void doSetConnected() {
        synchronized (connected) {
            connected.set(true);
            connected.notifyAll();
        }
    }

    /**
     * @return text info on this
     */
    public String toString() {
        return "NetworkChannel{ " + ", uri = '" + uri + "' " + ", remoteBrokerName = '" + remoteBrokerName + "' "
                + " }";
    }

    /**
     * Start the channel. The first call (or the first call after a {@link #stop()}) schedules a task on the thread
     * pool that creates the connections and starts the initial subscriptions; further calls are ignored.
     */
    public void start() {
        if (started.commit(false, true)) {
            try {
                stopped.set(false);
                threadPool.execute(new Runnable() {
                    public void run() {
                        String originalName = Thread.currentThread().getName();
                        try {
                            Thread.currentThread().setName("NetworkChannel Initiator to " + uri);
                            initialize();
                            startSubscriptions();
                            log.info("Started NetworkChannel to " + uri);
                        }
                        catch (JMSException jmsEx) {
                            log.error("Failed to start NetworkChannel: " + uri, jmsEx);
                        }
                        finally {
                            // the thread belongs to the pool - give it its name back
                            Thread.currentThread().setName(originalName);
                        }
                    }
                });
            }
            catch (InterruptedException e) {
                log.warn("Failed to start - interuppted", e);
                // keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stop the channel: closes the remote and local connections, then stops and forgets every topic and queue
     * bridge. Does nothing if the channel is not started.
     *
     * @throws JMSException if closing one of the connections fails; the bridges are stopped regardless
     */
    public void stop() throws JMSException {
        if (started.commit(true, false)) {
            stopped.set(true);
            try {
                closeConnections();
            }
            finally {
                // the maps must only be cleared after their bridges have been stopped
                stopBridges(topicConsumerMap);
                stopBridges(queueConsumerMap);
            }
        }
    }

    /**
     * Closes the remote and then the local connection and resets both fields to null. The local connection is
     * closed even if closing the remote one fails.
     *
     * @throws JMSException if closing a connection fails
     */
    private void closeConnections() throws JMSException {
        ActiveMQConnection remoteToClose = remoteConnection;
        ActiveMQConnection localToClose = localConnection;
        remoteConnection = null;
        localConnection = null;
        try {
            if (remoteToClose != null) {
                remoteToClose.close();
            }
        }
        finally {
            if (localToClose != null) {
                localToClose.close();
            }
        }
    }

    /**
     * Stops every NetworkMessageBridge held in the given map and then empties the map.
     *
     * @param bridges map of physical destination name to NetworkMessageBridge
     */
    private void stopBridges(Map bridges) {
        for (Iterator i = bridges.values().iterator(); i.hasNext();) {
            NetworkMessageBridge bridge = (NetworkMessageBridge) i.next();
            bridge.stop();
        }
        bridges.clear();
    }

    /**
     * Listen for new Consumer events at this broker. Events from clustered connections and events that have
     * already visited the remote broker are ignored. While the channel is not connected the event is handed to the
     * thread pool, where it waits for the connection to come up; it is discarded if the channel is stopped (or the
     * waiting thread is interrupted) first.
     *
     * @param client the client the consumer belongs to
     * @param info describes the consumer that has been started or stopped
     */
    public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
        if (client.isClusteredConnection()) {
            return;
        }
        if (connected.get()) {
            if (!info.hasVisited(remoteBrokerName)) {
                applyConsumerInfo(info);
            }
            return;
        }
        try {
            threadPool.execute(new Runnable() {
                public void run() {
                    if (client.isClusteredConnection() || info.hasVisited(remoteBrokerName)) {
                        return;
                    }
                    synchronized (connected) {
                        while (!connected.get() && !stopped.get()) {
                            try {
                                // timed wait: not every change of 'connected'/'stopped' is followed by a notify
                                connected.wait(500);
                            }
                            catch (InterruptedException e) {
                                log.debug("interuppted", e);
                                // give up instead of spinning with the interrupt flag set
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        if (stopped.get()) {
                            // the channel was stopped while waiting - do not create bridges on a stopped channel
                            return;
                        }
                        applyConsumerInfo(info);
                    }
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("Failed to process ConsumerInfo: " + info, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers the consumer if it has been started, otherwise releases it.
     *
     * @param info the consumer to register or release
     */
    private void applyConsumerInfo(ConsumerInfo info) {
        if (info.isStarted()) {
            addConsumerInfo(info);
        }
        else {
            removeConsumerInfo(info);
        }
    }

    /**
     * @return the uri of the broker(s) this channel is connected to
     */
    public String getUri() {
        return uri;
    }

    /**
     * set the uri of the broker(s) this channel is connected to
     *
     * @param uri
     */
    public void setUri(String uri) {
        this.uri = uri;
    }

    /**
     * @return Returns the remotePassword.
     */
    public String getRemotePassword() {
        return remotePassword;
    }

    /**
     * @param remotePassword The remotePassword to set.
     */
    public void setRemotePassword(String remotePassword) {
        this.remotePassword = remotePassword;
    }

    /**
     * @return Returns the remoteUserName.
     */
    public String getRemoteUserName() {
        return remoteUserName;
    }

    /**
     * @param remoteUserName The remoteUserName to set.
     */
    public void setRemoteUserName(String remoteUserName) {
        this.remoteUserName = remoteUserName;
    }

    /**
     * @return Returns the brokerContainer.
     */
    public BrokerContainer getBrokerContainer() {
        return brokerContainer;
    }

    /**
     * @param brokerContainer The brokerContainer to set.
     */
    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    /**
     * @return the maximum number of retries passed on to a composite transport channel
     */
    public int getMaximumRetries() {
        return maximumRetries;
    }

    /**
     * @param maximumRetries the maximum number of retries passed on to a composite transport channel
     */
    public void setMaximumRetries(int maximumRetries) {
        this.maximumRetries = maximumRetries;
    }

    /**
     * @return the failure sleep time passed on to a composite transport channel
     */
    public long getReconnectSleepTime() {
        return reconnectSleepTime;
    }

    /**
     * @param reconnectSleepTime the failure sleep time passed on to a composite transport channel
     */
    public void setReconnectSleepTime(long reconnectSleepTime) {
        this.reconnectSleepTime = reconnectSleepTime;
    }

    /**
     * @return the name of the remote broker
     */
    public String getRemoteBrokerName() {
        return remoteBrokerName;
    }

    /**
     * @param remoteBrokerName the name of the remote broker
     */
    public void setRemoteBrokerName(String remoteBrokerName) {
        this.remoteBrokerName = remoteBrokerName;
    }

    /**
     * @return Returns the threadPool.
     */
    protected PooledExecutor getThreadPool() {
        return threadPool;
    }

    /**
     * @param threadPool The threadPool to set.
     */
    protected void setThreadPool(PooledExecutor threadPool) {
        this.threadPool = threadPool;
    }

    /**
     * @return the connection to the local broker, created on first use
     * @throws JMSException if the connection cannot be created
     */
    private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
        if (localConnection == null) {
            initializeLocal();
        }
        return localConnection;
    }

    /**
     * @return the connection to the remote broker, created on first use
     * @throws JMSException if the connection cannot be created
     */
    private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
        if (remoteConnection == null) {
            initializeRemote();
        }
        return remoteConnection;
    }

    /**
     * @return Returns the localPrefetchPolicy.
     */
    public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
        return localPrefetchPolicy;
    }

    /**
     * @param localPrefetchPolicy The localPrefetchPolicy to set.
     */
    public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
        this.localPrefetchPolicy = localPrefetchPolicy;
    }

    /**
     * @return Returns the remotePrefetchPolicy.
     */
    public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
        return remotePrefetchPolicy;
    }

    /**
     * @param remotePrefetchPolicy The remotePrefetchPolicy to set.
     */
    public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
        this.remotePrefetchPolicy = remotePrefetchPolicy;
    }

    /**
     * @return Returns the demandBasedForwarding.
     */
    public boolean isDemandBasedForwarding() {
        return demandBasedForwarding;
    }

    /**
     * @param demandBasedForwarding The demandBasedForwarding to set.
     */
    public void setDemandBasedForwarding(boolean demandBasedForwarding) {
        this.demandBasedForwarding = demandBasedForwarding;
    }

    // Implementation methods
    //-------------------------------------------------------------------------
    /**
     * Implementation of ConnectionAdvisoryEventListener: registers the client id of an open remote connection with
     * the broker container and deregisters the client id of a closed one.
     *
     * @param event the advisory describing the remote connection
     */
    public void onEvent(ConnectionAdvisoryEvent event) {
        if (!event.getInfo().isClosed()) {
            brokerContainer.registerRemoteClientID(event.getInfo().getClientId());
        }
        else {
            brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId());
        }
    }

    /**
     * Registers a consumer against the bridge for its destination.
     *
     * @param info the consumer to register
     */
    private void addConsumerInfo(ConsumerInfo info) {
        addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic());
    }

    /**
     * Looks up (or creates) the bridge for a destination and increments its reference count. When demand based
     * forwarding is off, an existing non-durable bridge is upgraded if a durable subscription is requested.
     *
     * @param destination the destination to bridge
     * @param topic true to use the topic bridge map, false to use the queue bridge map
     * @param durableTopic true if the subscription is durable
     */
    private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) {
        Map map = topic ? topicConsumerMap : queueConsumerMap;
        NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(destination.getPhysicalName());
        if (bridge == null) {
            bridge = createBridge(map, destination, durableTopic);
        }
        else if (durableTopic && !bridge.isDurableTopic() && !demandBasedForwarding) {
            //upgrade our subscription
            bridge.decrementReferenceCount();
            upgradeBridge(bridge);
        }
        bridge.incrementReferenceCount();
    }

    /**
     * Upgrades a bridge while the remote connection is stopped; the remote connection is restarted afterwards even
     * if the upgrade failed.
     *
     * @param bridge the bridge to upgrade
     */
    private void upgradeBridge(NetworkMessageBridge bridge) {
        try {
            remoteConnection.stop();
            bridge.upgrade();
        }
        catch (JMSException e) {
            log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: "
                    + bridge.getDestination(), e);
        }
        try {
            remoteConnection.start();
        }
        catch (JMSException e) {
            log.error("Failed to restart the NetworkMessageBridge", e);
        }
    }

    /**
     * Creates a bridge for the destination, registers it in the given map under the destination's physical name
     * and starts it. A failure is logged and the (possibly unstarted) bridge is still returned.
     *
     * @param map the bridge map to register the new bridge in
     * @param destination the destination to bridge
     * @param durableTopic true if the subscription is durable
     * @return the new bridge, never null
     */
    private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) {
        NetworkMessageBridge bridge = new NetworkMessageBridge();
        try {
            bridge.setDestination(destination);
            bridge.setDurableTopic(durableTopic);
            bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
            bridge.setLocalSession(getLocalConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
            bridge.setRemoteSession(getRemoteConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
            map.put(destination.getPhysicalName(), bridge);
            bridge.start();
            log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: "
                    + this.toString());
        }
        catch (JMSException jmsEx) {
            log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
        }
        return bridge;
    }

    /**
     * Releases a consumer's reference on the bridge for its destination. When the last reference is released the
     * bridge is stopped and removed from its map on a pool thread.
     *
     * @param info the consumer to release
     */
    private void removeConsumerInfo(final ConsumerInfo info) {
        final String physicalName = info.getDestination().getPhysicalName();
        // must select the same map as addConsumerInfo(), otherwise queue bridges are never found or released
        final Map map = info.getDestination().isTopic() ? topicConsumerMap : queueConsumerMap;
        final NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(physicalName);
        if (bridge != null) {
            if (bridge.decrementReferenceCount() <= 0) {
                try {
                    threadPool.execute(new Runnable() {
                        public void run() {
                            bridge.stop();
                            map.remove(physicalName);
                            log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
                        }
                    });
                }
                catch (InterruptedException e) {
                    log.warn("got interrupted stoping NetworkBridge", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * When demand based forwarding is off and this channel initiated the connection itself, creates bridges for
     * every local destination known to the broker's topic and queue container managers.
     */
    private void startSubscriptions() {
        if (demandBasedForwarding || remote) {
            return;
        }
        MessageContainerManager mcm = brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (mcm != null) {
            startSubscriptions(mcm.getLocalDestinations(), true, true);
        }
        mcm = brokerContainer.getBroker().getTransientTopicContainerManager();
        if (mcm != null) {
            startSubscriptions(mcm.getLocalDestinations(), true, false);
        }
        mcm = brokerContainer.getBroker().getTransientQueueContainerManager();
        if (mcm != null) {
            startSubscriptions(mcm.getLocalDestinations(), false, false);
        }
        mcm = brokerContainer.getBroker().getPersistentQueueContainerManager();
        if (mcm != null) {
            startSubscriptions(mcm.getLocalDestinations(), false, false);
        }
    }

    /**
     * Creates (or references) a bridge for every destination in the map.
     *
     * @param destinations map whose values are ActiveMQDestination instances; may be null
     * @param topic true if the destinations are topics
     * @param durableTopic true if the subscriptions are durable
     */
    private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) {
        if (destinations != null) {
            for (Iterator i = destinations.values().iterator(); i.hasNext();) {
                ActiveMQDestination dest = (ActiveMQDestination) i.next();
                addConsumerInfo(dest, topic, durableTopic);
            }
        }
    }

    /**
     * Creates the local and the remote connection and then registers this channel as a ConsumerInfoListener with
     * the local broker.
     *
     * @throws JMSException if one of the connections cannot be created
     */
    protected void initialize() throws JMSException {
        // force lazy construction
        // NOTE(review): initializeLocal() announces remoteBrokerName/remoteClusterName, which a uri based channel
        // only learns in initializeRemote() - confirm that this ordering is intended
        initializeLocal();
        initializeRemote();
        brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
    }

    /**
     * Creates and starts the connection to the remote broker unless one already exists, exchanges BrokerInfo with
     * the remote broker and starts a ConnectionAdvisor on the connection. The channel is marked as connected in
     * either case.
     *
     * @throws JMSException if the connection cannot be created or started
     */
    private synchronized void initializeRemote() throws JMSException {
        if (remoteConnection == null) {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
            //factory.setTurboBoost(true);
            factory.setJ2EEcompliant(false);
            factory.setQuickClose(true);
            factory.setInternalConnection(true);
            remoteConnection = (ActiveMQConnection) factory.createConnection();
            TransportChannel transportChannel = remoteConnection.getTransportChannel();
            if (transportChannel instanceof CompositeTransportChannel) {
                CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
                composite.setMaximumRetries(maximumRetries);
                composite.setFailureSleepTime(reconnectSleepTime);
                composite.setIncrementTimeout(false);
            }
            transportChannel.addTransportStatusEventListener(this);
            remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
            remoteConnection.start();
            BrokerInfo info = new BrokerInfo();
            info.setBrokerName(brokerContainer.getBroker().getBrokerName());
            info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
            Receipt receipt = remoteConnection.syncSendRequest(info);
            if (receipt != null) {
                remoteBrokerName = receipt.getBrokerName();
                remoteClusterName = receipt.getClusterName();
            }
            connectionAdvisor = new ConnectionAdvisor(remoteConnection);
            connectionAdvisor.addListener(this);
            connectionAdvisor.start();
            if (remotePrefetchPolicy != null) {
                remoteConnection.setPrefetchPolicy(remotePrefetchPolicy);
            }
        }
        doSetConnected();
    }

    /**
     * Creates and starts the in-VM connection to the local broker unless one already exists, and sends it a
     * BrokerInfo carrying the remote broker and cluster names.
     *
     * @throws JMSException if the connection cannot be created or started
     */
    private synchronized void initializeLocal() throws JMSException {
        if (localConnection != null) {
            // already created (e.g. lazily through getLocalConnection()) - do not leak it by replacing it
            return;
        }
        String brokerName = brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setTurboBoost(true);
        factory.setJ2EEcompliant(false);
        factory.setBrokerName(brokerName);
        factory.setQuickClose(true);
        factory.setInternalConnection(true);
        localConnection = (ActiveMQConnection) factory.createConnection();
        localConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(remoteBrokerName);
        info.setClusterName(remoteClusterName);
        localConnection.asyncSendPacket(info);
        if (localPrefetchPolicy != null) {
            localConnection.setPrefetchPolicy(localPrefetchPolicy);
        }
    }

    /*private synchronized void releaseRemote() throws JMSException {
        if (remoteConnection != null) {
            TransportChannel transportChannel = remoteConnection.getTransportChannel();
            transportChannel.stop();
            if (connectionAdvisor != null) {
                connectionAdvisor.stop();
            }
            try {
                remoteConnection.stop();
            } catch (JMSException e) {
                // ignore this exception, since the remote broker is most likely down
            }
            remoteConnection = null;
        }
    }*/

}