001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Properties; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CountDownLatch; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.atomic.AtomicBoolean; 031 import java.util.concurrent.atomic.AtomicInteger; 032 import java.util.concurrent.atomic.AtomicReference; 033 import java.util.concurrent.locks.ReentrantReadWriteLock; 034 import org.apache.activemq.broker.ft.MasterBroker; 035 import org.apache.activemq.broker.region.ConnectionStatistics; 036 import org.apache.activemq.broker.region.RegionBroker; 037 import org.apache.activemq.command.BrokerInfo; 038 import org.apache.activemq.command.Command; 039 import org.apache.activemq.command.CommandTypes; 040 import org.apache.activemq.command.ConnectionControl; 041 import org.apache.activemq.command.ConnectionError; 042 import org.apache.activemq.command.ConnectionId; 043 import org.apache.activemq.command.ConnectionInfo; 044 import org.apache.activemq.command.ConsumerControl; 045 import org.apache.activemq.command.ConsumerId; 046 import org.apache.activemq.command.ConsumerInfo; 047 import org.apache.activemq.command.ControlCommand; 048 import org.apache.activemq.command.DataArrayResponse; 049 import org.apache.activemq.command.DestinationInfo; 050 import org.apache.activemq.command.ExceptionResponse; 051 import org.apache.activemq.command.FlushCommand; 052 import org.apache.activemq.command.IntegerResponse; 053 import org.apache.activemq.command.KeepAliveInfo; 054 import org.apache.activemq.command.Message; 055 import org.apache.activemq.command.MessageAck; 056 import org.apache.activemq.command.MessageDispatch; 057 import org.apache.activemq.command.MessageDispatchNotification; 058 import org.apache.activemq.command.MessagePull; 059 import org.apache.activemq.command.ProducerAck; 060 import org.apache.activemq.command.ProducerId; 061 import org.apache.activemq.command.ProducerInfo; 062 import org.apache.activemq.command.RemoveSubscriptionInfo; 063 import org.apache.activemq.command.Response; 064 import org.apache.activemq.command.SessionId; 065 import org.apache.activemq.command.SessionInfo; 066 import org.apache.activemq.command.ShutdownInfo; 067 import org.apache.activemq.command.TransactionId; 068 import org.apache.activemq.command.TransactionInfo; 069 import org.apache.activemq.command.WireFormatInfo; 070 import org.apache.activemq.network.DemandForwardingBridge; 071 import org.apache.activemq.network.NetworkBridgeConfiguration; 072 import org.apache.activemq.network.NetworkBridgeFactory; 073 import org.apache.activemq.security.MessageAuthorizationPolicy; 074 import org.apache.activemq.state.CommandVisitor; 075 import org.apache.activemq.state.ConnectionState; 076 import org.apache.activemq.state.ConsumerState; 077 import org.apache.activemq.state.ProducerState; 078 import org.apache.activemq.state.SessionState; 079 import org.apache.activemq.state.TransactionState; 080 import org.apache.activemq.thread.DefaultThreadPools; 081 import org.apache.activemq.thread.Task; 082 import org.apache.activemq.thread.TaskRunner; 083 import org.apache.activemq.thread.TaskRunnerFactory; 084 import org.apache.activemq.transaction.Transaction; 085 import org.apache.activemq.transport.DefaultTransportListener; 086 import org.apache.activemq.transport.ResponseCorrelator; 087 import org.apache.activemq.transport.Transport; 088 import org.apache.activemq.transport.TransportFactory; 089 import org.apache.activemq.util.IntrospectionSupport; 090 import org.apache.activemq.util.MarshallingSupport; 091 import org.apache.activemq.util.ServiceSupport; 092 import org.apache.activemq.util.URISupport; 093 import org.apache.commons.logging.Log; 094 import org.apache.commons.logging.LogFactory; 095 096 import static org.apache.activemq.thread.DefaultThreadPools.*; 097 /** 098 * @version $Revision: 1.8 $ 099 */ 100 public class TransportConnection implements Connection, Task, CommandVisitor { 101 private static final Log LOG = LogFactory.getLog(TransportConnection.class); 102 private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport"); 103 private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service"); 104 // Keeps track of the broker and connector that created this connection. 105 protected final Broker broker; 106 protected final TransportConnector connector; 107 // Keeps track of the state of the connections. 108 // protected final ConcurrentHashMap localConnectionStates=new 109 // ConcurrentHashMap(); 110 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 111 // The broker and wireformat info that was exchanged. 112 protected BrokerInfo brokerInfo; 113 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 114 protected TaskRunner taskRunner; 115 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); 116 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 117 private MasterBroker masterBroker; 118 private final Transport transport; 119 private MessageAuthorizationPolicy messageAuthorizationPolicy; 120 private WireFormatInfo wireFormatInfo; 121 // Used to do async dispatch.. this should perhaps be pushed down into the 122 // transport layer.. 123 private boolean inServiceException; 124 private ConnectionStatistics statistics = new ConnectionStatistics(); 125 private boolean manageable; 126 private boolean slow; 127 private boolean markedCandidate; 128 private boolean blockedCandidate; 129 private boolean blocked; 130 private boolean connected; 131 private boolean active; 132 private boolean starting; 133 private boolean pendingStop; 134 private long timeStamp; 135 private final AtomicBoolean stopping = new AtomicBoolean(false); 136 private CountDownLatch stopped = new CountDownLatch(1); 137 private final AtomicBoolean asyncException = new AtomicBoolean(false); 138 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 139 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 140 private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 141 private ConnectionContext context; 142 private boolean networkConnection; 143 private boolean faultTolerantConnection; 144 private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 145 private DemandForwardingBridge duplexBridge; 146 private final TaskRunnerFactory taskRunnerFactory; 147 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 148 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 149 150 /** 151 * @param connector 152 * @param transport 153 * @param broker 154 * @param taskRunnerFactory 155 * - can be null if you want direct dispatch to the transport 156 * else commands are sent async. 157 */ 158 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 159 TaskRunnerFactory taskRunnerFactory) { 160 this.connector = connector; 161 this.broker = broker; 162 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 163 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 164 brokerConnectionStates = rb.getConnectionStates(); 165 if (connector != null) { 166 this.statistics.setParent(connector.getStatistics()); 167 } 168 this.taskRunnerFactory = taskRunnerFactory; 169 this.transport = transport; 170 this.transport.setTransportListener(new DefaultTransportListener() { 171 public void onCommand(Object o) { 172 serviceLock.readLock().lock(); 173 try { 174 if (!(o instanceof Command)) { 175 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 176 } 177 Command command = (Command) o; 178 Response response = service(command); 179 if (response != null) { 180 dispatchSync(response); 181 } 182 } finally { 183 serviceLock.readLock().unlock(); 184 } 185 } 186 187 public void onException(IOException exception) { 188 serviceLock.readLock().lock(); 189 try { 190 serviceTransportException(exception); 191 } finally { 192 serviceLock.readLock().unlock(); 193 } 194 } 195 }); 196 connected = true; 197 } 198 199 /** 200 * Returns the number of messages to be dispatched to this connection 201 * 202 * @return size of dispatch queue 203 */ 204 public int getDispatchQueueSize() { 205 synchronized (dispatchQueue) { 206 return dispatchQueue.size(); 207 } 208 } 209 210 public void serviceTransportException(IOException e) { 211 BrokerService bService = connector.getBrokerService(); 212 if (bService.isShutdownOnSlaveFailure()) { 213 if (brokerInfo != null) { 214 if (brokerInfo.isSlaveBroker()) { 215 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e); 216 try { 217 doStop(); 218 bService.stop(); 219 } catch (Exception ex) { 220 LOG.warn("Failed to stop the master", ex); 221 } 222 } 223 } 224 } 225 if (!stopping.get()) { 226 transportException.set(e); 227 if (TRANSPORTLOG.isDebugEnabled()) { 228 TRANSPORTLOG.debug("Transport failed: " + e, e); 229 } 230 stopAsync(); 231 } 232 } 233 234 /** 235 * Calls the serviceException method in an async thread. Since handling a 236 * service exception closes a socket, we should not tie up broker threads 237 * since client sockets may hang or cause deadlocks. 238 * 239 * @param e 240 */ 241 public void serviceExceptionAsync(final IOException e) { 242 if (asyncException.compareAndSet(false, true)) { 243 new Thread("Async Exception Handler") { 244 public void run() { 245 serviceException(e); 246 } 247 }.start(); 248 } 249 } 250 251 /** 252 * Closes a clients connection due to a detected error. Errors are ignored 253 * if: the client is closing or broker is closing. Otherwise, the connection 254 * error transmitted to the client before stopping it's transport. 255 */ 256 public void serviceException(Throwable e) { 257 // are we a transport exception such as not being able to dispatch 258 // synchronously to a transport 259 if (e instanceof IOException) { 260 serviceTransportException((IOException) e); 261 } else if (e.getClass() == BrokerStoppedException.class) { 262 // Handle the case where the broker is stopped 263 // But the client is still connected. 264 if (!stopping.get()) { 265 if (SERVICELOG.isDebugEnabled()) { 266 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 267 } 268 ConnectionError ce = new ConnectionError(); 269 ce.setException(e); 270 dispatchSync(ce); 271 // Wait a little bit to try to get the output buffer to flush 272 // the exption notification to the client. 273 try { 274 Thread.sleep(500); 275 } catch (InterruptedException ie) { 276 Thread.currentThread().interrupt(); 277 } 278 // Worst case is we just kill the connection before the 279 // notification gets to him. 280 stopAsync(); 281 } 282 } else if (!stopping.get() && !inServiceException) { 283 inServiceException = true; 284 try { 285 SERVICELOG.warn("Async error occurred: " + e, e); 286 ConnectionError ce = new ConnectionError(); 287 ce.setException(e); 288 dispatchAsync(ce); 289 } finally { 290 inServiceException = false; 291 } 292 } 293 } 294 295 public Response service(Command command) { 296 Response response = null; 297 boolean responseRequired = command.isResponseRequired(); 298 int commandId = command.getCommandId(); 299 try { 300 response = command.visit(this); 301 } catch (Throwable e) { 302 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 303 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 304 + " command: " + command + ", exception: " + e, e); 305 } 306 if (responseRequired) { 307 response = new ExceptionResponse(e); 308 } else { 309 serviceException(e); 310 } 311 } 312 if (responseRequired) { 313 if (response == null) { 314 response = new Response(); 315 } 316 response.setCorrelationId(commandId); 317 } 318 // The context may have been flagged so that the response is not 319 // sent. 320 if (context != null) { 321 if (context.isDontSendReponse()) { 322 context.setDontSendReponse(false); 323 response = null; 324 } 325 context = null; 326 } 327 return response; 328 } 329 330 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 331 return null; 332 } 333 334 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 335 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 336 return null; 337 } 338 339 public Response processWireFormat(WireFormatInfo info) throws Exception { 340 wireFormatInfo = info; 341 protocolVersion.set(info.getVersion()); 342 return null; 343 } 344 345 public Response processShutdown(ShutdownInfo info) throws Exception { 346 stopAsync(); 347 return null; 348 } 349 350 public Response processFlush(FlushCommand command) throws Exception { 351 return null; 352 } 353 354 public Response processBeginTransaction(TransactionInfo info) throws Exception { 355 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 356 context = null; 357 if (cs != null) { 358 context = cs.getContext(); 359 } 360 if (cs == null) { 361 throw new NullPointerException("Context is null"); 362 } 363 // Avoid replaying dup commands 364 if (cs.getTransactionState(info.getTransactionId()) == null) { 365 cs.addTransactionState(info.getTransactionId()); 366 broker.beginTransaction(context, info.getTransactionId()); 367 } 368 return null; 369 } 370 371 public Response processEndTransaction(TransactionInfo info) throws Exception { 372 // No need to do anything. This packet is just sent by the client 373 // make sure he is synced with the server as commit command could 374 // come from a different connection. 375 return null; 376 } 377 378 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 379 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 380 context = null; 381 if (cs != null) { 382 context = cs.getContext(); 383 } 384 if (cs == null) { 385 throw new NullPointerException("Context is null"); 386 } 387 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 388 if (transactionState == null) { 389 throw new IllegalStateException("Cannot prepare a transaction that had not been started: " 390 + info.getTransactionId()); 391 } 392 // Avoid dups. 393 if (!transactionState.isPrepared()) { 394 transactionState.setPrepared(true); 395 int result = broker.prepareTransaction(context, info.getTransactionId()); 396 transactionState.setPreparedResult(result); 397 IntegerResponse response = new IntegerResponse(result); 398 return response; 399 } else { 400 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 401 return response; 402 } 403 } 404 405 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 406 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 407 context = cs.getContext(); 408 cs.removeTransactionState(info.getTransactionId()); 409 broker.commitTransaction(context, info.getTransactionId(), true); 410 return null; 411 } 412 413 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 414 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 415 context = cs.getContext(); 416 cs.removeTransactionState(info.getTransactionId()); 417 broker.commitTransaction(context, info.getTransactionId(), false); 418 return null; 419 } 420 421 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 422 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 423 context = cs.getContext(); 424 cs.removeTransactionState(info.getTransactionId()); 425 broker.rollbackTransaction(context, info.getTransactionId()); 426 return null; 427 } 428 429 public Response processForgetTransaction(TransactionInfo info) throws Exception { 430 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 431 context = cs.getContext(); 432 broker.forgetTransaction(context, info.getTransactionId()); 433 return null; 434 } 435 436 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 437 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 438 context = cs.getContext(); 439 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 440 return new DataArrayResponse(preparedTransactions); 441 } 442 443 public Response processMessage(Message messageSend) throws Exception { 444 ProducerId producerId = messageSend.getProducerId(); 445 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 446 broker.send(producerExchange, messageSend); 447 return null; 448 } 449 450 public Response processMessageAck(MessageAck ack) throws Exception { 451 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 452 broker.acknowledge(consumerExchange, ack); 453 return null; 454 } 455 456 public Response processMessagePull(MessagePull pull) throws Exception { 457 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 458 } 459 460 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 461 broker.processDispatchNotification(notification); 462 return null; 463 } 464 465 public Response processAddDestination(DestinationInfo info) throws Exception { 466 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 467 broker.addDestinationInfo(cs.getContext(), info); 468 if (info.getDestination().isTemporary()) { 469 cs.addTempDestination(info); 470 } 471 return null; 472 } 473 474 public Response processRemoveDestination(DestinationInfo info) throws Exception { 475 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 476 broker.removeDestinationInfo(cs.getContext(), info); 477 if (info.getDestination().isTemporary()) { 478 cs.removeTempDestination(info.getDestination()); 479 } 480 return null; 481 } 482 483 public Response processAddProducer(ProducerInfo info) throws Exception { 484 SessionId sessionId = info.getProducerId().getParentId(); 485 ConnectionId connectionId = sessionId.getParentId(); 486 TransportConnectionState cs = lookupConnectionState(connectionId); 487 SessionState ss = cs.getSessionState(sessionId); 488 if (ss == null) { 489 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 490 + sessionId); 491 } 492 // Avoid replaying dup commands 493 if (!ss.getProducerIds().contains(info.getProducerId())) { 494 broker.addProducer(cs.getContext(), info); 495 try { 496 ss.addProducer(info); 497 } catch (IllegalStateException e) { 498 broker.removeProducer(cs.getContext(), info); 499 } 500 } 501 return null; 502 } 503 504 public Response processRemoveProducer(ProducerId id) throws Exception { 505 SessionId sessionId = id.getParentId(); 506 ConnectionId connectionId = sessionId.getParentId(); 507 TransportConnectionState cs = lookupConnectionState(connectionId); 508 SessionState ss = cs.getSessionState(sessionId); 509 if (ss == null) { 510 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 511 + sessionId); 512 } 513 ProducerState ps = ss.removeProducer(id); 514 if (ps == null) { 515 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 516 } 517 removeProducerBrokerExchange(id); 518 broker.removeProducer(cs.getContext(), ps.getInfo()); 519 return null; 520 } 521 522 public Response processAddConsumer(ConsumerInfo info) throws Exception { 523 SessionId sessionId = info.getConsumerId().getParentId(); 524 ConnectionId connectionId = sessionId.getParentId(); 525 TransportConnectionState cs = lookupConnectionState(connectionId); 526 SessionState ss = cs.getSessionState(sessionId); 527 if (ss == null) { 528 throw new IllegalStateException(broker.getBrokerName() 529 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 530 } 531 // Avoid replaying dup commands 532 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 533 broker.addConsumer(cs.getContext(), info); 534 try { 535 ss.addConsumer(info); 536 } catch (IllegalStateException e) { 537 broker.removeConsumer(cs.getContext(), info); 538 } 539 } 540 return null; 541 } 542 543 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 544 SessionId sessionId = id.getParentId(); 545 ConnectionId connectionId = sessionId.getParentId(); 546 TransportConnectionState cs = lookupConnectionState(connectionId); 547 if (cs == null) { 548 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 549 + connectionId); 550 } 551 SessionState ss = cs.getSessionState(sessionId); 552 if (ss == null) { 553 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 554 + sessionId); 555 } 556 ConsumerState consumerState = ss.removeConsumer(id); 557 if (consumerState == null) { 558 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 559 } 560 ConsumerInfo info = consumerState.getInfo(); 561 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 562 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 563 removeConsumerBrokerExchange(id); 564 return null; 565 } 566 567 public Response processAddSession(SessionInfo info) throws Exception { 568 ConnectionId connectionId = info.getSessionId().getParentId(); 569 TransportConnectionState cs = lookupConnectionState(connectionId); 570 // Avoid replaying dup commands 571 if (!cs.getSessionIds().contains(info.getSessionId())) { 572 broker.addSession(cs.getContext(), info); 573 try { 574 cs.addSession(info); 575 } catch (IllegalStateException e) { 576 e.printStackTrace(); 577 broker.removeSession(cs.getContext(), info); 578 } 579 } 580 return null; 581 } 582 583 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 584 ConnectionId connectionId = id.getParentId(); 585 TransportConnectionState cs = lookupConnectionState(connectionId); 586 if (cs == null) { 587 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 588 } 589 SessionState session = cs.getSessionState(id); 590 if (session == null) { 591 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 592 } 593 // Don't let new consumers or producers get added while we are closing 594 // this down. 595 session.shutdown(); 596 // Cascade the connection stop to the consumers and producers. 597 for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) { 598 ConsumerId consumerId = (ConsumerId) iter.next(); 599 try { 600 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 601 } catch (Throwable e) { 602 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); 603 } 604 } 605 for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) { 606 ProducerId producerId = (ProducerId) iter.next(); 607 try { 608 processRemoveProducer(producerId); 609 } catch (Throwable e) { 610 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e); 611 } 612 } 613 cs.removeSession(id); 614 broker.removeSession(cs.getContext(), session.getInfo()); 615 return null; 616 } 617 618 public Response processAddConnection(ConnectionInfo info) throws Exception { 619 // if the broker service has slave attached, wait for the slave to be 620 // attached to allow client connection. slave connection is fine 621 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave() 622 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) { 623 ServiceSupport.dispose(transport); 624 return new ExceptionResponse(new Exception("Master's slave not attached yet.")); 625 } 626 // Older clients should have been defaulting this field to true.. but 627 // they were not. 628 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 629 info.setClientMaster(true); 630 } 631 TransportConnectionState state; 632 // Make sure 2 concurrent connections by the same ID only generate 1 633 // TransportConnectionState object. 634 synchronized (brokerConnectionStates) { 635 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 636 if (state == null) { 637 state = new TransportConnectionState(info, this); 638 brokerConnectionStates.put(info.getConnectionId(), state); 639 } 640 state.incrementReference(); 641 } 642 // If there are 2 concurrent connections for the same connection id, 643 // then last one in wins, we need to sync here 644 // to figure out the winner. 645 synchronized (state.getConnectionMutex()) { 646 if (state.getConnection() != this) { 647 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); 648 state.getConnection().stop(); 649 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " 650 + state.getConnection().getRemoteAddress()); 651 state.setConnection(this); 652 state.reset(info); 653 } 654 } 655 registerConnectionState(info.getConnectionId(), state); 656 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); 657 // Setup the context. 658 String clientId = info.getClientId(); 659 context = new ConnectionContext(); 660 context.setBroker(broker); 661 context.setClientId(clientId); 662 context.setClientMaster(info.isClientMaster()); 663 context.setConnection(this); 664 context.setConnectionId(info.getConnectionId()); 665 context.setConnector(connector); 666 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 667 context.setNetworkConnection(networkConnection); 668 context.setFaultTolerant(faultTolerantConnection); 669 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 670 context.setUserName(info.getUserName()); 671 context.setWireFormatInfo(wireFormatInfo); 672 this.manageable = info.isManageable(); 673 state.setContext(context); 674 state.setConnection(this); 675 try { 676 broker.addConnection(context, info); 677 } catch (Exception e) { 678 brokerConnectionStates.remove(info); 679 LOG.warn("Failed to add Connection", e); 680 throw e; 681 } 682 if (info.isManageable() && broker.isFaultTolerantConfiguration()) { 683 // send ConnectionCommand 684 ConnectionControl command = new ConnectionControl(); 685 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 686 dispatchAsync(command); 687 } 688 return null; 689 } 690 691 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 692 throws InterruptedException { 693 LOG.debug("remove connection id: " + id); 694 TransportConnectionState cs = lookupConnectionState(id); 695 if (cs != null) { 696 // Don't allow things to be added to the connection state while we 697 // are 698 // shutting down. 699 cs.shutdown(); 700 // Cascade the connection stop to the sessions. 701 for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { 702 SessionId sessionId = (SessionId) iter.next(); 703 try { 704 processRemoveSession(sessionId, lastDeliveredSequenceId); 705 } catch (Throwable e) { 706 SERVICELOG.warn("Failed to remove session " + sessionId, e); 707 } 708 } 709 // Cascade the connection stop to temp destinations. 710 for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) { 711 DestinationInfo di = (DestinationInfo) iter.next(); 712 try { 713 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 714 } catch (Throwable e) { 715 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); 716 } 717 iter.remove(); 718 } 719 try { 720 broker.removeConnection(cs.getContext(), cs.getInfo(), null); 721 } catch (Throwable e) { 722 SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e); 723 } 724 TransportConnectionState state = unregisterConnectionState(id); 725 if (state != null) { 726 synchronized (brokerConnectionStates) { 727 // If we are the last reference, we should remove the state 728 // from the broker. 729 if (state.decrementReference() == 0) { 730 brokerConnectionStates.remove(id); 731 } 732 } 733 } 734 } 735 return null; 736 } 737 738 public Response processProducerAck(ProducerAck ack) throws Exception { 739 // A broker should not get ProducerAck messages. 740 return null; 741 } 742 743 public Connector getConnector() { 744 return connector; 745 } 746 747 public void dispatchSync(Command message) { 748 // getStatistics().getEnqueues().increment(); 749 try { 750 processDispatch(message); 751 } catch (IOException e) { 752 serviceExceptionAsync(e); 753 } 754 } 755 756 public void dispatchAsync(Command message) { 757 if (!stopping.get()) { 758 // getStatistics().getEnqueues().increment(); 759 if (taskRunner == null) { 760 dispatchSync(message); 761 } else { 762 synchronized (dispatchQueue) { 763 dispatchQueue.add(message); 764 } 765 try { 766 taskRunner.wakeup(); 767 } catch (InterruptedException e) { 768 Thread.currentThread().interrupt(); 769 } 770 } 771 } else { 772 if (message.isMessageDispatch()) { 773 MessageDispatch md = (MessageDispatch) message; 774 Runnable sub = md.getTransmitCallback(); 775 broker.postProcessDispatch(md); 776 if (sub != null) { 777 sub.run(); 778 } 779 } 780 } 781 } 782 783 protected void processDispatch(Command command) throws IOException { 784 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 785 try { 786 if (!stopping.get()) { 787 if (messageDispatch != null) { 788 broker.preProcessDispatch(messageDispatch); 789 } 790 dispatch(command); 791 } 792 } finally { 793 if (messageDispatch != null) { 794 Runnable sub = messageDispatch.getTransmitCallback(); 795 broker.postProcessDispatch(messageDispatch); 796 if (sub != null) { 797 sub.run(); 798 } 799 } 800 // getStatistics().getDequeues().increment(); 801 } 802 } 803 804 public boolean iterate() { 805 try { 806 if (stopping.get()) { 807 if (dispatchStopped.compareAndSet(false, true)) { 808 if (transportException.get() == null) { 809 try { 810 dispatch(new ShutdownInfo()); 811 } catch (Throwable ignore) { 812 } 813 } 814 dispatchStoppedLatch.countDown(); 815 } 816 return false; 817 } 818 if (!dispatchStopped.get()) { 819 Command command = null; 820 synchronized (dispatchQueue) { 821 if (dispatchQueue.isEmpty()) { 822 return false; 823 } 824 command = dispatchQueue.remove(0); 825 } 826 processDispatch(command); 827 return true; 828 } 829 return false; 830 } catch (IOException e) { 831 if (dispatchStopped.compareAndSet(false, true)) { 832 dispatchStoppedLatch.countDown(); 833 } 834 serviceExceptionAsync(e); 835 return false; 836 } 837 } 838 839 /** 840 * Returns the statistics for this connection 841 */ 842 public ConnectionStatistics getStatistics() { 843 return statistics; 844 } 845 846 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 847 return messageAuthorizationPolicy; 848 } 849 850 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 851 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 852 } 853 854 public boolean isManageable() { 855 return manageable; 856 } 857 858 public void start() throws Exception { 859 starting = true; 860 try { 861 synchronized (this) { 862 if (taskRunnerFactory != null) { 863 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 864 + getRemoteAddress()); 865 } else { 866 taskRunner = null; 867 } 868 transport.start(); 869 active = true; 870 dispatchAsync(connector.getBrokerInfo()); 871 connector.onStarted(this); 872 } 873 } catch (Exception e) { 874 // Force clean up on an error starting up. 875 stop(); 876 throw e; 877 } finally { 878 // stop() can be called from within the above block, 879 // but we want to be sure start() completes before 880 // stop() runs, so queue the stop until right now: 881 starting = false; 882 if (pendingStop) { 883 LOG.debug("Calling the delayed stop()"); 884 stop(); 885 } 886 } 887 } 888 889 public void stop() throws Exception { 890 synchronized (this) { 891 pendingStop = true; 892 if (starting) { 893 LOG.debug("stop() called in the middle of start(). Delaying..."); 894 return; 895 } 896 } 897 stopAsync(); 898 while (!stopped.await(5, TimeUnit.SECONDS)) { 899 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); 900 } 901 } 902 903 public void stopAsync() { 904 // If we're in the middle of starting 905 // then go no further... for now. 906 if (stopping.compareAndSet(false, true)) { 907 // Let all the connection contexts know we are shutting down 908 // so that in progress operations can notice and unblock. 909 List<TransportConnectionState> connectionStates = listConnectionStates(); 910 for (TransportConnectionState cs : connectionStates) { 911 cs.getContext().getStopping().set(true); 912 } 913 try { 914 getDefaultTaskRunnerFactory().execute(new Runnable(){ 915 public void run() { 916 serviceLock.writeLock().lock(); 917 try { 918 doStop(); 919 } catch (Throwable e) { 920 LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() 921 + "': ", e); 922 } finally { 923 stopped.countDown(); 924 serviceLock.writeLock().unlock(); 925 } 926 } 927 }); 928 } catch (Throwable t) { 929 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); 930 stopped.countDown(); 931 } 932 } 933 } 934 935 @Override 936 public String toString() { 937 return "Transport Connection to: " + transport.getRemoteAddress(); 938 } 939 940 protected void doStop() throws Exception, InterruptedException { 941 LOG.debug("Stopping connection: " + transport.getRemoteAddress()); 942 connector.onStopped(this); 943 try { 944 synchronized (this) { 945 if (masterBroker != null) { 946 masterBroker.stop(); 947 } 948 if (duplexBridge != null) { 949 duplexBridge.stop(); 950 } 951 } 952 } catch (Exception ignore) { 953 LOG.trace("Exception caught stopping", ignore); 954 } 955 try { 956 transport.stop(); 957 LOG.debug("Stopped transport: " + transport.getRemoteAddress()); 958 } catch (Exception e) { 959 LOG.debug("Could not stop transport: " + e, e); 960 } 961 if (taskRunner != null) { 962 taskRunner.shutdown(1); 963 } 964 active = false; 965 // Run the MessageDispatch callbacks so that message references get 966 // cleaned up. 967 synchronized (dispatchQueue) { 968 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) { 969 Command command = iter.next(); 970 if (command.isMessageDispatch()) { 971 MessageDispatch md = (MessageDispatch) command; 972 Runnable sub = md.getTransmitCallback(); 973 broker.postProcessDispatch(md); 974 if (sub != null) { 975 sub.run(); 976 } 977 } 978 } 979 dispatchQueue.clear(); 980 } 981 // 982 // Remove all logical connection associated with this connection 983 // from the broker. 984 if (!broker.isStopped()) { 985 List<TransportConnectionState> connectionStates = listConnectionStates(); 986 connectionStates = listConnectionStates(); 987 for (TransportConnectionState cs : connectionStates) { 988 cs.getContext().getStopping().set(true); 989 try { 990 LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); 991 processRemoveConnection(cs.getInfo().getConnectionId(), 0l); 992 } catch (Throwable ignore) { 993 ignore.printStackTrace(); 994 } 995 } 996 if (brokerInfo != null) { 997 broker.removeBroker(this, brokerInfo); 998 } 999 } 1000 LOG.debug("Connection Stopped: " + getRemoteAddress()); 1001 } 1002 1003 /** 1004 * @return Returns the blockedCandidate. 1005 */ 1006 public boolean isBlockedCandidate() { 1007 return blockedCandidate; 1008 } 1009 1010 /** 1011 * @param blockedCandidate 1012 * The blockedCandidate to set. 1013 */ 1014 public void setBlockedCandidate(boolean blockedCandidate) { 1015 this.blockedCandidate = blockedCandidate; 1016 } 1017 1018 /** 1019 * @return Returns the markedCandidate. 1020 */ 1021 public boolean isMarkedCandidate() { 1022 return markedCandidate; 1023 } 1024 1025 /** 1026 * @param markedCandidate 1027 * The markedCandidate to set. 1028 */ 1029 public void setMarkedCandidate(boolean markedCandidate) { 1030 this.markedCandidate = markedCandidate; 1031 if (!markedCandidate) { 1032 timeStamp = 0; 1033 blockedCandidate = false; 1034 } 1035 } 1036 1037 /** 1038 * @param slow 1039 * The slow to set. 1040 */ 1041 public void setSlow(boolean slow) { 1042 this.slow = slow; 1043 } 1044 1045 /** 1046 * @return true if the Connection is slow 1047 */ 1048 public boolean isSlow() { 1049 return slow; 1050 } 1051 1052 /** 1053 * @return true if the Connection is potentially blocked 1054 */ 1055 public boolean isMarkedBlockedCandidate() { 1056 return markedCandidate; 1057 } 1058 1059 /** 1060 * Mark the Connection, so we can deem if it's collectable on the next sweep 1061 */ 1062 public void doMark() { 1063 if (timeStamp == 0) { 1064 timeStamp = System.currentTimeMillis(); 1065 } 1066 } 1067 1068 /** 1069 * @return if after being marked, the Connection is still writing 1070 */ 1071 public boolean isBlocked() { 1072 return blocked; 1073 } 1074 1075 /** 1076 * @return true if the Connection is connected 1077 */ 1078 public boolean isConnected() { 1079 return connected; 1080 } 1081 1082 /** 1083 * @param blocked 1084 * The blocked to set. 1085 */ 1086 public void setBlocked(boolean blocked) { 1087 this.blocked = blocked; 1088 } 1089 1090 /** 1091 * @param connected 1092 * The connected to set. 1093 */ 1094 public void setConnected(boolean connected) { 1095 this.connected = connected; 1096 } 1097 1098 /** 1099 * @return true if the Connection is active 1100 */ 1101 public boolean isActive() { 1102 return active; 1103 } 1104 1105 /** 1106 * @param active 1107 * The active to set. 1108 */ 1109 public void setActive(boolean active) { 1110 this.active = active; 1111 } 1112 1113 /** 1114 * @return true if the Connection is starting 1115 */ 1116 public synchronized boolean isStarting() { 1117 return starting; 1118 } 1119 1120 public synchronized boolean isNetworkConnection() { 1121 return networkConnection; 1122 } 1123 1124 protected synchronized void setStarting(boolean starting) { 1125 this.starting = starting; 1126 } 1127 1128 /** 1129 * @return true if the Connection needs to stop 1130 */ 1131 public synchronized boolean isPendingStop() { 1132 return pendingStop; 1133 } 1134 1135 protected synchronized void setPendingStop(boolean pendingStop) { 1136 this.pendingStop = pendingStop; 1137 } 1138 1139 public Response processBrokerInfo(BrokerInfo info) { 1140 if (info.isSlaveBroker()) { 1141 BrokerService bService = connector.getBrokerService(); 1142 // Do we only support passive slaves - or does the slave want to be 1143 // passive ? 1144 boolean passive = bService.isPassiveSlave() || info.isPassiveSlave(); 1145 if (passive == false) { 1146 1147 // stream messages from this broker (the master) to 1148 // the slave 1149 MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class); 1150 masterBroker = new MasterBroker(parent, transport); 1151 masterBroker.startProcessing(); 1152 } 1153 LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached"); 1154 bService.slaveConnectionEstablished(); 1155 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1156 // so this TransportConnection is the rear end of a network bridge 1157 // We have been requested to create a two way pipe ... 1158 try { 1159 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1160 Map<String, String> props = createMap(properties); 1161 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1162 IntrospectionSupport.setProperties(config, props, ""); 1163 config.setBrokerName(broker.getBrokerName()); 1164 URI uri = broker.getVmConnectorURI(); 1165 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri)); 1166 map.put("network", "true"); 1167 map.put("async", "false"); 1168 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 1169 Transport localTransport = TransportFactory.connect(uri); 1170 Transport remoteBridgeTransport = new ResponseCorrelator(transport); 1171 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport); 1172 duplexBridge.setBrokerService(broker.getBrokerService()); 1173 // now turn duplex off this side 1174 info.setDuplexConnection(false); 1175 duplexBridge.setCreatedByDuplex(true); 1176 duplexBridge.duplexStart(this, brokerInfo, info); 1177 LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); 1178 return null; 1179 } catch (Exception e) { 1180 LOG.error("Creating duplex network bridge", e); 1181 } 1182 } 1183 // We only expect to get one broker info command per connection 1184 if (this.brokerInfo != null) { 1185 LOG.warn("Unexpected extra broker info command received: " + info); 1186 } 1187 this.brokerInfo = info; 1188 broker.addBroker(this, info); 1189 networkConnection = true; 1190 List<TransportConnectionState> connectionStates = listConnectionStates(); 1191 for (TransportConnectionState cs : connectionStates) { 1192 cs.getContext().setNetworkConnection(true); 1193 } 1194 return null; 1195 } 1196 1197 @SuppressWarnings("unchecked") 1198 private HashMap<String, String> createMap(Properties properties) { 1199 return new HashMap(properties); 1200 } 1201 1202 protected void dispatch(Command command) throws IOException { 1203 try { 1204 setMarkedCandidate(true); 1205 transport.oneway(command); 1206 } finally { 1207 setMarkedCandidate(false); 1208 } 1209 } 1210 1211 public String getRemoteAddress() { 1212 return transport.getRemoteAddress(); 1213 } 1214 1215 public String getConnectionId() { 1216 List<TransportConnectionState> connectionStates = listConnectionStates(); 1217 for (TransportConnectionState cs : connectionStates) { 1218 if (cs.getInfo().getClientId() != null) { 1219 return cs.getInfo().getClientId(); 1220 } 1221 return cs.getInfo().getConnectionId().toString(); 1222 } 1223 return null; 1224 } 1225 1226 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) { 1227 ProducerBrokerExchange result = producerExchanges.get(id); 1228 if (result == null) { 1229 synchronized (producerExchanges) { 1230 result = new ProducerBrokerExchange(); 1231 TransportConnectionState state = lookupConnectionState(id); 1232 context = state.getContext(); 1233 result.setConnectionContext(context); 1234 SessionState ss = state.getSessionState(id.getParentId()); 1235 if (ss != null) { 1236 result.setProducerState(ss.getProducerState(id)); 1237 ProducerState producerState = ss.getProducerState(id); 1238 if (producerState != null && producerState.getInfo() != null) { 1239 ProducerInfo info = producerState.getInfo(); 1240 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1241 } 1242 } 1243 producerExchanges.put(id, result); 1244 } 1245 } else { 1246 context = result.getConnectionContext(); 1247 } 1248 return result; 1249 } 1250 1251 private void removeProducerBrokerExchange(ProducerId id) { 1252 synchronized (producerExchanges) { 1253 producerExchanges.remove(id); 1254 } 1255 } 1256 1257 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1258 ConsumerBrokerExchange result = consumerExchanges.get(id); 1259 if (result == null) { 1260 synchronized (consumerExchanges) { 1261 result = new ConsumerBrokerExchange(); 1262 TransportConnectionState state = lookupConnectionState(id); 1263 context = state.getContext(); 1264 result.setConnectionContext(context); 1265 SessionState ss = state.getSessionState(id.getParentId()); 1266 if (ss != null) { 1267 ConsumerState cs = ss.getConsumerState(id); 1268 if (cs != null) { 1269 ConsumerInfo info = cs.getInfo(); 1270 if (info != null) { 1271 if (info.getDestination() != null && info.getDestination().isPattern()) { 1272 result.setWildcard(true); 1273 } 1274 } 1275 } 1276 } 1277 consumerExchanges.put(id, result); 1278 } 1279 } 1280 return result; 1281 } 1282 1283 private void removeConsumerBrokerExchange(ConsumerId id) { 1284 synchronized (consumerExchanges) { 1285 consumerExchanges.remove(id); 1286 } 1287 } 1288 1289 public int getProtocolVersion() { 1290 return protocolVersion.get(); 1291 } 1292 1293 public Response processControlCommand(ControlCommand command) throws Exception { 1294 String control = command.getCommand(); 1295 if (control != null && control.equals("shutdown")) { 1296 System.exit(0); 1297 } 1298 return null; 1299 } 1300 1301 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1302 return null; 1303 } 1304 1305 public Response processConnectionControl(ConnectionControl control) throws Exception { 1306 if (control != null) { 1307 faultTolerantConnection = control.isFaultTolerant(); 1308 } 1309 return null; 1310 } 1311 1312 public Response processConnectionError(ConnectionError error) throws Exception { 1313 return null; 1314 } 1315 1316 public Response processConsumerControl(ConsumerControl control) throws Exception { 1317 return null; 1318 } 1319 1320 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1321 TransportConnectionState state) { 1322 TransportConnectionState cs = null; 1323 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1324 // swap implementations 1325 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1326 newRegister.intialize(connectionStateRegister); 1327 connectionStateRegister = newRegister; 1328 } 1329 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1330 return cs; 1331 } 1332 1333 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1334 return connectionStateRegister.unregisterConnectionState(connectionId); 1335 } 1336 1337 protected synchronized List<TransportConnectionState> listConnectionStates() { 1338 return connectionStateRegister.listConnectionStates(); 1339 } 1340 1341 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1342 return connectionStateRegister.lookupConnectionState(connectionId); 1343 } 1344 1345 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1346 return connectionStateRegister.lookupConnectionState(id); 1347 } 1348 1349 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1350 return connectionStateRegister.lookupConnectionState(id); 1351 } 1352 1353 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1354 return connectionStateRegister.lookupConnectionState(id); 1355 } 1356 1357 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1358 return connectionStateRegister.lookupConnectionState(connectionId); 1359 } 1360 }