001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.security.GeneralSecurityException;
021    import java.security.cert.X509Certificate;
022    import java.util.Collection;
023    import java.util.List;
024    import java.util.Properties;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.CountDownLatch;
027    import java.util.concurrent.SynchronousQueue;
028    import java.util.concurrent.ThreadFactory;
029    import java.util.concurrent.ThreadPoolExecutor;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicLong;
033    
034    import org.apache.activemq.Service;
035    import org.apache.activemq.advisory.AdvisorySupport;
036    import org.apache.activemq.broker.BrokerService;
037    import org.apache.activemq.broker.BrokerServiceAware;
038    import org.apache.activemq.broker.TransportConnection;
039    import org.apache.activemq.broker.region.AbstractRegion;
040    import org.apache.activemq.broker.region.RegionBroker;
041    import org.apache.activemq.broker.region.Subscription;
042    import org.apache.activemq.command.ActiveMQDestination;
043    import org.apache.activemq.command.ActiveMQMessage;
044    import org.apache.activemq.command.ActiveMQTempDestination;
045    import org.apache.activemq.command.ActiveMQTopic;
046    import org.apache.activemq.command.BrokerId;
047    import org.apache.activemq.command.BrokerInfo;
048    import org.apache.activemq.command.Command;
049    import org.apache.activemq.command.ConnectionError;
050    import org.apache.activemq.command.ConnectionId;
051    import org.apache.activemq.command.ConnectionInfo;
052    import org.apache.activemq.command.ConsumerId;
053    import org.apache.activemq.command.ConsumerInfo;
054    import org.apache.activemq.command.DataStructure;
055    import org.apache.activemq.command.DestinationInfo;
056    import org.apache.activemq.command.ExceptionResponse;
057    import org.apache.activemq.command.KeepAliveInfo;
058    import org.apache.activemq.command.Message;
059    import org.apache.activemq.command.MessageAck;
060    import org.apache.activemq.command.MessageDispatch;
061    import org.apache.activemq.command.NetworkBridgeFilter;
062    import org.apache.activemq.command.ProducerInfo;
063    import org.apache.activemq.command.RemoveInfo;
064    import org.apache.activemq.command.Response;
065    import org.apache.activemq.command.SessionInfo;
066    import org.apache.activemq.command.ShutdownInfo;
067    import org.apache.activemq.command.WireFormatInfo;
068    import org.apache.activemq.filter.DestinationFilter;
069    import org.apache.activemq.transport.DefaultTransportListener;
070    import org.apache.activemq.transport.FutureResponse;
071    import org.apache.activemq.transport.ResponseCallback;
072    import org.apache.activemq.transport.Transport;
073    import org.apache.activemq.transport.TransportDisposedIOException;
074    import org.apache.activemq.transport.TransportFilter;
075    import org.apache.activemq.transport.TransportListener;
076    import org.apache.activemq.transport.tcp.SslTransport;
077    import org.apache.activemq.util.IdGenerator;
078    import org.apache.activemq.util.IntrospectionSupport;
079    import org.apache.activemq.util.LongSequenceGenerator;
080    import org.apache.activemq.util.MarshallingSupport;
081    import org.apache.activemq.util.ServiceStopper;
082    import org.apache.activemq.util.ServiceSupport;
083    import org.apache.commons.logging.Log;
084    import org.apache.commons.logging.LogFactory;
085    
086    /**
087     * A useful base class for implementing demand forwarding bridges.
088     * 
089     * @version $Revision: 835920 $
090     */
091    public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
092    
093        private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
094        private static final ThreadPoolExecutor ASYNC_TASKS;
095        protected final Transport localBroker;
096        protected final Transport remoteBroker;
097        protected final IdGenerator idGenerator = new IdGenerator();
098        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
099        protected ConnectionInfo localConnectionInfo;
100        protected ConnectionInfo remoteConnectionInfo;
101        protected SessionInfo localSessionInfo;
102        protected ProducerInfo producerInfo;
103        protected String remoteBrokerName = "Unknown";
104        protected String localClientId;
105        protected ConsumerInfo demandConsumerInfo;
106        protected int demandConsumerDispatched;
107        protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
108        protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
109        protected AtomicBoolean disposed = new AtomicBoolean();
110        protected BrokerId localBrokerId;
111        protected ActiveMQDestination[] excludedDestinations;
112        protected ActiveMQDestination[] dynamicallyIncludedDestinations;
113        protected ActiveMQDestination[] staticallyIncludedDestinations;
114        protected ActiveMQDestination[] durableDestinations;
115        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
116        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
117        protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
118        protected CountDownLatch startedLatch = new CountDownLatch(2);
119        protected CountDownLatch localStartedLatch = new CountDownLatch(1);
120        protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
121        protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
122        protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
123        protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
124        protected NetworkBridgeConfiguration configuration;
125    
126        final AtomicLong enqueueCounter = new AtomicLong();
127        final AtomicLong dequeueCounter = new AtomicLong();
128    
129        private NetworkBridgeListener networkBridgeListener;
130        private boolean createdByDuplex;
131        private BrokerInfo localBrokerInfo;
132        private BrokerInfo remoteBrokerInfo;
133    
134        private AtomicBoolean started = new AtomicBoolean();
135        private TransportConnection duplexInitiatingConnection;
136        private BrokerService brokerService = null;
137    
138        public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
139            this.configuration = configuration;
140            this.localBroker = localBroker;
141            this.remoteBroker = remoteBroker;
142        }
143    
144        public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
145            this.localBrokerInfo = localBrokerInfo;
146            this.remoteBrokerInfo = remoteBrokerInfo;
147            this.duplexInitiatingConnection = connection;
148            start();
149            serviceRemoteCommand(remoteBrokerInfo);
150        }
151    
152        public void start() throws Exception {
153            if (started.compareAndSet(false, true)) {
154                localBroker.setTransportListener(new DefaultTransportListener() {
155    
156                    public void onCommand(Object o) {
157                        Command command = (Command) o;
158                        serviceLocalCommand(command);
159                    }
160    
161                    public void onException(IOException error) {
162                        serviceLocalException(error);
163                    }
164                });
165                remoteBroker.setTransportListener(new TransportListener() {
166    
167                    public void onCommand(Object o) {
168                        Command command = (Command) o;
169                        serviceRemoteCommand(command);
170                    }
171    
172                    public void onException(IOException error) {
173                        serviceRemoteException(error);
174                    }
175    
176                    public void transportInterupted() {
177                        // clear any subscriptions - to try and prevent the bridge
178                        // from stalling the broker
179                        if (remoteInterupted.compareAndSet(false, true)) {
180                            LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
181                            if (localBridgeStarted.get()) {
182                                clearDownSubscriptions();
183                                synchronized (DemandForwardingBridgeSupport.this) {
184                                    try {
185                                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
186                                    } catch (TransportDisposedIOException td) {
187                                        LOG.debug("local broker is now disposed", td);
188                                    } catch (IOException e) {
189                                        LOG.warn("Caught exception from local start", e);
190                                    }
191                                }
192                            }
193                            localBridgeStarted.set(false);
194                            remoteBridgeStarted.set(false);
195                            startedLatch = new CountDownLatch(2);
196                            localStartedLatch = new CountDownLatch(1);
197                        }
198                    }
199    
200                    public void transportResumed() {
201                        if (remoteInterupted.compareAndSet(true, false)) {
202                            // We want to slow down false connects so that we don't
203                            // get in a busy loop.
204                            // False connects can occurr if you using SSH tunnels.
205                            if (!lastConnectSucceeded.get()) {
206                                try {
207                                    LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
208                                    Thread.sleep(1000);
209                                } catch (InterruptedException e) {
210                                    Thread.currentThread().interrupt();
211                                }
212                            }
213                            lastConnectSucceeded.set(false);
214                            try {
215                                startLocalBridge();
216                                remoteBridgeStarted.set(true);
217                                startedLatch.countDown();
218                                LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
219                            } catch (Exception e) {
220                                LOG.error("Caught exception  from local start in resume transport", e);
221                            }
222                        }
223                    }
224                });
225    
226                localBroker.start();
227                remoteBroker.start();
228                if (configuration.isDuplex() && duplexInitiatingConnection == null) {
229                    // initiator side of duplex network
230                    remoteBrokerNameKnownLatch.await();
231                }
232                try {
233                    triggerRemoteStartBridge();
234                } catch (IOException e) {
235                    LOG.warn("Caught exception from remote start", e);
236                }
237                NetworkBridgeListener l = this.networkBridgeListener;
238                if (l != null) {
239                    l.onStart(this);
240                }
241            }
242        }
243    
244        protected void triggerLocalStartBridge() throws IOException {
245            ASYNC_TASKS.execute(new Runnable() {
246                public void run() {
247                    final String originalName = Thread.currentThread().getName();
248                    Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
249                    try {
250                        startLocalBridge();
251                    } catch (Exception e) {
252                        serviceLocalException(e);
253                    } finally {
254                        Thread.currentThread().setName(originalName);
255                    }
256                }
257            });
258        }
259    
260        protected void triggerRemoteStartBridge() throws IOException {
261            ASYNC_TASKS.execute(new Runnable() {
262                public void run() {
263                    final String originalName = Thread.currentThread().getName();
264                    Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
265                    try {
266                        startRemoteBridge();
267                    } catch (Exception e) {
268                        serviceRemoteException(e);
269                    } finally {
270                        Thread.currentThread().setName(originalName);
271                    }
272                }
273            });
274        }
275    
276        protected void startLocalBridge() throws Exception {
277            if (localBridgeStarted.compareAndSet(false, true)) {
278                synchronized (this) {
279                    if (LOG.isTraceEnabled()) {
280                        LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
281                    }
282                    remoteBrokerNameKnownLatch.await();
283    
284                    localConnectionInfo = new ConnectionInfo();
285                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
286                    localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
287                    localConnectionInfo.setClientId(localClientId);
288                    localConnectionInfo.setUserName(configuration.getUserName());
289                    localConnectionInfo.setPassword(configuration.getPassword());
290                    Transport originalTransport = remoteBroker;
291                    while (originalTransport instanceof TransportFilter) {
292                        originalTransport = ((TransportFilter) originalTransport).getNext();
293                    }
294                    if (originalTransport instanceof SslTransport) {
295                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
296                        localConnectionInfo.setTransportContext(peerCerts);
297                    }
298                    localBroker.oneway(localConnectionInfo);
299    
300                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
301                    localBroker.oneway(localSessionInfo);
302    
303                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
304    
305                    startedLatch.countDown();
306                    localStartedLatch.countDown();
307                    setupStaticDestinations();
308                }
309            }
310        }
311    
312        protected void startRemoteBridge() throws Exception {
313            if (remoteBridgeStarted.compareAndSet(false, true)) {
314                if (LOG.isTraceEnabled()) {
315                    LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker);
316                }
317                synchronized (this) {
318                    if (!isCreatedByDuplex()) {
319                        BrokerInfo brokerInfo = new BrokerInfo();
320                        brokerInfo.setBrokerName(configuration.getBrokerName());
321                        brokerInfo.setNetworkConnection(true);
322                        brokerInfo.setDuplexConnection(configuration.isDuplex());
323                        // set our properties
324                        Properties props = new Properties();
325                        IntrospectionSupport.getProperties(configuration, props, null);
326                        String str = MarshallingSupport.propertiesToString(props);
327                        brokerInfo.setNetworkProperties(str);
328                        brokerInfo.setBrokerId(this.localBrokerId);
329                        remoteBroker.oneway(brokerInfo);
330                    }
331                    if (remoteConnectionInfo != null) {
332                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
333                    }
334                    remoteConnectionInfo = new ConnectionInfo();
335                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
336                    remoteConnectionInfo.setClientId("NC_" + configuration.getBrokerName() + "_outbound");
337                    remoteConnectionInfo.setUserName(configuration.getUserName());
338                    remoteConnectionInfo.setPassword(configuration.getPassword());
339                    remoteBroker.oneway(remoteConnectionInfo);
340    
341                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
342                    remoteBroker.oneway(remoteSessionInfo);
343                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
344                    producerInfo.setResponseRequired(false);
345                    remoteBroker.oneway(producerInfo);
346                    // Listen to consumer advisory messages on the remote broker to
347                    // determine demand.
348                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
349                    demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
350                    String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter();
351                    if (configuration.isBridgeTempDestinations()) {
352                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
353                    }
354                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
355                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
356                    remoteBroker.oneway(demandConsumerInfo);
357                    startedLatch.countDown();
358                    if (!disposed.get()) {
359                        triggerLocalStartBridge();
360                    }
361                }
362            }
363        }
364    
365        public void stop() throws Exception {
366            if (started.compareAndSet(true, false)) {
367                if (disposed.compareAndSet(false, true)) {
368                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
369                    NetworkBridgeListener l = this.networkBridgeListener;
370                    if (l != null) {
371                        l.onStop(this);
372                    }
373                    try {
374                        remoteBridgeStarted.set(false);
375                        final CountDownLatch sendShutdown = new CountDownLatch(1);
376                        ASYNC_TASKS.execute(new Runnable() {
377                            public void run() {
378                                try {
379                                    localBroker.oneway(new ShutdownInfo());
380                                    sendShutdown.countDown();
381                                    remoteBroker.oneway(new ShutdownInfo());
382                                } catch (Throwable e) {
383                                    LOG.debug("Caught exception sending shutdown", e);
384                                } finally {
385                                    sendShutdown.countDown();
386                                }
387    
388                            }
389                        });
390                        if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
391                            LOG.info("Network Could not shutdown in a timely manner");
392                        }
393                    } finally {
394                        ServiceStopper ss = new ServiceStopper();
395                        ss.stop(remoteBroker);
396                        ss.stop(localBroker);
397                        // Release the started Latch since another thread could be
398                        // stuck waiting for it to start up.
399                        startedLatch.countDown();
400                        startedLatch.countDown();
401                        localStartedLatch.countDown();
402                        ss.throwFirstException();
403                    }
404                }
405                LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
406            }
407        }
408    
409        public void serviceRemoteException(Throwable error) {
410            if (!disposed.get()) {
411                if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
412                    LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
413                } else {
414                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
415                }
416                LOG.debug("The remote Exception was: " + error, error);
417                ASYNC_TASKS.execute(new Runnable() {
418                    public void run() {
419                        ServiceSupport.dispose(getControllingService());
420                    }
421                });
422                fireBridgeFailed();
423            }
424        }
425    
426        protected void serviceRemoteCommand(Command command) {
427            if (!disposed.get()) {
428                try {
429                    if (command.isMessageDispatch()) {
430                        waitStarted();
431                        MessageDispatch md = (MessageDispatch) command;
432                        serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
433                        demandConsumerDispatched++;
434                        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
435                            remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
436                            demandConsumerDispatched = 0;
437                        }
438                    } else if (command.isBrokerInfo()) {
439                        lastConnectSucceeded.set(true);
440                        remoteBrokerInfo = (BrokerInfo) command;
441                        Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
442                        try {
443                            IntrospectionSupport.getProperties(configuration, props, null);
444                            if (configuration.getExcludedDestinations() != null) {
445                                excludedDestinations = configuration.getExcludedDestinations().toArray(
446                                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
447                            }
448                            if (configuration.getStaticallyIncludedDestinations() != null) {
449                                staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
450                                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
451                            }
452                            if (configuration.getDynamicallyIncludedDestinations() != null) {
453                                dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
454                                        .toArray(
455                                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
456                                                        .size()]);
457                            }
458                        } catch (Throwable t) {
459                            LOG.error("Error mapping remote destinations", t);
460                        }
461                        serviceRemoteBrokerInfo(command);
462                        // Let the local broker know the remote broker's ID.
463                        localBroker.oneway(command);
464                    } else if (command.getClass() == ConnectionError.class) {
465                        ConnectionError ce = (ConnectionError) command;
466                        serviceRemoteException(ce.getException());
467                    } else {
468                        if (isDuplex()) {
469                            if (command.isMessage()) {
470                                ActiveMQMessage message = (ActiveMQMessage) command;
471                                if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) {
472                                    serviceRemoteConsumerAdvisory(message.getDataStructure());
473                                } else {
474                                    if (!isPermissableDestination(message.getDestination(), true)) {
475                                        return;
476                                    }
477                                    if (message.isResponseRequired()) {
478                                        Response reply = new Response();
479                                        reply.setCorrelationId(message.getCommandId());
480                                        localBroker.oneway(message);
481                                        remoteBroker.oneway(reply);
482                                    } else {
483                                        localBroker.oneway(message);
484                                    }
485                                }
486                            } else {
487                                switch (command.getDataStructureType()) {
488                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
489                                case SessionInfo.DATA_STRUCTURE_TYPE:
490                                case ProducerInfo.DATA_STRUCTURE_TYPE:
491                                    localBroker.oneway(command);
492                                    break;
493                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
494                                    localStartedLatch.await();
495                                    if (started.get()) {
496                                        if (!addConsumerInfo((ConsumerInfo) command)) {
497                                            if (LOG.isDebugEnabled()) {
498                                                LOG.debug("Ignoring ConsumerInfo: " + command);
499                                            }
500                                        } else {
501                                            if (LOG.isTraceEnabled()) {
502                                                LOG.trace("Adding ConsumerInfo: " + command);
503                                            }
504                                        }
505                                    } else {
506                                        // received a subscription whilst stopping
507                                        LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
508                                    }
509                                    break;
510                                default:
511                                    if (LOG.isDebugEnabled()) {
512                                        LOG.debug("Ignoring remote command: " + command);
513                                    }
514                                }
515                            }
516                        } else {
517                            switch (command.getDataStructureType()) {
518                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
519                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
520                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
521                                break;
522                            default:
523                                LOG.warn("Unexpected remote command: " + command);
524                            }
525                        }
526                    }
527                } catch (Throwable e) {
528                    if (LOG.isDebugEnabled()) {
529                        LOG.debug("Exception processing remote command: " + command, e);
530                    }
531                    serviceRemoteException(e);
532                }
533            }
534        }
535    
536        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
537            final int networkTTL = configuration.getNetworkTTL();
538            if (data.getClass() == ConsumerInfo.class) {
539                // Create a new local subscription
540                ConsumerInfo info = (ConsumerInfo) data;
541                BrokerId[] path = info.getBrokerPath();
542    
543                if (path != null && path.length >= networkTTL) {
544                    if (LOG.isDebugEnabled()) {
545                        LOG.debug(configuration.getBrokerName() + " Ignoring sub  from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
546                    }
547                    return;
548                }
549                if (contains(path, localBrokerPath[0])) {
550                    // Ignore this consumer as it's a consumer we locally sent to the broker.
551                    if (LOG.isDebugEnabled()) {
552                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
553                    }
554                    return;
555                }
556                if (!isPermissableDestination(info.getDestination())) {
557                    // ignore if not in the permitted or in the excluded list
558                    if (LOG.isDebugEnabled()) {
559                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
560                    }
561                    return;
562                }
563    
564                // in a cyclic network there can be multiple bridges per broker that can propagate
565                // a network subscription so there is a need to synchronise on a shared entity
566                synchronized (brokerService.getVmConnectorURI()) {
567                    if (addConsumerInfo(info)) {
568                        if (LOG.isDebugEnabled()) {
569                            LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
570                        }
571                    } else {
572                        if (LOG.isDebugEnabled()) {
573                            LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
574                        }
575                    }
576                }
577            } else if (data.getClass() == DestinationInfo.class) {
578                // It's a destination info - we want to pass up
579                // information about temporary destinations
580                DestinationInfo destInfo = (DestinationInfo) data;
581                BrokerId[] path = destInfo.getBrokerPath();
582                if (path != null && path.length >= networkTTL) {
583                    if (LOG.isDebugEnabled()) {
584                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
585                    }
586                    return;
587                }
588                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
589                    // Ignore this consumer as it's a consumer we locally sent to
590                    // the broker.
591                    if (LOG.isDebugEnabled()) {
592                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
593                    }
594                    return;
595                }
596                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
597                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
598                    // re-set connection id so comes from here
599                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
600                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
601                }
602                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
603                if (LOG.isTraceEnabled()) {
604                    LOG.trace("bridging destination control command: " + destInfo);
605                }
606                localBroker.oneway(destInfo);
607            } else if (data.getClass() == RemoveInfo.class) {
608                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
609                removeDemandSubscription(id);
610            }
611        }
612    
613        public void serviceLocalException(Throwable error) {
614            if (!disposed.get()) {
615                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
616                LOG.debug("The local Exception was:" + error, error);
617                ASYNC_TASKS.execute(new Runnable() {
618                    public void run() {
619                        ServiceSupport.dispose(getControllingService());
620                    }
621                });
622                fireBridgeFailed();
623            }
624        }
625    
626        protected Service getControllingService() {
627            return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
628        }
629    
630        protected void addSubscription(DemandSubscription sub) throws IOException {
631            if (sub != null) {
632                localBroker.oneway(sub.getLocalInfo());
633            }
634        }
635    
636        protected void removeSubscription(final DemandSubscription sub) throws IOException {
637            if (sub != null) {
638                if (LOG.isDebugEnabled()) {
639                    LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
640                }
641                subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
642    
643                // continue removal in separate thread to free up this thread for outstanding responses
644                ASYNC_TASKS.execute(new Runnable() {
645                    public void run() {
646                        sub.waitForCompletion();
647                        try {
648                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
649                        } catch (IOException e) {
650                            LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
651                        }
652                    }
653                });
654            }
655        }
656    
657        protected Message configureMessage(MessageDispatch md) {
658            Message message = md.getMessage().copy();
659            // Update the packet to show where it came from.
660            message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
661            message.setProducerId(producerInfo.getProducerId());
662            message.setDestination(md.getDestination());
663            if (message.getOriginalTransactionId() == null) {
664                message.setOriginalTransactionId(message.getTransactionId());
665            }
666            message.setTransactionId(null);
667            return message;
668        }
669    
670        protected void serviceLocalCommand(Command command) {
671            if (!disposed.get()) {
672                try {
673                    if (command.isMessageDispatch()) {
674                        enqueueCounter.incrementAndGet();
675                        final MessageDispatch md = (MessageDispatch) command;
676                        final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
677                        if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
678                            // See if this consumer's brokerPath tells us it came from the broker at the other end
679                            // of the bridge. I think we should be making this decision based on the message's
680                            // broker bread crumbs and not the consumer's? However, the message's broker bread
681                            // crumbs are null, which is another matter.   
682                            boolean cameFromRemote = false;
683                            Object consumerInfo = md.getMessage().getDataStructure();
684                            if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo))
685                                cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
686    
687                            Message message = configureMessage(md);
688                            if (LOG.isDebugEnabled()) {
689                                LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
690                            }
691    
692                            if (!message.isResponseRequired()) {
693    
694                                // If the message was originally sent using async
695                                // send, we will preserve that QOS
696                                // by bridging it using an async send (small chance
697                                // of message loss).
698    
699                                try {
700                                    // Don't send it off to the remote if it originally came from the remote. 
701                                    if (!cameFromRemote) {
702                                        remoteBroker.oneway(message);
703                                    } else {
704                                        if (LOG.isDebugEnabled()) {
705                                            LOG.debug("Message not forwarded on to remote, because message came from remote");
706                                        }
707                                    }
708    
709                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
710                                    dequeueCounter.incrementAndGet();
711                                } finally {
712                                    sub.decrementOutstandingResponses();
713                                }
714    
715                            } else {
716    
717                                // The message was not sent using async send, so we
718                                // should only ack the local
719                                // broker when we get confirmation that the remote
720                                // broker has received the message.
721                                ResponseCallback callback = new ResponseCallback() {
722                                    public void onCompletion(FutureResponse future) {
723                                        try {
724                                            Response response = future.getResult();
725                                            if (response.isException()) {
726                                                ExceptionResponse er = (ExceptionResponse) response;
727                                                serviceLocalException(er.getException());
728                                            } else {
729                                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
730                                                dequeueCounter.incrementAndGet();
731    
732                                            }
733                                        } catch (IOException e) {
734                                            serviceLocalException(e);
735                                        } finally {
736                                            sub.decrementOutstandingResponses();
737                                        }
738                                    }
739                                };
740    
741                                remoteBroker.asyncRequest(message, callback);
742                            }
743    
744                        } else {
745                            if (LOG.isDebugEnabled()) {
746                                LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
747                            }
748                        }
749                    } else if (command.isBrokerInfo()) {
750                        localBrokerInfo = (BrokerInfo) command;
751                        serviceLocalBrokerInfo(command);
752                    } else if (command.isShutdownInfo()) {
753                        LOG.info(configuration.getBrokerName() + " Shutting down");
754                        // Don't shut down the whole connector if the remote side
755                        // was interrupted.
756                        // the local transport is just shutting down temporarily
757                        // until the remote side
758                        // is restored.
759                        if (!remoteInterupted.get()) {
760                            stop();
761                        }
762                    } else if (command.getClass() == ConnectionError.class) {
763                        ConnectionError ce = (ConnectionError) command;
764                        serviceLocalException(ce.getException());
765                    } else {
766                        switch (command.getDataStructureType()) {
767                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
768                            break;
769                        default:
770                            LOG.warn("Unexpected local command: " + command);
771                        }
772                    }
773                } catch (Throwable e) {
774                    LOG.warn("Caught an exception processing local command", e);
775                    serviceLocalException(e);
776                }
777            }
778        }
779    
780        /**
781         * @return Returns the dynamicallyIncludedDestinations.
782         */
783        public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
784            return dynamicallyIncludedDestinations;
785        }
786    
787        /**
788         * @param dynamicallyIncludedDestinations The
789         *            dynamicallyIncludedDestinations to set.
790         */
791        public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
792            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
793        }
794    
795        /**
796         * @return Returns the excludedDestinations.
797         */
798        public ActiveMQDestination[] getExcludedDestinations() {
799            return excludedDestinations;
800        }
801    
802        /**
803         * @param excludedDestinations The excludedDestinations to set.
804         */
805        public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
806            this.excludedDestinations = excludedDestinations;
807        }
808    
809        /**
810         * @return Returns the staticallyIncludedDestinations.
811         */
812        public ActiveMQDestination[] getStaticallyIncludedDestinations() {
813            return staticallyIncludedDestinations;
814        }
815    
816        /**
817         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
818         *            to set.
819         */
820        public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
821            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
822        }
823    
824        /**
825         * @return Returns the durableDestinations.
826         */
827        public ActiveMQDestination[] getDurableDestinations() {
828            return durableDestinations;
829        }
830    
831        /**
832         * @param durableDestinations The durableDestinations to set.
833         */
834        public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
835            this.durableDestinations = durableDestinations;
836        }
837    
838        /**
839         * @return Returns the localBroker.
840         */
841        public Transport getLocalBroker() {
842            return localBroker;
843        }
844    
845        /**
846         * @return Returns the remoteBroker.
847         */
848        public Transport getRemoteBroker() {
849            return remoteBroker;
850        }
851    
852        /**
853         * @return the createdByDuplex
854         */
855        public boolean isCreatedByDuplex() {
856            return this.createdByDuplex;
857        }
858    
859        /**
860         * @param createdByDuplex the createdByDuplex to set
861         */
862        public void setCreatedByDuplex(boolean createdByDuplex) {
863            this.createdByDuplex = createdByDuplex;
864        }
865    
866        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
867            if (brokerPath != null) {
868                for (int i = 0; i < brokerPath.length; i++) {
869                    if (brokerId.equals(brokerPath[i])) {
870                        return true;
871                    }
872                }
873            }
874            return false;
875        }
876    
877        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
878            if (brokerPath == null || brokerPath.length == 0) {
879                return pathsToAppend;
880            }
881            BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
882            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
883            System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
884            return rc;
885        }
886    
887        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
888            if (brokerPath == null || brokerPath.length == 0) {
889                return new BrokerId[] { idToAppend };
890            }
891            BrokerId rc[] = new BrokerId[brokerPath.length + 1];
892            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
893            rc[brokerPath.length] = idToAppend;
894            return rc;
895        }
896    
897        protected boolean isPermissableDestination(ActiveMQDestination destination) {
898            return isPermissableDestination(destination, false);
899        }
900    
901        protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
902            // Are we not bridging temp destinations?
903            if (destination.isTemporary()) {
904                if (allowTemporary) {
905                    return true;
906                } else {
907                    return configuration.isBridgeTempDestinations();
908                }
909            }
910    
911            final DestinationFilter filter = DestinationFilter.parseFilter(destination);
912    
913            ActiveMQDestination[] dests = excludedDestinations;
914            if (dests != null && dests.length > 0) {
915                for (int i = 0; i < dests.length; i++) {
916                    DestinationFilter exclusionFilter = filter;
917                    ActiveMQDestination match = dests[i];
918                    if (exclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
919                        DestinationFilter newFilter = DestinationFilter.parseFilter(match);
920                        if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) {
921                            exclusionFilter = newFilter;
922                            match = destination;
923                        }
924                    }
925                    if (match != null && exclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) {
926                        return false;
927                    }
928                }
929            }
930            dests = dynamicallyIncludedDestinations;
931            if (dests != null && dests.length > 0) {
932                for (int i = 0; i < dests.length; i++) {
933                    DestinationFilter inclusionFilter = filter;
934                    ActiveMQDestination match = dests[i];
935                    if (inclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
936                        DestinationFilter newFilter = DestinationFilter.parseFilter(match);
937                        if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) {
938                            inclusionFilter = newFilter;
939                            match = destination;
940                        }
941                    }
942                    if (match != null && inclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) {
943                        return true;
944                    }
945                }
946                return false;
947            }
948            return true;
949        }
950    
951        /**
952         * Subscriptions for these destinations are always created
953         */
954        protected void setupStaticDestinations() {
955            ActiveMQDestination[] dests = staticallyIncludedDestinations;
956            if (dests != null) {
957                for (int i = 0; i < dests.length; i++) {
958                    ActiveMQDestination dest = dests[i];
959                    DemandSubscription sub = createDemandSubscription(dest);
960                    try {
961                        addSubscription(sub);
962                    } catch (IOException e) {
963                        LOG.error("Failed to add static destination " + dest, e);
964                    }
965                    if (LOG.isTraceEnabled()) {
966                        LOG.trace("bridging messages for static destination: " + dest);
967                    }
968                }
969            }
970        }
971    
972        protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
973            boolean consumerAdded = false;
974            ConsumerInfo info = consumerInfo.copy();
975            addRemoteBrokerToBrokerPath(info);
976            DemandSubscription sub = createDemandSubscription(info);
977            if (sub != null) {
978                if (duplicateSuppressionIsRequired(sub)) {
979                    undoMapRegistration(sub);
980                } else {
981                    addSubscription(sub);
982                    consumerAdded = true;
983                }
984            }
985            return consumerAdded;
986        }
987    
988        private void undoMapRegistration(DemandSubscription sub) {
989            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
990            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
991        }
992    
993        /*
994         * check our existing subs networkConsumerIds against the list of network ids in this subscription
995         * A match means a duplicate which we suppress for topics and maybe for queues
996         */
997        private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
998            final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
999            boolean suppress = false;
1000    
1001            if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
1002                return suppress;
1003            }
1004    
1005            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1006            Collection<Subscription> currentSubs = 
1007                getRegionSubscriptions(consumerInfo.getDestination().isTopic());
1008            for (Subscription sub : currentSubs) {
1009                List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1010                if (!networkConsumers.isEmpty()) {
1011                    if (matchFound(candidateConsumers, networkConsumers)) {
1012                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1013                        break;
1014                    }
1015                }
1016            }
1017            return suppress;
1018        }
1019    
1020        private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1021            boolean suppress = false;
1022    
1023            if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1024                if (LOG.isDebugEnabled()) {
1025                    LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1026                            + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 
1027                            + existingSub.getConsumerInfo()  + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1028                }
1029                suppress = true;
1030            } else {
1031                // remove the existing lower priority duplicate and allow this candidate
1032                try {
1033                    removeDuplicateSubscription(existingSub);
1034    
1035                    if (LOG.isDebugEnabled()) {
1036                        LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1037                                + " with sub from " + remoteBrokerName
1038                                + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 
1039                                + candidateInfo.getNetworkConsumerIds());
1040                    }
1041                } catch (IOException e) {
1042                    LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1043                }
1044            }
1045            return suppress;
1046        }
1047    
1048        private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1049            for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1050                if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1051                    break;
1052                }
1053            }
1054        }
1055    
1056        private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1057            boolean found = false;
1058            for (ConsumerId aliasConsumer : networkConsumers) {
1059                if (candidateConsumers.contains(aliasConsumer)) {
1060                    found = true;
1061                    break;
1062                }
1063            }
1064            return found;
1065        }
1066    
1067        private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
1068            RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
1069            AbstractRegion abstractRegion = (AbstractRegion) 
1070                (isTopic ? region.getTopicRegion() : region.getQueueRegion());
1071            return abstractRegion.getSubscriptions().values();
1072        }
1073    
1074        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1075            //add our original id to ourselves
1076            info.addNetworkConsumerId(info.getConsumerId());
1077            return doCreateDemandSubscription(info);
1078        }
1079    
1080        protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1081            DemandSubscription result = new DemandSubscription(info);
1082            result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1083            if (info.getDestination().isTemporary()) {
1084                // reset the local connection Id
1085    
1086                ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1087                dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1088            }
1089    
1090            if (configuration.isDecreaseNetworkConsumerPriority()) {
1091                byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
1092                if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1093                    // The longer the path to the consumer, the less it's consumer priority.
1094                    priority -= info.getBrokerPath().length + 1;
1095                }
1096                result.getLocalInfo().setPriority(priority);
1097                if (LOG.isDebugEnabled()) {
1098                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1099                }
1100            }
1101            configureDemandSubscription(info, result);
1102            return result;
1103        }
1104    
1105        final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1106            ConsumerInfo info = new ConsumerInfo();
1107            info.setDestination(destination);
1108            // the remote info held by the DemandSubscription holds the original
1109            // consumerId,
1110            // the local info get's overwritten
1111    
1112            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1113            DemandSubscription result = null;
1114            try {
1115                result = createDemandSubscription(info);
1116            } catch (IOException e) {
1117                LOG.error("Failed to create DemandSubscription ", e);
1118            }
1119            if (result != null) {
1120                result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
1121            }
1122            return result;
1123        }
1124    
1125        protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1126            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1127            sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1128            subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1129            subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1130    
1131            // This works for now since we use a VM connection to the local broker.
1132            // may need to change if we ever subscribe to a remote broker.
1133            sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
1134        }
1135    
1136        protected void removeDemandSubscription(ConsumerId id) throws IOException {
1137            DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1138            if (LOG.isDebugEnabled()) {
1139                LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1140            }
1141            if (sub != null) {
1142                removeSubscription(sub);
1143                if (LOG.isDebugEnabled()) {
1144                    LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1145                }
1146            }
1147        }
1148    
1149        protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1150            boolean removeDone = false;
1151            DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1152            if (sub != null) {
1153                try {
1154                    removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1155                    removeDone = true;
1156                } catch (IOException e) {
1157                    LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1158                }
1159            }
1160            return removeDone;
1161        }
1162    
1163        protected void waitStarted() throws InterruptedException {
1164            startedLatch.await();
1165            localBrokerIdKnownLatch.await();
1166        }
1167    
1168        protected void clearDownSubscriptions() {
1169            subscriptionMapByLocalId.clear();
1170            subscriptionMapByRemoteId.clear();
1171        }
1172    
1173        protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
1174    
1175        protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;
1176    
1177        protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
1178    
1179        protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
1180    
1181        protected abstract BrokerId[] getRemoteBrokerPath();
1182    
1183        public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1184            this.networkBridgeListener = listener;
1185        }
1186    
1187        private void fireBridgeFailed() {
1188            NetworkBridgeListener l = this.networkBridgeListener;
1189            if (l != null) {
1190                l.bridgeFailed();
1191            }
1192        }
1193    
1194        public String getRemoteAddress() {
1195            return remoteBroker.getRemoteAddress();
1196        }
1197    
1198        public String getLocalAddress() {
1199            return localBroker.getRemoteAddress();
1200        }
1201    
1202        public String getRemoteBrokerName() {
1203            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1204        }
1205    
1206        public String getLocalBrokerName() {
1207            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1208        }
1209    
1210        public long getDequeueCounter() {
1211            return dequeueCounter.get();
1212        }
1213    
1214        public long getEnqueueCounter() {
1215            return enqueueCounter.get();
1216        }
1217    
1218        protected boolean isDuplex() {
1219            return configuration.isDuplex() || createdByDuplex;
1220        }
1221    
1222        public void setBrokerService(BrokerService brokerService) {
1223            this.brokerService = brokerService;
1224        }
1225    
1226        static {
1227            ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
1228                public Thread newThread(Runnable runnable) {
1229                    Thread thread = new Thread(runnable, "NetworkBridge");
1230                    thread.setDaemon(true);
1231                    return thread;
1232                }
1233            });
1234        }
1235    
1236    }