001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.broker.impl; 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.Set; 025 026 import javax.jms.ExceptionListener; 027 import javax.jms.JMSException; 028 import javax.security.auth.Subject; 029 import javax.transaction.xa.XAException; 030 031 import org.activemq.broker.BrokerAdmin; 032 import org.activemq.broker.BrokerClient; 033 import org.activemq.broker.BrokerConnector; 034 import org.activemq.io.util.SpooledBoundedActiveMQMessageQueue; 035 import org.activemq.message.ActiveMQMessage; 036 import org.activemq.message.ActiveMQXid; 037 import org.activemq.message.BrokerAdminCommand; 038 import org.activemq.message.BrokerInfo; 039 import org.activemq.message.CapacityInfo; 040 import org.activemq.message.CleanupConnectionInfo; 041 import org.activemq.message.ConnectionInfo; 042 import org.activemq.message.ConsumerInfo; 043 import org.activemq.message.DurableUnsubscribe; 044 import org.activemq.message.IntResponseReceipt; 045 import org.activemq.message.KeepAlive; 046 import org.activemq.message.MessageAck; 047 import org.activemq.message.Packet; 048 import org.activemq.message.PacketListener; 049 import org.activemq.message.ProducerInfo; 050 import org.activemq.message.Receipt; 051 import org.activemq.message.ResponseReceipt; 052 import org.activemq.message.SessionInfo; 053 import org.activemq.message.TransactionInfo; 054 import org.activemq.message.XATransactionInfo; 055 import org.activemq.transport.NetworkChannel; 056 import org.activemq.transport.NetworkConnector; 057 import org.activemq.transport.TransportChannel; 058 import org.activemq.util.IdGenerator; 059 import org.apache.commons.logging.Log; 060 import org.apache.commons.logging.LogFactory; 061 062 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 063 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 064 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor; 065 066 /** 067 * A Broker client side proxy representing a JMS Connnection 068 * 069 * @version $Revision: 1.1.1.1 $ 070 */ 071 public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener { 072 073 private static final Log log = LogFactory.getLog(BrokerClientImpl.class); 074 private static final Log commandLog = LogFactory.getLog("org.activemq.broker.CommandTrace"); 075 076 private BrokerConnector brokerConnector; 077 private TransportChannel channel; 078 private ConnectionInfo connectionInfo; 079 private IdGenerator packetIdGenerator; 080 private SynchronizedBoolean closed; 081 private Set activeConsumers; 082 private CopyOnWriteArrayList consumers; 083 private CopyOnWriteArrayList producers; 084 private CopyOnWriteArrayList transactions; 085 private CopyOnWriteArrayList sessions; 086 private SynchronizedBoolean started; 087 private boolean brokerConnection; 088 private boolean clusteredConnection; 089 private String remoteBrokerName; 090 private int capacity = 100; 091 private SpooledBoundedActiveMQMessageQueue spoolQueue; 092 private boolean cleanedUp; 093 private boolean registered; 094 private ArrayList dispatchQueue = new ArrayList(); 095 private Subject subject; 096 private boolean remoteNetworkConnector; 097 098 /** 099 * Default Constructor of BrokerClientImpl 100 */ 101 public BrokerClientImpl() { 102 this.packetIdGenerator = new IdGenerator(); 103 this.closed = new SynchronizedBoolean(false); 104 this.started = new SynchronizedBoolean(false); 105 this.activeConsumers = new HashSet(); 106 this.consumers = new CopyOnWriteArrayList(); 107 this.producers = new CopyOnWriteArrayList(); 108 this.transactions = new CopyOnWriteArrayList(); 109 this.sessions = new CopyOnWriteArrayList(); 110 } 111 112 /** 113 * Initialize the BrokerClient 114 * 115 * @param brokerConnector 116 * @param channel 117 */ 118 public void initialize(BrokerConnector brokerConnector, TransportChannel channel) { 119 this.brokerConnector = brokerConnector; 120 this.channel = channel; 121 this.channel.setPacketListener(this); 122 this.channel.setExceptionListener(this); 123 log.trace("brokerConnectorConnector client initialized"); 124 } 125 126 /** 127 * @return the BrokerConnector this client is associated with 128 */ 129 public BrokerConnector getBrokerConnector() { 130 return this.brokerConnector; 131 } 132 133 /** 134 * @return the connection information for this client 135 */ 136 public ConnectionInfo getConnectionInfo() { 137 return connectionInfo; 138 } 139 140 /** 141 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) 142 */ 143 public void onException(JMSException jmsEx) { 144 log.info("Client disconnected: " + this); 145 log.debug("Disconnect cuase: ", jmsEx); 146 close(); 147 } 148 149 /** 150 * @return pretty print for this brokerConnector-client 151 */ 152 public String toString() { 153 String str = "brokerConnector-client:(" + hashCode() + ") "; 154 str += connectionInfo == null ? "" : connectionInfo.getClientId(); 155 str += ": " + channel; 156 return str; 157 } 158 159 /** 160 * Dispatch an ActiveMQMessage to the end client 161 * 162 * @param message 163 */ 164 public void dispatch(ActiveMQMessage message) { 165 if (!isSlowConsumer()) { 166 dispatchToClient(message); 167 } 168 else { 169 if (spoolQueue == null) { 170 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer"); 171 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId(); 172 try { 173 spoolQueue = new SpooledBoundedActiveMQMessageQueue(brokerConnector.getBrokerContainer().getBroker() 174 .getTempDir(), spoolName); 175 final SpooledBoundedActiveMQMessageQueue bpq = spoolQueue; 176 ThreadedExecutor exec = new ThreadedExecutor(); 177 exec.execute(new Runnable() { 178 public void run() { 179 while (!closed.get()) { 180 try { 181 Packet packet = bpq.dequeue(); 182 if (packet != null) { 183 dispatchToClient(packet); 184 } 185 } 186 catch (InterruptedException e) { 187 log.warn("async dispatch got an interupt", e); 188 } 189 catch (JMSException e) { 190 log.error("async dispatch got an problem", e); 191 } 192 } 193 } 194 }); 195 } 196 catch (IOException e) { 197 log.error("Could not create SpooledBoundedQueue for this slow consumer", e); 198 close(); 199 } 200 catch (InterruptedException e) { 201 log.error("Could not create SpooledBoundedQueue for this slow consumer", e); 202 close(); 203 } 204 } 205 if (spoolQueue != null) { 206 try { 207 spoolQueue.enqueue(message); 208 } 209 catch (JMSException e) { 210 log.error( 211 "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer", 212 e); 213 close(); 214 } 215 } 216 } 217 } 218 219 private void dispatchToClient(Packet message) { 220 if (started.get()) { 221 send(message); 222 223 } 224 else { 225 boolean msgSent = false; 226 if (message.isJMSMessage()) { 227 ActiveMQMessage jmsMsg = (ActiveMQMessage) message; 228 if (jmsMsg.getJMSActiveMQDestination().isAdvisory()) { 229 send(message); 230 msgSent = true; 231 } 232 } 233 if (!msgSent) { 234 // If the connection is stopped.. we have to hold the message till it is started. 235 synchronized (started) { 236 dispatchQueue.add(message); 237 } 238 } 239 } 240 } 241 242 /** 243 * @return true if the peer for this Client is itself another Broker 244 */ 245 public boolean isBrokerConnection() { 246 return brokerConnection; 247 } 248 249 /** 250 * @return true id this client is part of a cluster 251 */ 252 public boolean isClusteredConnection() { 253 return clusteredConnection; 254 } 255 256 /** 257 * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0 258 * capacity representing that the peer cannot process any more messages at the current time 259 * 260 * @return 261 */ 262 public int getCapacity() { 263 return capacity; 264 } 265 266 /** 267 * @return the client id of the remote connection 268 */ 269 public String getClientID() { 270 if (connectionInfo != null) { 271 return connectionInfo.getClientId(); 272 } 273 return null; 274 } 275 276 /** 277 * @return the channel used 278 */ 279 public TransportChannel getChannel() { 280 return channel; 281 } 282 283 /** 284 * Get an indication if the peer should be considered as a slow consumer 285 * 286 * @return true id the peer should be considered as a slow consumer 287 */ 288 public boolean isSlowConsumer() { 289 return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing! 290 } 291 292 /** 293 * Consume a Packet from the underlying TransportChannel for processing 294 * 295 * @param packet 296 */ 297 public void consume(Packet packet) { 298 if (packet != null) { 299 300 if( commandLog.isDebugEnabled() ) 301 commandLog.debug("broker for "+getClientID()+" received: "+packet); 302 303 Throwable requestEx = null; 304 boolean failed = false; 305 boolean receiptRequired = packet.isReceiptRequired(); 306 short correlationId = packet.getId(); 307 String brokerName = brokerConnector.getBrokerInfo().getBrokerName(); 308 String clusterName = brokerConnector.getBrokerInfo().getClusterName(); 309 try { 310 if (brokerConnection) { 311 if (remoteBrokerName != null && remoteBrokerName.length() > 0) { 312 packet.addBrokerVisited(remoteBrokerName); //got from the remote broker 313 } 314 packet.addBrokerVisited(brokerName); 315 } 316 // Checks if the current broker has already processed this packet. 317 // This is a lazy check, since the broker already received the packet, 318 // but chooses not to process it for cases where there is no remote broker info. 319 else { 320 if (packet.hasVisited(brokerName)) { 321 // Packet has already been processed. Do not process again. 322 return; 323 } else { 324 // Include this broker as a processor of the packet. 325 packet.addBrokerVisited(brokerName); 326 } 327 } 328 329 if (packet.isJMSMessage()) { 330 ActiveMQMessage message = (ActiveMQMessage) packet; 331 332 if (!brokerConnection) { 333 message.setEntryBrokerName(brokerName); 334 message.setEntryClusterName(clusterName); 335 } 336 consumeActiveMQMessage(message); 337 } 338 else { 339 switch (packet.getPacketType()) { 340 case Packet.ACTIVEMQ_MSG_ACK : { 341 MessageAck ack = (MessageAck) packet; 342 consumeMessageAck(ack); 343 break; 344 } 345 case Packet.XA_TRANSACTION_INFO : { 346 XATransactionInfo info = (XATransactionInfo) packet; 347 consumeXATransactionInfo(info); 348 receiptRequired=info.isReceiptRequired(); 349 break; 350 } 351 case Packet.TRANSACTION_INFO : { 352 TransactionInfo info = (TransactionInfo) packet; 353 consumeTransactionInfo(info); 354 break; 355 } 356 case Packet.CONSUMER_INFO : { 357 ConsumerInfo info = (ConsumerInfo) packet; 358 consumeConsumerInfo(info); 359 break; 360 } 361 case Packet.PRODUCER_INFO : { 362 ProducerInfo info = (ProducerInfo) packet; 363 consumeProducerInfo(info); 364 break; 365 } 366 case Packet.SESSION_INFO : { 367 SessionInfo info = (SessionInfo) packet; 368 consumeSessionInfo(info); 369 break; 370 } 371 case Packet.ACTIVEMQ_CONNECTION_INFO : { 372 ConnectionInfo info = (ConnectionInfo) packet; 373 consumeConnectionInfo(info); 374 break; 375 } 376 case Packet.DURABLE_UNSUBSCRIBE : { 377 DurableUnsubscribe ds = (DurableUnsubscribe) packet; 378 brokerConnector.durableUnsubscribe(this, ds); 379 break; 380 } 381 case Packet.CAPACITY_INFO : { 382 CapacityInfo info = (CapacityInfo) packet; 383 consumeCapacityInfo(info); 384 break; 385 } 386 case Packet.CAPACITY_INFO_REQUEST : { 387 updateCapacityInfo(packet.getId()); 388 break; 389 } 390 case Packet.ACTIVEMQ_BROKER_INFO : { 391 consumeBrokerInfo((BrokerInfo) packet); 392 break; 393 } 394 case Packet.KEEP_ALIVE : { 395 // Ignore as the packet contains no additional information to consume 396 break; 397 } 398 case Packet.BROKER_ADMIN_COMMAND : { 399 consumeBrokerAdminCommand((BrokerAdminCommand) packet); 400 break; 401 } 402 case Packet.CLEANUP_CONNECTION_INFO : { 403 consumeCleanupConnectionInfo((CleanupConnectionInfo) packet); 404 break; 405 } 406 default : { 407 log.warn("Unknown Packet received: " + packet); 408 break; 409 } 410 } 411 } 412 } 413 catch (Throwable e) { 414 requestEx = e; 415 log.warn("caught exception consuming packet: " + packet, e); 416 failed = true; 417 } 418 if (receiptRequired){ 419 sendReceipt(correlationId, requestEx, failed); 420 } 421 } 422 } 423 424 /** 425 * @param cleanupInfo 426 * @throws JMSException 427 */ 428 private void consumeCleanupConnectionInfo(CleanupConnectionInfo cleanupInfo) throws JMSException { 429 try { 430 431 for (Iterator i = consumers.iterator(); i.hasNext();) { 432 ConsumerInfo info = (ConsumerInfo) i.next(); 433 info.setStarted(false); 434 this.brokerConnector.deregisterMessageConsumer(this, info); 435 } 436 for (Iterator i = producers.iterator(); i.hasNext();) { 437 ProducerInfo info = (ProducerInfo) i.next(); 438 info.setStarted(false); 439 this.brokerConnector.deregisterMessageProducer(this, info); 440 } 441 for (Iterator i = sessions.iterator(); i.hasNext();) { 442 SessionInfo info = (SessionInfo) i.next(); 443 info.setStarted(false); 444 this.brokerConnector.deregisterSession(this, info); 445 } 446 for (Iterator i = transactions.iterator(); i.hasNext();) { 447 this.brokerConnector.rollbackTransaction(this, i.next().toString()); 448 } 449 this.brokerConnector.deregisterClient(this, connectionInfo); 450 registered = false; 451 452 } finally { 453 // whatever happens, lets make sure we unregister & clean things 454 // down 455 if (log.isDebugEnabled()) { 456 log.debug(this + " has stopped"); 457 } 458 this.consumers.clear(); 459 this.producers.clear(); 460 this.transactions.clear(); 461 this.sessions.clear(); 462 } 463 464 } 465 466 /** 467 * @param command 468 * @throws JMSException 469 */ 470 private void consumeBrokerAdminCommand(BrokerAdminCommand command) throws JMSException { 471 BrokerAdmin brokerAdmin = brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin(); 472 if (BrokerAdminCommand.CREATE_DESTINATION.equals(command.getCommand())) { 473 brokerAdmin.createMessageContainer(command.getDestination()); 474 } 475 else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(command.getCommand())) { 476 brokerAdmin.destoryMessageContainer(command.getDestination()); 477 } 478 else if (BrokerAdminCommand.EMPTY_DESTINATION.equals(command.getCommand())) { 479 brokerAdmin.getMessageContainerAdmin(command.getDestination()).empty(); 480 } 481 else if (BrokerAdminCommand.SHUTDOWN_SERVER_VM.equals(command.getCommand())) { 482 if (Boolean.getBoolean("enable.vm.shutdown")) { 483 log.info("processing command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM); 484 System.exit(1); 485 } else 486 { 487 log.warn("ignoring command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM + ", enable.vm.shutdown=false"); 488 } 489 } 490 else { 491 throw new JMSException("Broker Admin Command type: " + command.getCommand() + " not recognized."); 492 } 493 } 494 495 /** 496 * Register/deregister MessageConsumer with the Broker 497 * 498 * @param info 499 * @throws JMSException 500 */ 501 public void consumeConsumerInfo(ConsumerInfo info) throws JMSException { 502 String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName(); 503 if (info.isStarted()) { 504 consumers.add(info); 505 if (this.activeConsumers.add(info)) { 506 this.brokerConnector.registerMessageConsumer(this, info); 507 } 508 } 509 else { 510 consumers.remove(info); 511 if (activeConsumers.remove(info)) { 512 this.brokerConnector.deregisterMessageConsumer(this, info); 513 } 514 } 515 } 516 517 /** 518 * Update the peer Connection about the Broker's capacity for messages 519 * 520 * @param capacity 521 */ 522 public void updateBrokerCapacity(int capacity) { 523 CapacityInfo info = new CapacityInfo(); 524 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName()); 525 info.setCapacity(capacity); 526 info.setFlowControlTimeout(getFlowControlTimeout(capacity)); 527 send(info); 528 } 529 530 /** 531 * register with the Broker 532 * 533 * @param info 534 * @throws JMSException 535 */ 536 public void consumeConnectionInfo(ConnectionInfo info) throws JMSException { 537 this.connectionInfo = info; 538 if (info.isClosed()) { 539 try { 540 cleanUp(); 541 if (info.isReceiptRequired()){ 542 sendReceipt(info.getId(), null, false); 543 } 544 info.setReceiptRequired(false); 545 try { 546 Thread.sleep(500); 547 } 548 catch (Throwable e) { 549 } 550 } 551 finally { 552 close(); 553 } 554 } 555 else { 556 if (!registered) { 557 this.brokerConnector.registerClient(this, info); 558 registered = true; 559 } 560 synchronized (started) { 561 //set transport hint 562 if (info.getProperties() != null && info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null){ 563 boolean noDelay = new Boolean(info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue(); 564 channel.setNoDelay(noDelay); 565 566 } 567 if (!started.get() && info.isStarted()) { 568 started.set(true); 569 // Dispatch any queued 570 log.debug(this + " has started running client version " + info.getClientVersion() 571 + " , wire format = " + info.getWireFormatVersion()); 572 //go through consumers, producers, and sessions - setting their clientId (which might not have been set) 573 for (Iterator i = consumers.iterator();i.hasNext();) { 574 ConsumerInfo ci = (ConsumerInfo) i.next(); 575 ci.setClientId(info.getClientId()); 576 } 577 for (Iterator i = producers.iterator();i.hasNext();) { 578 ProducerInfo pi = (ProducerInfo) i.next(); 579 pi.setClientId(info.getClientId()); 580 } 581 for (Iterator i = sessions.iterator();i.hasNext();) { 582 SessionInfo si = (SessionInfo) i.next(); 583 si.setClientId(info.getClientId()); 584 } 585 for (int i = 0;i < dispatchQueue.size();i++) { 586 ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i); 587 dispatch(msg); 588 } 589 dispatchQueue.clear(); 590 } 591 if (started.get() && !info.isStarted()) { 592 started.set(false); 593 log.debug(this + " has stopped"); 594 } 595 } 596 } 597 } 598 599 /** 600 * start consuming messages 601 * 602 * @throws JMSException 603 */ 604 public void start() throws JMSException { 605 channel.start(); 606 } 607 608 /** 609 * stop consuming messages 610 * 611 * @throws JMSException 612 */ 613 public void stop() throws JMSException { 614 log.trace("Stopping channel: " + channel); 615 channel.stop(); 616 } 617 618 /** 619 * cleanup 620 */ 621 public synchronized void cleanUp() { 622 // we could be called here from 2 different code paths 623 // based on if we get a transport failure or we do a clean shutdown 624 // so lets only run this stuff once 625 if (!cleanedUp) { 626 cleanedUp = true; 627 try { 628 try { 629 for (Iterator i = consumers.iterator();i.hasNext();) { 630 ConsumerInfo info = (ConsumerInfo) i.next(); 631 info.setStarted(false); 632 this.brokerConnector.deregisterMessageConsumer(this, info); 633 } 634 for (Iterator i = producers.iterator();i.hasNext();) { 635 ProducerInfo info = (ProducerInfo) i.next(); 636 info.setStarted(false); 637 this.brokerConnector.deregisterMessageProducer(this, info); 638 } 639 for (Iterator i = sessions.iterator();i.hasNext();) { 640 SessionInfo info = (SessionInfo) i.next(); 641 info.setStarted(false); 642 this.brokerConnector.deregisterSession(this, info); 643 } 644 for (Iterator i = transactions.iterator();i.hasNext();) { 645 this.brokerConnector.rollbackTransaction(this, i.next().toString()); 646 } 647 } 648 finally { 649 // whatever happens, lets make sure we unregister & clean things down 650 if (log.isDebugEnabled()) { 651 log.debug(this + " has stopped"); 652 } 653 this.consumers.clear(); 654 this.producers.clear(); 655 this.transactions.clear(); 656 this.sessions.clear(); 657 this.brokerConnector.deregisterClient(this, connectionInfo); 658 registered = false; 659 } 660 } 661 catch (JMSException e) { 662 log.warn("failed to de-register Broker client: " + e, e); 663 } 664 } 665 else { 666 log.debug("We are ignoring a duplicate cleanup() method called for: " + this); 667 } 668 } 669 670 // Implementation methods 671 //------------------------------------------------------------------------- 672 protected void send(Packet packet) { 673 if (!closed.get()) { 674 try { 675 if (brokerConnection) { 676 String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName(); 677 packet.addBrokerVisited(brokerName); 678 if (packet.hasVisited(remoteBrokerName)) { 679 if (log.isDebugEnabled()) { 680 log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: " 681 + packet); 682 } 683 return; 684 } 685 } 686 packet.setId(this.packetIdGenerator.getNextShortSequence()); 687 if( commandLog.isDebugEnabled() ) 688 commandLog.debug("broker for "+getClientID()+" sending: "+packet); 689 this.channel.asyncSend(packet); 690 } 691 catch (JMSException e) { 692 log.warn(this + " caught exception ", e); 693 close(); 694 } 695 } 696 } 697 698 /** 699 * validate the connection 700 * @param timeout 701 * @throws JMSException 702 */ 703 public void validateConnection(int timeout) throws JMSException { 704 KeepAlive packet = new KeepAlive(); 705 packet.setReceiptRequired(true); 706 packet.setId(this.packetIdGenerator.getNextShortSequence()); 707 // In most cases, if the transport is dead due to network errors 708 // the network error will be recognised immediately and an exception 709 // thrown. If the duplicate client ids are due to misconfiguration, 710 // we make sure that we do not terminate the "right" connection 711 // prematurely by using a long timeout here. If the existing client 712 // is working heavily and/or over a slow link, it might take some time 713 // for it to respond. In such a case, the new client is misconfigured 714 // and can wait for a while before being kicked out. 715 716 Receipt r = getChannel().send(packet, timeout); 717 if (r == null) throw new JMSException("Client did not respond in time"); 718 719 } 720 721 protected void close() { 722 if (closed.commit(false, true)) { 723 this.channel.stop(); 724 log.debug(this + " has closed"); 725 } 726 } 727 728 /** 729 * Send message to Broker 730 * 731 * @param message 732 * @throws JMSException 733 */ 734 private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException { 735 this.brokerConnector.sendMessage(this, message); 736 } 737 738 /** 739 * Send Message acknowledge to the Broker 740 * 741 * @param ack 742 * @throws JMSException 743 */ 744 private void consumeMessageAck(MessageAck ack) throws JMSException { 745 this.brokerConnector.acknowledgeMessage(this, ack); 746 } 747 748 /** 749 * Handle transaction start/commit/rollback 750 * 751 * @param info 752 * @throws JMSException 753 */ 754 private void consumeTransactionInfo(TransactionInfo info) throws JMSException { 755 if (info.getType() == TransactionInfo.START) { 756 transactions.add(info.getTransactionId()); 757 this.brokerConnector.startTransaction(this, info.getTransactionId()); 758 } 759 else { 760 if (info.getType() == TransactionInfo.ROLLBACK) { 761 this.brokerConnector.rollbackTransaction(this, info.getTransactionId()); 762 } 763 else if (info.getType() == TransactionInfo.COMMIT) { 764 this.brokerConnector.commitTransaction(this, info.getTransactionId()); 765 } 766 transactions.remove(info.getTransactionId()); 767 } 768 } 769 770 /** 771 * Handle XA transaction start/prepare/commit/rollback 772 * 773 * @param info 774 * @throws JMSException 775 * @throws XAException 776 */ 777 private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException { 778 if (info.getType() == XATransactionInfo.START) { 779 this.brokerConnector.startTransaction(this, info.getXid()); 780 } 781 else if (info.getType() == XATransactionInfo.XA_RECOVER) { 782 ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this); 783 if( info.isReceiptRequired()) { 784 // We will be sending our own receipt.. 785 info.setReceiptRequired(false); 786 // Send the receipt.. 787 ResponseReceipt receipt = new ResponseReceipt(); 788 receipt.setCorrelationId(info.getId()); 789 receipt.setResult(rc); 790 send(receipt); 791 } 792 } 793 else if (info.getType() == XATransactionInfo.GET_RM_ID) { 794 String rc = this.brokerConnector.getResourceManagerId(this); 795 if( info.isReceiptRequired()) { 796 // We will be sending our own receipt.. 797 info.setReceiptRequired(false); 798 // Send the receipt.. 799 ResponseReceipt receipt = new ResponseReceipt(); 800 receipt.setId(this.packetIdGenerator.getNextShortSequence()); 801 receipt.setCorrelationId(info.getId()); 802 receipt.setResult(rc); 803 send(receipt); 804 } 805 } 806 else if (info.getType() == XATransactionInfo.END) { 807 // we don't do anything.. 808 } 809 else { 810 if (info.getType() == XATransactionInfo.PRE_COMMIT) { 811 int rc = this.brokerConnector.prepareTransaction(this, info.getXid()); 812 // We will be sending our own receipt.. 813 if( info.isReceiptRequired()) { 814 info.setReceiptRequired(false); 815 // Send the receipt.. 816 IntResponseReceipt receipt = new IntResponseReceipt(); 817 receipt.setId(this.packetIdGenerator.getNextShortSequence()); 818 receipt.setCorrelationId(info.getId()); 819 receipt.setResult(rc); 820 send(receipt); 821 } 822 } 823 else if (info.getType() == XATransactionInfo.ROLLBACK) { 824 this.brokerConnector.rollbackTransaction(this, info.getXid()); 825 } 826 else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) { 827 this.brokerConnector.commitTransaction(this, info.getXid(), true); 828 } 829 else if (info.getType() == XATransactionInfo.COMMIT) { 830 this.brokerConnector.commitTransaction(this, info.getXid(), false); 831 } 832 else { 833 throw new JMSException("Packet type: " + info.getType() + " not recognized."); 834 } 835 } 836 } 837 838 /** 839 * register/deregister MessageProducer in the Broker 840 * 841 * @param info 842 * @throws JMSException 843 */ 844 private void consumeProducerInfo(ProducerInfo info) throws JMSException { 845 if (info.isStarted()) { 846 producers.add(info); 847 this.brokerConnector.registerMessageProducer(this, info); 848 } 849 else { 850 producers.remove(info); 851 this.brokerConnector.deregisterMessageProducer(this, info); 852 } 853 } 854 855 /** 856 * register/deregister Session in a Broker 857 * 858 * @param info 859 * @throws JMSException 860 */ 861 private void consumeSessionInfo(SessionInfo info) throws JMSException { 862 if (info.isStarted()) { 863 sessions.add(info); 864 this.brokerConnector.registerSession(this, info); 865 } 866 else { 867 sessions.remove(info); 868 this.brokerConnector.deregisterSession(this, info); 869 } 870 } 871 872 /** 873 * Update capacity for the peer 874 * 875 * @param info 876 */ 877 private void consumeCapacityInfo(CapacityInfo info) { 878 this.capacity = info.getCapacity(); 879 } 880 881 private void updateCapacityInfo(short correlationId) { 882 CapacityInfo info = new CapacityInfo(); 883 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName()); 884 info.setCorrelationId(correlationId); 885 info.setCapacity(this.brokerConnector.getBrokerCapacity()); 886 info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity())); 887 send(info); 888 } 889 890 private long getFlowControlTimeout(int capacity) { 891 long result = -1; 892 if (capacity <= 0) { 893 result = 10000; 894 } 895 else if (capacity <= 10) { 896 result = 1000; 897 } 898 else if (capacity <= 20) { 899 result = 10; 900 } 901 return result; 902 } 903 904 private void consumeBrokerInfo(final BrokerInfo info) { 905 brokerConnection = true; 906 started.set(true); 907 remoteBrokerName = info.getBrokerName(); 908 if (remoteBrokerName == null || remoteBrokerName.length() == 0) { 909 log.warn("No remote broker name available!"); 910 } 911 else { 912 if (log.isDebugEnabled()) { 913 log.debug("Received broker info from: " + remoteBrokerName + " on client: " + channel); 914 } 915 } 916 String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName(); 917 if (clusterName.equals(info.getClusterName())) { 918 clusteredConnection = true; 919 } 920 if (!remoteNetworkConnector && info.isRemote()) { 921 try { 922 final NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer()); 923 networkConnector.getThreadPool().execute(new Runnable() { 924 public void run() { 925 try { 926 NetworkChannel networkChannel = new NetworkChannel(networkConnector, brokerConnector 927 .getBrokerContainer(), channel, info.getBrokerName(), info.getClusterName()); 928 networkConnector.addNetworkChannel(networkChannel); 929 brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector); 930 networkConnector.start(); 931 } 932 catch (JMSException e) { 933 log.error("Failed to create reverse remote channel", e); 934 } 935 } 936 }); 937 log.info("Started reverse remote channel to " + remoteBrokerName); 938 remoteNetworkConnector = true; 939 } 940 catch (InterruptedException e) { 941 log.error("Failed to create reverse remote channel", e); 942 } 943 } 944 } 945 946 947 private void sendReceipt(short correlationId, Throwable requestEx, boolean failed) { 948 Receipt receipt = new Receipt(); 949 receipt.setCorrelationId(correlationId); 950 receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName()); 951 receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName()); 952 receipt.setException(requestEx); 953 receipt.setFailed(failed); 954 send(receipt); 955 } 956 957 /** 958 * @param subject 959 */ 960 public void setSubject(Subject subject) { 961 this.subject = subject; 962 } 963 964 /** 965 * @return the subject 966 */ 967 public Subject getSubject() { 968 return subject; 969 } 970 }