001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Properties;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CountDownLatch;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicInteger;
032    import java.util.concurrent.atomic.AtomicReference;
033    import java.util.concurrent.locks.ReentrantReadWriteLock;
034    import org.apache.activemq.broker.ft.MasterBroker;
035    import org.apache.activemq.broker.region.ConnectionStatistics;
036    import org.apache.activemq.broker.region.RegionBroker;
037    import org.apache.activemq.command.BrokerInfo;
038    import org.apache.activemq.command.Command;
039    import org.apache.activemq.command.CommandTypes;
040    import org.apache.activemq.command.ConnectionControl;
041    import org.apache.activemq.command.ConnectionError;
042    import org.apache.activemq.command.ConnectionId;
043    import org.apache.activemq.command.ConnectionInfo;
044    import org.apache.activemq.command.ConsumerControl;
045    import org.apache.activemq.command.ConsumerId;
046    import org.apache.activemq.command.ConsumerInfo;
047    import org.apache.activemq.command.ControlCommand;
048    import org.apache.activemq.command.DataArrayResponse;
049    import org.apache.activemq.command.DestinationInfo;
050    import org.apache.activemq.command.ExceptionResponse;
051    import org.apache.activemq.command.FlushCommand;
052    import org.apache.activemq.command.IntegerResponse;
053    import org.apache.activemq.command.KeepAliveInfo;
054    import org.apache.activemq.command.Message;
055    import org.apache.activemq.command.MessageAck;
056    import org.apache.activemq.command.MessageDispatch;
057    import org.apache.activemq.command.MessageDispatchNotification;
058    import org.apache.activemq.command.MessagePull;
059    import org.apache.activemq.command.ProducerAck;
060    import org.apache.activemq.command.ProducerId;
061    import org.apache.activemq.command.ProducerInfo;
062    import org.apache.activemq.command.RemoveSubscriptionInfo;
063    import org.apache.activemq.command.Response;
064    import org.apache.activemq.command.SessionId;
065    import org.apache.activemq.command.SessionInfo;
066    import org.apache.activemq.command.ShutdownInfo;
067    import org.apache.activemq.command.TransactionId;
068    import org.apache.activemq.command.TransactionInfo;
069    import org.apache.activemq.command.WireFormatInfo;
070    import org.apache.activemq.network.DemandForwardingBridge;
071    import org.apache.activemq.network.NetworkBridgeConfiguration;
072    import org.apache.activemq.network.NetworkBridgeFactory;
073    import org.apache.activemq.security.MessageAuthorizationPolicy;
074    import org.apache.activemq.state.CommandVisitor;
075    import org.apache.activemq.state.ConnectionState;
076    import org.apache.activemq.state.ConsumerState;
077    import org.apache.activemq.state.ProducerState;
078    import org.apache.activemq.state.SessionState;
079    import org.apache.activemq.state.TransactionState;
080    import org.apache.activemq.thread.DefaultThreadPools;
081    import org.apache.activemq.thread.Task;
082    import org.apache.activemq.thread.TaskRunner;
083    import org.apache.activemq.thread.TaskRunnerFactory;
084    import org.apache.activemq.transaction.Transaction;
085    import org.apache.activemq.transport.DefaultTransportListener;
086    import org.apache.activemq.transport.ResponseCorrelator;
087    import org.apache.activemq.transport.Transport;
088    import org.apache.activemq.transport.TransportFactory;
089    import org.apache.activemq.util.IntrospectionSupport;
090    import org.apache.activemq.util.MarshallingSupport;
091    import org.apache.activemq.util.ServiceSupport;
092    import org.apache.activemq.util.URISupport;
093    import org.apache.commons.logging.Log;
094    import org.apache.commons.logging.LogFactory;
095    
096    import static org.apache.activemq.thread.DefaultThreadPools.*;
097    /**
098     * @version $Revision: 1.8 $
099     */
100    public class TransportConnection implements Connection, Task, CommandVisitor {
    // General-purpose logger for this class.
    private static final Log LOG = LogFactory.getLog(TransportConnection.class);
    // Logger for transport failures; used by serviceTransportException().
    private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
    // Logger for errors raised while servicing commands; used by service()
    // and serviceException().
    private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
    // Keeps track of the broker and connector that created this connection.
    protected final Broker broker;
    protected final TransportConnector connector;
    // Keeps track of the state of the connections.
    // protected final ConcurrentHashMap localConnectionStates=new
    // ConcurrentHashMap();
    // Obtained from the RegionBroker in the constructor;
    // processAddConnection() synchronizes on this map while registering.
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
    // The broker and wireformat info that was exchanged.
    protected BrokerInfo brokerInfo;
    // Commands waiting to be dispatched; getDispatchQueueSize() synchronizes
    // on this list before reading it.
    protected final List<Command> dispatchQueue = new LinkedList<Command>();
    protected TaskRunner taskRunner;
    // Set by serviceTransportException() when the transport fails while the
    // connection is not already stopping.
    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
    private MasterBroker masterBroker;
    private final Transport transport;
    // Taken from the connector in the constructor.
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Recorded by processWireFormat().
    private WireFormatInfo wireFormatInfo;
    // Used to do async dispatch.. this should perhaps be pushed down into the
    // transport layer..
    // NOTE(review): the comment above does not describe the field that
    // follows; it presumably refers to taskRunner/dispatchQueue - confirm.
    // Guard flag raised while serviceException() is reporting an error that is
    // neither an IOException nor a BrokerStoppedException.
    private boolean inServiceException;
    private ConnectionStatistics statistics = new ConnectionStatistics();
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    // Set to true at the end of the constructor.
    private boolean connected;
    private boolean active;
    private boolean starting;
    private boolean pendingStop;
    private long timeStamp;
    // Checked by the exception handlers so that errors are ignored once the
    // connection is stopping.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private CountDownLatch stopped = new CountDownLatch(1);
    // Ensures serviceExceptionAsync() starts its handler thread at most once.
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
    private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    // Context of the command currently being processed: assigned by several
    // process* handlers and reset to null by service() after each command.
    private ConnectionContext context;
    private boolean networkConnection;
    private boolean faultTolerantConnection;
    // Starts at the default protocol version and is overwritten by
    // processWireFormat() with the version carried by the WireFormatInfo.
    private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private DemandForwardingBridge duplexBridge;
    // May be null (see the constructor documentation).
    private final TaskRunnerFactory taskRunnerFactory;
    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
    // The transport listener callbacks hold the read lock while servicing a
    // command or a transport exception. Write-lock usage is not visible in
    // this part of the file.
    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
149    
150        /**
151         * @param connector
152         * @param transport
153         * @param broker
154         * @param taskRunnerFactory
155         *            - can be null if you want direct dispatch to the transport
156         *            else commands are sent async.
157         */
158        public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
159                TaskRunnerFactory taskRunnerFactory) {
160            this.connector = connector;
161            this.broker = broker;
162            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
163            RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
164            brokerConnectionStates = rb.getConnectionStates();
165            if (connector != null) {
166                this.statistics.setParent(connector.getStatistics());
167            }
168            this.taskRunnerFactory = taskRunnerFactory;
169            this.transport = transport;
170            this.transport.setTransportListener(new DefaultTransportListener() {
171                public void onCommand(Object o) {
172                    serviceLock.readLock().lock();
173                    try {
174                        if (!(o instanceof Command)) {
175                            throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
176                        }
177                        Command command = (Command) o;
178                        Response response = service(command);
179                        if (response != null) {
180                            dispatchSync(response);
181                        }
182                    } finally {
183                        serviceLock.readLock().unlock();
184                    }
185                }
186    
187                public void onException(IOException exception) {
188                    serviceLock.readLock().lock();
189                    try {
190                        serviceTransportException(exception);
191                    } finally {
192                        serviceLock.readLock().unlock();
193                    }
194                }
195            });
196            connected = true;
197        }
198    
199        /**
200         * Returns the number of messages to be dispatched to this connection
201         * 
202         * @return size of dispatch queue
203         */
204        public int getDispatchQueueSize() {
205            synchronized (dispatchQueue) {
206                return dispatchQueue.size();
207            }
208        }
209    
210        public void serviceTransportException(IOException e) {
211            BrokerService bService = connector.getBrokerService();
212            if (bService.isShutdownOnSlaveFailure()) {
213                if (brokerInfo != null) {
214                    if (brokerInfo.isSlaveBroker()) {
215                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
216                        try {
217                            doStop();
218                            bService.stop();
219                        } catch (Exception ex) {
220                            LOG.warn("Failed to stop the master", ex);
221                        }
222                    }
223                }
224            }
225            if (!stopping.get()) {
226                transportException.set(e);
227                if (TRANSPORTLOG.isDebugEnabled()) {
228                    TRANSPORTLOG.debug("Transport failed: " + e, e);
229                }
230                stopAsync();
231            }
232        }
233    
234        /**
235         * Calls the serviceException method in an async thread. Since handling a
236         * service exception closes a socket, we should not tie up broker threads
237         * since client sockets may hang or cause deadlocks.
238         * 
239         * @param e
240         */
241        public void serviceExceptionAsync(final IOException e) {
242            if (asyncException.compareAndSet(false, true)) {
243                new Thread("Async Exception Handler") {
244                    public void run() {
245                        serviceException(e);
246                    }
247                }.start();
248            }
249        }
250    
251        /**
252         * Closes a clients connection due to a detected error. Errors are ignored
253         * if: the client is closing or broker is closing. Otherwise, the connection
254         * error transmitted to the client before stopping it's transport.
255         */
256        public void serviceException(Throwable e) {
257            // are we a transport exception such as not being able to dispatch
258            // synchronously to a transport
259            if (e instanceof IOException) {
260                serviceTransportException((IOException) e);
261            } else if (e.getClass() == BrokerStoppedException.class) {
262                // Handle the case where the broker is stopped
263                // But the client is still connected.
264                if (!stopping.get()) {
265                    if (SERVICELOG.isDebugEnabled()) {
266                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
267                    }
268                    ConnectionError ce = new ConnectionError();
269                    ce.setException(e);
270                    dispatchSync(ce);
271                    // Wait a little bit to try to get the output buffer to flush
272                    // the exption notification to the client.
273                    try {
274                        Thread.sleep(500);
275                    } catch (InterruptedException ie) {
276                        Thread.currentThread().interrupt();
277                    }
278                    // Worst case is we just kill the connection before the
279                    // notification gets to him.
280                    stopAsync();
281                }
282            } else if (!stopping.get() && !inServiceException) {
283                inServiceException = true;
284                try {
285                    SERVICELOG.warn("Async error occurred: " + e, e);
286                    ConnectionError ce = new ConnectionError();
287                    ce.setException(e);
288                    dispatchAsync(ce);
289                } finally {
290                    inServiceException = false;
291                }
292            }
293        }
294    
295        public Response service(Command command) {
296            Response response = null;
297            boolean responseRequired = command.isResponseRequired();
298            int commandId = command.getCommandId();
299            try {
300                response = command.visit(this);
301            } catch (Throwable e) {
302                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
303                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
304                            + " command: " + command + ", exception: " + e, e);
305                }
306                if (responseRequired) {
307                    response = new ExceptionResponse(e);
308                } else {
309                    serviceException(e);
310                }
311            }
312            if (responseRequired) {
313                if (response == null) {
314                    response = new Response();
315                }
316                response.setCorrelationId(commandId);
317            }
318            // The context may have been flagged so that the response is not
319            // sent.
320            if (context != null) {
321                if (context.isDontSendReponse()) {
322                    context.setDontSendReponse(false);
323                    response = null;
324                }
325                context = null;
326            }
327            return response;
328        }
329    
330        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
331            return null;
332        }
333    
334        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
335            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
336            return null;
337        }
338    
339        public Response processWireFormat(WireFormatInfo info) throws Exception {
340            wireFormatInfo = info;
341            protocolVersion.set(info.getVersion());
342            return null;
343        }
344    
345        public Response processShutdown(ShutdownInfo info) throws Exception {
346            stopAsync();
347            return null;
348        }
349    
350        public Response processFlush(FlushCommand command) throws Exception {
351            return null;
352        }
353    
354        public Response processBeginTransaction(TransactionInfo info) throws Exception {
355            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
356            context = null;
357            if (cs != null) {
358                context = cs.getContext();
359            }
360            if (cs == null) {
361                throw new NullPointerException("Context is null");
362            }
363            // Avoid replaying dup commands
364            if (cs.getTransactionState(info.getTransactionId()) == null) {
365                cs.addTransactionState(info.getTransactionId());
366                broker.beginTransaction(context, info.getTransactionId());
367            }
368            return null;
369        }
370    
371        public Response processEndTransaction(TransactionInfo info) throws Exception {
372            // No need to do anything. This packet is just sent by the client
373            // make sure he is synced with the server as commit command could
374            // come from a different connection.
375            return null;
376        }
377    
378        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
379            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
380            context = null;
381            if (cs != null) {
382                context = cs.getContext();
383            }
384            if (cs == null) {
385                throw new NullPointerException("Context is null");
386            }
387            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
388            if (transactionState == null) {
389                throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
390                        + info.getTransactionId());
391            }
392            // Avoid dups.
393            if (!transactionState.isPrepared()) {
394                transactionState.setPrepared(true);
395                int result = broker.prepareTransaction(context, info.getTransactionId());
396                transactionState.setPreparedResult(result);
397                IntegerResponse response = new IntegerResponse(result);
398                return response;
399            } else {
400                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
401                return response;
402            }
403        }
404    
405        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
406            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
407            context = cs.getContext();
408            cs.removeTransactionState(info.getTransactionId());
409            broker.commitTransaction(context, info.getTransactionId(), true);
410            return null;
411        }
412    
413        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
414            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
415            context = cs.getContext();
416            cs.removeTransactionState(info.getTransactionId());
417            broker.commitTransaction(context, info.getTransactionId(), false);
418            return null;
419        }
420    
421        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
422            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
423            context = cs.getContext();
424            cs.removeTransactionState(info.getTransactionId());
425            broker.rollbackTransaction(context, info.getTransactionId());
426            return null;
427        }
428    
429        public Response processForgetTransaction(TransactionInfo info) throws Exception {
430            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
431            context = cs.getContext();
432            broker.forgetTransaction(context, info.getTransactionId());
433            return null;
434        }
435    
436        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
437            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
438            context = cs.getContext();
439            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
440            return new DataArrayResponse(preparedTransactions);
441        }
442    
443        public Response processMessage(Message messageSend) throws Exception {
444            ProducerId producerId = messageSend.getProducerId();
445            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
446            broker.send(producerExchange, messageSend);
447            return null;
448        }
449    
450        public Response processMessageAck(MessageAck ack) throws Exception {
451            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
452            broker.acknowledge(consumerExchange, ack);
453            return null;
454        }
455    
456        public Response processMessagePull(MessagePull pull) throws Exception {
457            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
458        }
459    
460        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
461            broker.processDispatchNotification(notification);
462            return null;
463        }
464    
465        public Response processAddDestination(DestinationInfo info) throws Exception {
466            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
467            broker.addDestinationInfo(cs.getContext(), info);
468            if (info.getDestination().isTemporary()) {
469                cs.addTempDestination(info);
470            }
471            return null;
472        }
473    
474        public Response processRemoveDestination(DestinationInfo info) throws Exception {
475            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
476            broker.removeDestinationInfo(cs.getContext(), info);
477            if (info.getDestination().isTemporary()) {
478                cs.removeTempDestination(info.getDestination());
479            }
480            return null;
481        }
482    
483        public Response processAddProducer(ProducerInfo info) throws Exception {
484            SessionId sessionId = info.getProducerId().getParentId();
485            ConnectionId connectionId = sessionId.getParentId();
486            TransportConnectionState cs = lookupConnectionState(connectionId);
487            SessionState ss = cs.getSessionState(sessionId);
488            if (ss == null) {
489                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
490                        + sessionId);
491            }
492            // Avoid replaying dup commands
493            if (!ss.getProducerIds().contains(info.getProducerId())) {
494                broker.addProducer(cs.getContext(), info);
495                try {
496                    ss.addProducer(info);
497                } catch (IllegalStateException e) {
498                    broker.removeProducer(cs.getContext(), info);
499                }
500            }
501            return null;
502        }
503    
504        public Response processRemoveProducer(ProducerId id) throws Exception {
505            SessionId sessionId = id.getParentId();
506            ConnectionId connectionId = sessionId.getParentId();
507            TransportConnectionState cs = lookupConnectionState(connectionId);
508            SessionState ss = cs.getSessionState(sessionId);
509            if (ss == null) {
510                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
511                        + sessionId);
512            }
513            ProducerState ps = ss.removeProducer(id);
514            if (ps == null) {
515                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
516            }
517            removeProducerBrokerExchange(id);
518            broker.removeProducer(cs.getContext(), ps.getInfo());
519            return null;
520        }
521    
522        public Response processAddConsumer(ConsumerInfo info) throws Exception {
523            SessionId sessionId = info.getConsumerId().getParentId();
524            ConnectionId connectionId = sessionId.getParentId();
525            TransportConnectionState cs = lookupConnectionState(connectionId);
526            SessionState ss = cs.getSessionState(sessionId);
527            if (ss == null) {
528                throw new IllegalStateException(broker.getBrokerName()
529                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
530            }
531            // Avoid replaying dup commands
532            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
533                broker.addConsumer(cs.getContext(), info);
534                try {
535                    ss.addConsumer(info);
536                } catch (IllegalStateException e) {
537                    broker.removeConsumer(cs.getContext(), info);
538                }
539            }
540            return null;
541        }
542    
543        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
544            SessionId sessionId = id.getParentId();
545            ConnectionId connectionId = sessionId.getParentId();
546            TransportConnectionState cs = lookupConnectionState(connectionId);
547            if (cs == null) {
548                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
549                        + connectionId);
550            }
551            SessionState ss = cs.getSessionState(sessionId);
552            if (ss == null) {
553                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
554                        + sessionId);
555            }
556            ConsumerState consumerState = ss.removeConsumer(id);
557            if (consumerState == null) {
558                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
559            }
560            ConsumerInfo info = consumerState.getInfo();
561            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
562            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
563            removeConsumerBrokerExchange(id);
564            return null;
565        }
566    
567        public Response processAddSession(SessionInfo info) throws Exception {
568            ConnectionId connectionId = info.getSessionId().getParentId();
569            TransportConnectionState cs = lookupConnectionState(connectionId);
570            // Avoid replaying dup commands
571            if (!cs.getSessionIds().contains(info.getSessionId())) {
572                broker.addSession(cs.getContext(), info);
573                try {
574                    cs.addSession(info);
575                } catch (IllegalStateException e) {
576                    e.printStackTrace();
577                    broker.removeSession(cs.getContext(), info);
578                }
579            }
580            return null;
581        }
582    
583        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
584            ConnectionId connectionId = id.getParentId();
585            TransportConnectionState cs = lookupConnectionState(connectionId);
586            if (cs == null) {
587                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
588            }
589            SessionState session = cs.getSessionState(id);
590            if (session == null) {
591                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
592            }
593            // Don't let new consumers or producers get added while we are closing
594            // this down.
595            session.shutdown();
596            // Cascade the connection stop to the consumers and producers.
597            for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
598                ConsumerId consumerId = (ConsumerId) iter.next();
599                try {
600                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
601                } catch (Throwable e) {
602                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
603                }
604            }
605            for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
606                ProducerId producerId = (ProducerId) iter.next();
607                try {
608                    processRemoveProducer(producerId);
609                } catch (Throwable e) {
610                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
611                }
612            }
613            cs.removeSession(id);
614            broker.removeSession(cs.getContext(), session.getInfo());
615            return null;
616        }
617    
618        public Response processAddConnection(ConnectionInfo info) throws Exception {
619            // if the broker service has slave attached, wait for the slave to be
620            // attached to allow client connection. slave connection is fine
621            if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
622                    && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
623                ServiceSupport.dispose(transport);
624                return new ExceptionResponse(new Exception("Master's slave not attached yet."));
625            }
626            // Older clients should have been defaulting this field to true.. but
627            // they were not.
628            if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
629                info.setClientMaster(true);
630            }
631            TransportConnectionState state;
632            // Make sure 2 concurrent connections by the same ID only generate 1
633            // TransportConnectionState object.
634            synchronized (brokerConnectionStates) {
635                state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
636                if (state == null) {
637                    state = new TransportConnectionState(info, this);
638                    brokerConnectionStates.put(info.getConnectionId(), state);
639                }
640                state.incrementReference();
641            }
642            // If there are 2 concurrent connections for the same connection id,
643            // then last one in wins, we need to sync here
644            // to figure out the winner.
645            synchronized (state.getConnectionMutex()) {
646                if (state.getConnection() != this) {
647                    LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
648                    state.getConnection().stop();
649                    LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
650                            + state.getConnection().getRemoteAddress());
651                    state.setConnection(this);
652                    state.reset(info);
653                }
654            }
655            registerConnectionState(info.getConnectionId(), state);
656            LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
657            // Setup the context.
658            String clientId = info.getClientId();
659            context = new ConnectionContext();
660            context.setBroker(broker);
661            context.setClientId(clientId);
662            context.setClientMaster(info.isClientMaster());
663            context.setConnection(this);
664            context.setConnectionId(info.getConnectionId());
665            context.setConnector(connector);
666            context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
667            context.setNetworkConnection(networkConnection);
668            context.setFaultTolerant(faultTolerantConnection);
669            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
670            context.setUserName(info.getUserName());
671            context.setWireFormatInfo(wireFormatInfo);
672            this.manageable = info.isManageable();
673            state.setContext(context);
674            state.setConnection(this);
675            try {
676                broker.addConnection(context, info);
677            } catch (Exception e) {
678                brokerConnectionStates.remove(info);
679                LOG.warn("Failed to add Connection", e);
680                throw e;
681            }
682            if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
683                // send ConnectionCommand
684                ConnectionControl command = new ConnectionControl();
685                command.setFaultTolerant(broker.isFaultTolerantConfiguration());
686                dispatchAsync(command);
687            }
688            return null;
689        }
690    
691        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
692                throws InterruptedException {
693            LOG.debug("remove connection id: " + id);
694            TransportConnectionState cs = lookupConnectionState(id);
695            if (cs != null) {
696                // Don't allow things to be added to the connection state while we
697                // are
698                // shutting down.
699                cs.shutdown();
700                // Cascade the connection stop to the sessions.
701                for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
702                    SessionId sessionId = (SessionId) iter.next();
703                    try {
704                        processRemoveSession(sessionId, lastDeliveredSequenceId);
705                    } catch (Throwable e) {
706                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
707                    }
708                }
709                // Cascade the connection stop to temp destinations.
710                for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
711                    DestinationInfo di = (DestinationInfo) iter.next();
712                    try {
713                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
714                    } catch (Throwable e) {
715                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
716                    }
717                    iter.remove();
718                }
719                try {
720                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
721                } catch (Throwable e) {
722                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
723                }
724                TransportConnectionState state = unregisterConnectionState(id);
725                if (state != null) {
726                    synchronized (brokerConnectionStates) {
727                        // If we are the last reference, we should remove the state
728                        // from the broker.
729                        if (state.decrementReference() == 0) {
730                            brokerConnectionStates.remove(id);
731                        }
732                    }
733                }
734            }
735            return null;
736        }
737    
738        public Response processProducerAck(ProducerAck ack) throws Exception {
739            // A broker should not get ProducerAck messages.
740            return null;
741        }
742    
743        public Connector getConnector() {
744            return connector;
745        }
746    
747        public void dispatchSync(Command message) {
748            // getStatistics().getEnqueues().increment();
749            try {
750                processDispatch(message);
751            } catch (IOException e) {
752                serviceExceptionAsync(e);
753            }
754        }
755    
756        public void dispatchAsync(Command message) {
757            if (!stopping.get()) {
758                // getStatistics().getEnqueues().increment();
759                if (taskRunner == null) {
760                    dispatchSync(message);
761                } else {
762                    synchronized (dispatchQueue) {
763                        dispatchQueue.add(message);
764                    }
765                    try {
766                        taskRunner.wakeup();
767                    } catch (InterruptedException e) {
768                        Thread.currentThread().interrupt();
769                    }
770                }
771            } else {
772                if (message.isMessageDispatch()) {
773                    MessageDispatch md = (MessageDispatch) message;
774                    Runnable sub = md.getTransmitCallback();
775                    broker.postProcessDispatch(md);
776                    if (sub != null) {
777                        sub.run();
778                    }
779                }
780            }
781        }
782    
783        protected void processDispatch(Command command) throws IOException {
784            final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
785            try {
786                if (!stopping.get()) {
787                    if (messageDispatch != null) {
788                        broker.preProcessDispatch(messageDispatch);
789                    }
790                    dispatch(command);
791                }
792            } finally {
793                if (messageDispatch != null) {
794                    Runnable sub = messageDispatch.getTransmitCallback();
795                    broker.postProcessDispatch(messageDispatch);
796                    if (sub != null) {
797                        sub.run();
798                    }
799                }
800                // getStatistics().getDequeues().increment();
801            }
802        }
803    
804        public boolean iterate() {
805            try {
806                if (stopping.get()) {
807                    if (dispatchStopped.compareAndSet(false, true)) {
808                        if (transportException.get() == null) {
809                            try {
810                                dispatch(new ShutdownInfo());
811                            } catch (Throwable ignore) {
812                            }
813                        }
814                        dispatchStoppedLatch.countDown();
815                    }
816                    return false;
817                }
818                if (!dispatchStopped.get()) {
819                    Command command = null;
820                    synchronized (dispatchQueue) {
821                        if (dispatchQueue.isEmpty()) {
822                            return false;
823                        }
824                        command = dispatchQueue.remove(0);
825                    }
826                    processDispatch(command);
827                    return true;
828                }
829                return false;
830            } catch (IOException e) {
831                if (dispatchStopped.compareAndSet(false, true)) {
832                    dispatchStoppedLatch.countDown();
833                }
834                serviceExceptionAsync(e);
835                return false;
836            }
837        }
838    
839        /**
840         * Returns the statistics for this connection
841         */
842        public ConnectionStatistics getStatistics() {
843            return statistics;
844        }
845    
846        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
847            return messageAuthorizationPolicy;
848        }
849    
850        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
851            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
852        }
853    
854        public boolean isManageable() {
855            return manageable;
856        }
857    
858        public void start() throws Exception {
859            starting = true;
860            try {
861                synchronized (this) {
862                    if (taskRunnerFactory != null) {
863                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
864                                + getRemoteAddress());
865                    } else {
866                        taskRunner = null;
867                    }
868                    transport.start();
869                    active = true;
870                    dispatchAsync(connector.getBrokerInfo());
871                    connector.onStarted(this);
872                }
873            } catch (Exception e) {
874                // Force clean up on an error starting up.
875                stop();
876                throw e;
877            } finally {
878                // stop() can be called from within the above block,
879                // but we want to be sure start() completes before
880                // stop() runs, so queue the stop until right now:
881                starting = false;
882                if (pendingStop) {
883                    LOG.debug("Calling the delayed stop()");
884                    stop();
885                }
886            }
887        }
888    
889        public void stop() throws Exception {
890            synchronized (this) {
891                pendingStop = true;
892                if (starting) {
893                    LOG.debug("stop() called in the middle of start(). Delaying...");
894                    return;
895                }
896            }
897            stopAsync();
898            while (!stopped.await(5, TimeUnit.SECONDS)) {
899                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
900            }
901        }
902    
903        public void stopAsync() {
904            // If we're in the middle of starting
905            // then go no further... for now.
906            if (stopping.compareAndSet(false, true)) {
907                // Let all the connection contexts know we are shutting down
908                // so that in progress operations can notice and unblock.
909                List<TransportConnectionState> connectionStates = listConnectionStates();
910                for (TransportConnectionState cs : connectionStates) {
911                    cs.getContext().getStopping().set(true);
912                }
913                try {
914                    getDefaultTaskRunnerFactory().execute(new Runnable(){
915                        public void run() {
916                            serviceLock.writeLock().lock();
917                            try {
918                                doStop();
919                            } catch (Throwable e) {
920                                LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
921                                        + "': ", e);
922                            } finally {
923                                stopped.countDown();
924                                serviceLock.writeLock().unlock();
925                            }
926                        }
927                    });
928                } catch (Throwable t) {
929                    LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
930                    stopped.countDown();
931                }
932            }
933        }
934    
935        @Override
936        public String toString() {
937            return "Transport Connection to: " + transport.getRemoteAddress();
938        }
939    
940        protected void doStop() throws Exception, InterruptedException {
941            LOG.debug("Stopping connection: " + transport.getRemoteAddress());
942            connector.onStopped(this);
943            try {
944                synchronized (this) {
945                    if (masterBroker != null) {
946                        masterBroker.stop();
947                    }
948                    if (duplexBridge != null) {
949                        duplexBridge.stop();
950                    }
951                }
952            } catch (Exception ignore) {
953                LOG.trace("Exception caught stopping", ignore);
954            }
955            try {
956                transport.stop();
957                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
958            } catch (Exception e) {
959                LOG.debug("Could not stop transport: " + e, e);
960            }
961            if (taskRunner != null) {
962                taskRunner.shutdown(1);
963            }
964            active = false;
965            // Run the MessageDispatch callbacks so that message references get
966            // cleaned up.
967            synchronized (dispatchQueue) {
968                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
969                    Command command = iter.next();
970                    if (command.isMessageDispatch()) {
971                        MessageDispatch md = (MessageDispatch) command;
972                        Runnable sub = md.getTransmitCallback();
973                        broker.postProcessDispatch(md);
974                        if (sub != null) {
975                            sub.run();
976                        }
977                    }
978                }
979                dispatchQueue.clear();
980            }
981            //
982            // Remove all logical connection associated with this connection
983            // from the broker.
984            if (!broker.isStopped()) {
985                List<TransportConnectionState> connectionStates = listConnectionStates();
986                connectionStates = listConnectionStates();
987                for (TransportConnectionState cs : connectionStates) {
988                    cs.getContext().getStopping().set(true);
989                    try {
990                        LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
991                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
992                    } catch (Throwable ignore) {
993                        ignore.printStackTrace();
994                    }
995                }
996                if (brokerInfo != null) {
997                    broker.removeBroker(this, brokerInfo);
998                }
999            }
1000            LOG.debug("Connection Stopped: " + getRemoteAddress());
1001        }
1002    
1003        /**
1004         * @return Returns the blockedCandidate.
1005         */
1006        public boolean isBlockedCandidate() {
1007            return blockedCandidate;
1008        }
1009    
1010        /**
1011         * @param blockedCandidate
1012         *            The blockedCandidate to set.
1013         */
1014        public void setBlockedCandidate(boolean blockedCandidate) {
1015            this.blockedCandidate = blockedCandidate;
1016        }
1017    
1018        /**
1019         * @return Returns the markedCandidate.
1020         */
1021        public boolean isMarkedCandidate() {
1022            return markedCandidate;
1023        }
1024    
1025        /**
1026         * @param markedCandidate
1027         *            The markedCandidate to set.
1028         */
1029        public void setMarkedCandidate(boolean markedCandidate) {
1030            this.markedCandidate = markedCandidate;
1031            if (!markedCandidate) {
1032                timeStamp = 0;
1033                blockedCandidate = false;
1034            }
1035        }
1036    
1037        /**
1038         * @param slow
1039         *            The slow to set.
1040         */
1041        public void setSlow(boolean slow) {
1042            this.slow = slow;
1043        }
1044    
1045        /**
1046         * @return true if the Connection is slow
1047         */
1048        public boolean isSlow() {
1049            return slow;
1050        }
1051    
1052        /**
1053         * @return true if the Connection is potentially blocked
1054         */
1055        public boolean isMarkedBlockedCandidate() {
1056            return markedCandidate;
1057        }
1058    
1059        /**
1060         * Mark the Connection, so we can deem if it's collectable on the next sweep
1061         */
1062        public void doMark() {
1063            if (timeStamp == 0) {
1064                timeStamp = System.currentTimeMillis();
1065            }
1066        }
1067    
1068        /**
1069         * @return if after being marked, the Connection is still writing
1070         */
1071        public boolean isBlocked() {
1072            return blocked;
1073        }
1074    
1075        /**
1076         * @return true if the Connection is connected
1077         */
1078        public boolean isConnected() {
1079            return connected;
1080        }
1081    
1082        /**
1083         * @param blocked
1084         *            The blocked to set.
1085         */
1086        public void setBlocked(boolean blocked) {
1087            this.blocked = blocked;
1088        }
1089    
1090        /**
1091         * @param connected
1092         *            The connected to set.
1093         */
1094        public void setConnected(boolean connected) {
1095            this.connected = connected;
1096        }
1097    
1098        /**
1099         * @return true if the Connection is active
1100         */
1101        public boolean isActive() {
1102            return active;
1103        }
1104    
1105        /**
1106         * @param active
1107         *            The active to set.
1108         */
1109        public void setActive(boolean active) {
1110            this.active = active;
1111        }
1112    
1113        /**
1114         * @return true if the Connection is starting
1115         */
1116        public synchronized boolean isStarting() {
1117            return starting;
1118        }
1119    
1120        public synchronized boolean isNetworkConnection() {
1121            return networkConnection;
1122        }
1123    
1124        protected synchronized void setStarting(boolean starting) {
1125            this.starting = starting;
1126        }
1127    
1128        /**
1129         * @return true if the Connection needs to stop
1130         */
1131        public synchronized boolean isPendingStop() {
1132            return pendingStop;
1133        }
1134    
1135        protected synchronized void setPendingStop(boolean pendingStop) {
1136            this.pendingStop = pendingStop;
1137        }
1138    
1139        public Response processBrokerInfo(BrokerInfo info) {
1140            if (info.isSlaveBroker()) {
1141                BrokerService bService = connector.getBrokerService();
1142                // Do we only support passive slaves - or does the slave want to be
1143                // passive ?
1144                boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1145                if (passive == false) {
1146                    
1147                    // stream messages from this broker (the master) to
1148                    // the slave
1149                    MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1150                    masterBroker = new MasterBroker(parent, transport);
1151                    masterBroker.startProcessing();
1152                }
1153                LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
1154                bService.slaveConnectionEstablished();
1155            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1156                // so this TransportConnection is the rear end of a network bridge
1157                // We have been requested to create a two way pipe ...
1158                try {
1159                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1160                    Map<String, String> props = createMap(properties);
1161                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1162                    IntrospectionSupport.setProperties(config, props, "");
1163                    config.setBrokerName(broker.getBrokerName());
1164                    URI uri = broker.getVmConnectorURI();
1165                    HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
1166                    map.put("network", "true");
1167                    map.put("async", "false");
1168                    uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1169                    Transport localTransport = TransportFactory.connect(uri);
1170                    Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1171                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport);
1172                    duplexBridge.setBrokerService(broker.getBrokerService());
1173                    // now turn duplex off this side
1174                    info.setDuplexConnection(false);
1175                    duplexBridge.setCreatedByDuplex(true);
1176                    duplexBridge.duplexStart(this, brokerInfo, info);
1177                    LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
1178                    return null;
1179                } catch (Exception e) {
1180                    LOG.error("Creating duplex network bridge", e);
1181                }
1182            }
1183            // We only expect to get one broker info command per connection
1184            if (this.brokerInfo != null) {
1185                LOG.warn("Unexpected extra broker info command received: " + info);
1186            }
1187            this.brokerInfo = info;
1188            broker.addBroker(this, info);
1189            networkConnection = true;
1190            List<TransportConnectionState> connectionStates = listConnectionStates();
1191            for (TransportConnectionState cs : connectionStates) {
1192                cs.getContext().setNetworkConnection(true);
1193            }
1194            return null;
1195        }
1196    
1197        @SuppressWarnings("unchecked")
1198        private HashMap<String, String> createMap(Properties properties) {
1199            return new HashMap(properties);
1200        }
1201    
1202        protected void dispatch(Command command) throws IOException {
1203            try {
1204                setMarkedCandidate(true);
1205                transport.oneway(command);
1206            } finally {
1207                setMarkedCandidate(false);
1208            }
1209        }
1210    
1211        public String getRemoteAddress() {
1212            return transport.getRemoteAddress();
1213        }
1214    
1215        public String getConnectionId() {
1216            List<TransportConnectionState> connectionStates = listConnectionStates();
1217            for (TransportConnectionState cs : connectionStates) {
1218                if (cs.getInfo().getClientId() != null) {
1219                    return cs.getInfo().getClientId();
1220                }
1221                return cs.getInfo().getConnectionId().toString();
1222            }
1223            return null;
1224        }
1225    
1226        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
1227            ProducerBrokerExchange result = producerExchanges.get(id);
1228            if (result == null) {
1229                synchronized (producerExchanges) {
1230                    result = new ProducerBrokerExchange();
1231                    TransportConnectionState state = lookupConnectionState(id);
1232                    context = state.getContext();
1233                    result.setConnectionContext(context);
1234                    SessionState ss = state.getSessionState(id.getParentId());
1235                    if (ss != null) {
1236                        result.setProducerState(ss.getProducerState(id));
1237                        ProducerState producerState = ss.getProducerState(id);
1238                        if (producerState != null && producerState.getInfo() != null) {
1239                            ProducerInfo info = producerState.getInfo();
1240                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1241                        }
1242                    }
1243                    producerExchanges.put(id, result);
1244                }
1245            } else {
1246                context = result.getConnectionContext();
1247            }
1248            return result;
1249        }
1250    
1251        private void removeProducerBrokerExchange(ProducerId id) {
1252            synchronized (producerExchanges) {
1253                producerExchanges.remove(id);
1254            }
1255        }
1256    
1257        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1258            ConsumerBrokerExchange result = consumerExchanges.get(id);
1259            if (result == null) {
1260                synchronized (consumerExchanges) {
1261                    result = new ConsumerBrokerExchange();
1262                    TransportConnectionState state = lookupConnectionState(id);
1263                    context = state.getContext();
1264                    result.setConnectionContext(context);
1265                    SessionState ss = state.getSessionState(id.getParentId());
1266                    if (ss != null) {
1267                        ConsumerState cs = ss.getConsumerState(id);
1268                        if (cs != null) {
1269                            ConsumerInfo info = cs.getInfo();
1270                            if (info != null) {
1271                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1272                                    result.setWildcard(true);
1273                                }
1274                            }
1275                        }
1276                    }
1277                    consumerExchanges.put(id, result);
1278                }
1279            }
1280            return result;
1281        }
1282    
1283        private void removeConsumerBrokerExchange(ConsumerId id) {
1284            synchronized (consumerExchanges) {
1285                consumerExchanges.remove(id);
1286            }
1287        }
1288    
1289        public int getProtocolVersion() {
1290            return protocolVersion.get();
1291        }
1292    
1293        public Response processControlCommand(ControlCommand command) throws Exception {
1294            String control = command.getCommand();
1295            if (control != null && control.equals("shutdown")) {
1296                System.exit(0);
1297            }
1298            return null;
1299        }
1300    
1301        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1302            return null;
1303        }
1304    
1305        public Response processConnectionControl(ConnectionControl control) throws Exception {
1306            if (control != null) {
1307                faultTolerantConnection = control.isFaultTolerant();
1308            }
1309            return null;
1310        }
1311    
1312        public Response processConnectionError(ConnectionError error) throws Exception {
1313            return null;
1314        }
1315    
1316        public Response processConsumerControl(ConsumerControl control) throws Exception {
1317            return null;
1318        }
1319    
1320        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1321                TransportConnectionState state) {
1322            TransportConnectionState cs = null;
1323            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1324                // swap implementations
1325                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1326                newRegister.intialize(connectionStateRegister);
1327                connectionStateRegister = newRegister;
1328            }
1329            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1330            return cs;
1331        }
1332    
1333        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1334            return connectionStateRegister.unregisterConnectionState(connectionId);
1335        }
1336    
1337        protected synchronized List<TransportConnectionState> listConnectionStates() {
1338            return connectionStateRegister.listConnectionStates();
1339        }
1340    
1341        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1342            return connectionStateRegister.lookupConnectionState(connectionId);
1343        }
1344    
1345        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1346            return connectionStateRegister.lookupConnectionState(id);
1347        }
1348    
1349        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1350            return connectionStateRegister.lookupConnectionState(id);
1351        }
1352    
1353        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1354            return connectionStateRegister.lookupConnectionState(id);
1355        }
1356    
1357        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1358            return connectionStateRegister.lookupConnectionState(connectionId);
1359        }
1360    }