001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.net.URI; 020 import java.net.URISyntaxException; 021 import java.util.Collection; 022 import java.util.HashMap; 023 import java.util.HashSet; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 030 import javax.management.MalformedObjectNameException; 031 import javax.management.ObjectName; 032 033 import org.apache.activemq.Service; 034 import org.apache.activemq.broker.BrokerService; 035 import org.apache.activemq.broker.jmx.AnnotatedMBean; 036 import org.apache.activemq.broker.jmx.NetworkBridgeView; 037 import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; 038 import org.apache.activemq.command.ActiveMQDestination; 039 import org.apache.activemq.command.ConsumerId; 040 import org.apache.activemq.transport.Transport; 041 import org.apache.activemq.transport.TransportFactory; 042 import org.apache.activemq.util.JMXSupport; 043 import org.apache.activemq.util.ServiceStopper; 044 import org.apache.activemq.util.ServiceSupport; 045 import org.apache.commons.logging.Log; 046 import org.apache.commons.logging.LogFactory; 047 048 /** 049 * @version $Revision$ 050 */ 051 public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { 052 053 private static final Log LOG = LogFactory.getLog(NetworkConnector.class); 054 protected URI localURI; 055 protected ConnectionFilter connectionFilter; 056 protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>(); 057 058 protected ServiceSupport serviceSupport = new ServiceSupport() { 059 060 protected void doStart() throws Exception { 061 handleStart(); 062 } 063 064 protected void doStop(ServiceStopper stopper) throws Exception { 065 handleStop(stopper); 066 } 067 }; 068 069 private Set<ActiveMQDestination> durableDestinations; 070 private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>(); 071 private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>(); 072 private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>(); 073 private BrokerService brokerService; 074 private ObjectName objectName; 075 076 public NetworkConnector() { 077 } 078 079 public NetworkConnector(URI localURI) { 080 this.localURI = localURI; 081 } 082 083 public URI getLocalUri() throws URISyntaxException { 084 return localURI; 085 } 086 087 public void setLocalUri(URI localURI) { 088 this.localURI = localURI; 089 } 090 091 /** 092 * @return Returns the durableDestinations. 093 */ 094 public Set getDurableDestinations() { 095 return durableDestinations; 096 } 097 098 /** 099 * @param durableDestinations The durableDestinations to set. 100 */ 101 public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) { 102 this.durableDestinations = durableDestinations; 103 } 104 105 /** 106 * @return Returns the excludedDestinations. 107 */ 108 public List<ActiveMQDestination> getExcludedDestinations() { 109 return excludedDestinations; 110 } 111 112 /** 113 * @param excludedDestinations The excludedDestinations to set. 114 */ 115 public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) { 116 this.excludedDestinations = excludedDestinations; 117 } 118 119 public void addExcludedDestination(ActiveMQDestination destiantion) { 120 this.excludedDestinations.add(destiantion); 121 } 122 123 /** 124 * @return Returns the staticallyIncludedDestinations. 125 */ 126 public List<ActiveMQDestination> getStaticallyIncludedDestinations() { 127 return staticallyIncludedDestinations; 128 } 129 130 /** 131 * @param staticallyIncludedDestinations The staticallyIncludedDestinations 132 * to set. 133 */ 134 public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) { 135 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 136 } 137 138 public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { 139 this.staticallyIncludedDestinations.add(destiantion); 140 } 141 142 /** 143 * @return Returns the dynamicallyIncludedDestinations. 144 */ 145 public List<ActiveMQDestination> getDynamicallyIncludedDestinations() { 146 return dynamicallyIncludedDestinations; 147 } 148 149 /** 150 * @param dynamicallyIncludedDestinations The 151 * dynamicallyIncludedDestinations to set. 152 */ 153 public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) { 154 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 155 } 156 157 public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { 158 this.dynamicallyIncludedDestinations.add(destiantion); 159 } 160 161 public ConnectionFilter getConnectionFilter() { 162 return connectionFilter; 163 } 164 165 public void setConnectionFilter(ConnectionFilter connectionFilter) { 166 this.connectionFilter = connectionFilter; 167 } 168 169 // Implementation methods 170 // ------------------------------------------------------------------------- 171 protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) { 172 List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations(); 173 ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]); 174 result.setDynamicallyIncludedDestinations(dests); 175 destsList = getExcludedDestinations(); 176 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 177 result.setExcludedDestinations(dests); 178 destsList = getStaticallyIncludedDestinations(); 179 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 180 result.setStaticallyIncludedDestinations(dests); 181 if (durableDestinations != null) { 182 183 HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>(); 184 for (ActiveMQDestination d : durableDestinations) { 185 if( d.isTopic() ) { 186 topics.add(d); 187 } 188 } 189 190 ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; 191 dest = (ActiveMQDestination[])topics.toArray(dest); 192 result.setDurableDestinations(dest); 193 } 194 return result; 195 } 196 197 protected Transport createLocalTransport() throws Exception { 198 return TransportFactory.connect(localURI); 199 } 200 201 public void start() throws Exception { 202 serviceSupport.start(); 203 } 204 205 public void stop() throws Exception { 206 serviceSupport.stop(); 207 } 208 209 protected void handleStart() throws Exception { 210 if (localURI == null) { 211 throw new IllegalStateException("You must configure the 'localURI' property"); 212 } 213 LOG.info("Network Connector " + getName() + " Started"); 214 } 215 216 protected void handleStop(ServiceStopper stopper) throws Exception { 217 LOG.info("Network Connector " + getName() + " Stopped"); 218 } 219 220 public ObjectName getObjectName() { 221 return objectName; 222 } 223 224 public void setObjectName(ObjectName objectName) { 225 this.objectName = objectName; 226 } 227 228 public BrokerService getBrokerService() { 229 return brokerService; 230 } 231 232 public void setBrokerService(BrokerService brokerService) { 233 this.brokerService = brokerService; 234 } 235 236 protected void registerNetworkBridgeMBean(NetworkBridge bridge) { 237 if (!getBrokerService().isUseJmx()) { 238 return; 239 } 240 NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); 241 try { 242 ObjectName objectName = createNetworkBridgeObjectName(bridge); 243 AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName); 244 } catch (Throwable e) { 245 LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e); 246 } 247 } 248 249 protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) { 250 if (!getBrokerService().isUseJmx()) { 251 return; 252 } 253 try { 254 ObjectName objectName = createNetworkBridgeObjectName(bridge); 255 getBrokerService().getManagementContext().unregisterMBean(objectName); 256 } catch (Throwable e) { 257 LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); 258 } 259 } 260 261 262 @SuppressWarnings("unchecked") 263 protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { 264 ObjectName connectorName = getObjectName(); 265 Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList()); 266 return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge," 267 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name=" 268 + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress()))); 269 } 270 271 // ask all the bridges as we can't know to which this consumer is tied 272 public boolean removeDemandSubscription(ConsumerId consumerId) { 273 boolean removeSucceeded = false; 274 for (NetworkBridge bridge : bridges.values()) { 275 if (bridge instanceof DemandForwardingBridgeSupport) { 276 DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge; 277 if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) { 278 removeSucceeded = true; 279 break; 280 } 281 } 282 } 283 return removeSucceeded; 284 } 285 286 public Collection<NetworkBridge> activeBridges() { 287 return bridges.values(); 288 } 289 290 }