/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.io.IOException;

import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A demand forwarding bridge which works with multicast style transports where
 * a single Transport could be communicating with multiple remote brokers
 *
 * @org.apache.xbean.XBean
 *
 * @version $Revision: 808890 $
 */
public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
    private static final Log LOG = LogFactory.getLog(CompositeDemandForwardingBridge.class);

    /** Single-element broker path whose only entry is {@code null}. */
    protected final BrokerId[] remoteBrokerPath = new BrokerId[] {null};

    /** Monitor held while a remote {@link BrokerInfo} command is being serviced. */
    protected Object brokerInfoMutex = new Object();

    /**
     * Creates the bridge, uses the string form of the remote transport as the
     * remote broker name and counts down the latch that signals that name is
     * known.
     *
     * @param configuration the bridge configuration, passed to the superclass
     * @param localBroker the transport for the local broker, passed to the superclass
     * @param remoteBroker the transport for the remote side; its
     *            {@code toString()} becomes {@code remoteBrokerName}
     */
    public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
                                           Transport remoteBroker) {
        super(configuration, localBroker, remoteBroker);
        remoteBrokerName = remoteBroker.toString();
        remoteBrokerNameKnownLatch.countDown();
    }

    /**
     * Services a {@link BrokerInfo} command received from a remote broker.
     * <p>
     * While holding {@code brokerInfoMutex}, the broker info is attached to the
     * endpoint the command came from (a warning is logged when there is no
     * such endpoint). If the remote broker ID equals the local broker ID this
     * bridge is disposed; otherwise, provided the bridge has not been
     * disposed, the local side of the bridge is triggered to start.
     *
     * @param command the incoming command; it is cast to {@link BrokerInfo}
     * @throws IOException declared by the signature
     */
    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
        synchronized (brokerInfoMutex) {
            BrokerInfo info = (BrokerInfo)command;
            BrokerId remoteId = info.getBrokerId();

            // Remember which broker sits behind the sending endpoint so that
            // later commands from that endpoint can be mapped to a broker ID.
            Endpoint source = command.getFrom();
            if (source != null) {
                source.setBrokerInfo(info);
            } else {
                LOG.warn("Incoming command does not have a from endpoint: " + command);
            }

            // The remote broker is this broker: tear the bridge down.
            if (localBrokerId != null && localBrokerId.equals(remoteId)) {
                LOG.info("Disconnecting loop back connection.");
                // NOTE: a waitStarted() call was previously commented out here.
                ServiceSupport.dispose(this);
            }

            if (disposed.get()) {
                return;
            }
            triggerLocalStartBridge();
        }
    }

    /**
     * Appends the ID of the broker that sent the given consumer info to that
     * consumer info's broker path.
     *
     * @param info the consumer info whose broker path is extended
     * @throws IOException if no broker ID can be determined for the sender
     */
    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
        info.setBrokerPath(
            appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info)));
    }

    /**
     * Looks up the ID of the broker the command came from, using the
     * command's "from" endpoint.
     *
     * @param command the command whose origin is wanted
     * @return the broker ID reported by the command's endpoint, never
     *         {@code null}
     * @throws IOException if the command has no endpoint (a warning is logged
     *             as well) or the endpoint reports no broker ID
     */
    protected BrokerId getFromBrokerId(Command command) throws IOException {
        Endpoint from = command.getFrom();
        BrokerId brokerId = null;
        if (from != null) {
            brokerId = from.getBrokerId();
        } else {
            LOG.warn("Incoming command does not have a from endpoint: " + command);
        }

        if (brokerId == null) {
            throw new IOException("No broker ID is available for endpoint: " + from + " from command: "
                                  + command);
        }
        return brokerId;
    }

    /**
     * Does nothing in this implementation.
     *
     * @param command the command received from the local broker (ignored)
     * @throws InterruptedException declared by the signature; never thrown here
     */
    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
        // TODO is there much we can do here?
    }

    /**
     * Builds a filter keyed on the broker that sent the consumer info and on
     * the configured network TTL.
     *
     * @param info the consumer info the filter is created for
     * @return a new {@link NetworkBridgeFilter}
     * @throws IOException if no broker ID can be determined for the sender
     */
    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
        BrokerId originId = getFromBrokerId(info);
        return new NetworkBridgeFilter(originId, configuration.getNetworkTTL());
    }

    /**
     * @return the {@code remoteBrokerPath} array itself (not a copy)
     */
    protected BrokerId[] getRemoteBrokerPath() {
        return remoteBrokerPath;
    }

}