/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

import javax.naming.CommunicationException;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.naming.event.EventDirContext;
import javax.naming.event.NamespaceChangeListener;
import javax.naming.event.NamingEvent;
import javax.naming.event.NamingExceptionEvent;
import javax.naming.event.ObjectChangeListener;

import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * class to create dynamic network connectors listed in a directory
 * server using the LDAP v3 protocol as defined in RFC 2251, the
 * entries listed in the directory server must implement the ipHost
 * and ipService objectClasses as defined in RFC 2307.
 *
 * @author Trevor Pounds
 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
 *
 * @org.apache.xbean.XBean element="ldapNetworkConnector"
 */
public class LdapNetworkConnector
    extends NetworkConnector
    implements NamespaceChangeListener,
               ObjectChangeListener
{
    private static final Log LOG = LogFactory.getLog(LdapNetworkConnector.class);

    // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
    private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";

    // connection
    private URI[] availableURIs = null;
    private int availableURIsIndex = 0;
    private String base = null;
    private boolean failover = false;
    private long curReconnectDelay = 1000; /* 1 sec */
    private long maxReconnectDelay = 30000; /* 30 sec */

    // authentication
    private String user = null;
    private String password = null;
    private boolean anonymousAuthentication = false;

    // search
    private final SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
    private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
    private boolean searchEventListener = false;

    // connector management:
    //   connectorMap - connector URI -> running connector
    //   referenceMap - connector URI -> number of LDAP entries currently pointing at it
    //   uuidMap      - LDAP entry name -> connector URI
    private final Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
    private final Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
    private final Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();

    // local context
    private DirContext context = null;
    // currently in use URI
    private URI ldapURI = null;

    /**
     * returns the next URI from the configured list; the list is walked
     * in rotation and the position is advanced <em>before</em> the value
     * is read, so with more than one configured URI the first call
     * returns the second entry
     *
     * @return next URI in rotation from the configured list
     */
    public URI getUri()
    {
        // keep the index reduced modulo the list length so that it can never
        // overflow into a negative (and therefore invalid) array index
        availableURIsIndex = (availableURIsIndex + 1) % availableURIs.length;
        return availableURIs[availableURIsIndex];
    }

    /**
     * sets the LDAP server URI; a composite URI using the
     * <code>failover</code> scheme supplies a list of servers that are
     * tried in rotation, any other URI is used as the single server
     *
     * @param _uri LDAP server URI
     * @throws Exception if the URI cannot be parsed as a composite URI
     */
    public void setUri(URI _uri)
        throws Exception
    {
        CompositeData data = URISupport.parseComposite(_uri);
        if("failover".equals(data.getScheme()))
        {
            availableURIs = data.getComponents();
            failover = true;
        }
        else
        {
            availableURIs = new URI[]{ _uri };
            // reset the flag in case a failover URI had been configured before
            failover = false;
        }
    }

    /**
     * sets the base LDAP dn used for lookup operations
     *
     * @param _base LDAP base dn
     */
    public void setBase(String _base)
    { base = _base; }

    /**
     * sets the LDAP user for access credentials
     *
     * @param _user LDAP dn of user
     */
    public void setUser(String _user)
    { user = _user; }

    /**
     * sets the LDAP password for access credentials
     *
     * @param _password user password
     */
    public void setPassword(String _password)
    { password = _password; }

    /**
     * sets LDAP anonymous authentication access credentials
     *
     * @param _anonymousAuthentication set to true to use anonymous authentication
     */
    public void setAnonymousAuthentication(boolean _anonymousAuthentication)
    { anonymousAuthentication = _anonymousAuthentication; }

    /**
     * sets the LDAP search scope
     *
     * @param _searchScope LDAP JNDI search scope, one of
     *        <code>OBJECT_SCOPE</code>, <code>ONELEVEL_SCOPE</code> or
     *        <code>SUBTREE_SCOPE</code>
     * @throws Exception if the scope name is null or not one of the names above
     */
    public void setSearchScope(String _searchScope)
        throws Exception
    {
        int scope;
        // literal-first comparisons so that a null argument reaches the error branch below
        if("OBJECT_SCOPE".equals(_searchScope))
        { scope = SearchControls.OBJECT_SCOPE; }
        else if("ONELEVEL_SCOPE".equals(_searchScope))
        { scope = SearchControls.ONELEVEL_SCOPE; }
        else if("SUBTREE_SCOPE".equals(_searchScope))
        { scope = SearchControls.SUBTREE_SCOPE; }
        else
        { throw new IllegalArgumentException("ERR: unknown LDAP search scope specified: " + _searchScope); }
        searchControls.setSearchScope(scope);
    }

    /**
     * sets the LDAP search filter as defined in RFC 2254; the given
     * filter is wrapped in parentheses and AND-ed with the filter that
     * requires the ipHost and ipService object classes
     *
     * @param _searchFilter LDAP search filter
     * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
     */
    public void setSearchFilter(String _searchFilter)
    { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }

    /**
     * enables/disable a persistent search to the LDAP server as defined
     * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
     *
     * @param _searchEventListener enable = true, disable = false (default)
     * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
     */
    public void setSearchEventListener(boolean _searchEventListener)
    { searchEventListener = _searchEventListener; }

    /**
     * start the connector: connects to the LDAP server, adds a network
     * connector for every entry matching the search and, when the
     * persistent search listener is enabled, registers this object for
     * change notifications; without the listener the LDAP connection is
     * closed again once the search has completed
     *
     * @throws Exception if the connection, the search or the start of a
     *         discovered connector fails
     */
    public void start()
        throws Exception
    {
        LOG.info("connecting...");
        context = connect(createEnvironment());

        boolean started = false;
        try
        {
            // add connectors from search results
            LOG.info("searching for network connectors...");
            LOG.debug(" base [" + base + "]");
            LOG.debug(" filter [" + searchFilter + "]");
            LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
            addConnectorsFromSearch();

            // register persistent search event listener
            if(searchEventListener)
            {
                LOG.info("registering persistent search listener...");
                EventDirContext eventContext = (EventDirContext)context.lookup("");
                eventContext.addNamingListener(base, searchFilter, searchControls, this);
            }
            else // otherwise close context (i.e. connection as it is no longer needed)
            { context.close(); }
            started = true;
        }
        finally
        {
            // do not leak the LDAP connection when start-up fails half way;
            // closing quietly keeps the original failure as the reported one
            if(!started)
            { closeContextQuietly(); }
        }
    }

    /**
     * builds the JNDI environment (provider URL and credentials) used to open the LDAP connection
     */
    private Hashtable<String, String> createEnvironment()
    {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        this.ldapURI = getUri();
        LOG.debug(" URI [" + this.ldapURI + "]");
        env.put(Context.PROVIDER_URL, this.ldapURI.toString());
        if(anonymousAuthentication)
        {
            LOG.debug(" login credentials [anonymous]");
            env.put(Context.SECURITY_AUTHENTICATION, "none");
        }
        else
        {
            // Hashtable rejects null values, fail with a message that names the actual problem
            if(user == null || password == null)
            { throw new IllegalStateException("LDAP user and password must be set unless anonymous authentication is enabled"); }
            LOG.debug(" login credentials [" + user + ":******]");
            env.put(Context.SECURITY_PRINCIPAL, user);
            env.put(Context.SECURITY_CREDENTIALS, password);
        }
        return env;
    }

    /**
     * opens the LDAP connection; in failover mode communication errors switch to the
     * next configured URI and retry after a delay that doubles up to the maximum
     */
    private DirContext connect(Hashtable<String, String> env)
        throws Exception
    {
        while(true)
        {
            try
            { return new InitialDirContext(env); }
            catch(CommunicationException err)
            {
                if(!failover)
                { throw err; }
                this.ldapURI = getUri();
                LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
                env.put(Context.PROVIDER_URL, this.ldapURI.toString());
                Thread.sleep(curReconnectDelay);
                curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
            }
        }
    }

    /**
     * runs the configured search and adds a connector for every result, always releasing the enumeration
     */
    private void addConnectorsFromSearch()
        throws Exception
    {
        NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
        try
        {
            while(results.hasMore())
            { addConnector(results.next()); }
        }
        finally
        { results.close(); }
    }

    /**
     * closes the LDAP context on an error path, logging instead of throwing
     */
    private void closeContextQuietly()
    {
        try
        { context.close(); }
        catch(Exception err)
        { LOG.debug("failed to close LDAP context", err); }
    }

    /**
     * stop the connector: stops every managed network connector, clears
     * the bookkeeping maps and closes the LDAP context (if one was
     * opened); all connectors are stopped even if one of them fails
     *
     * @throws Exception the first failure encountered while stopping
     */
    public void stop()
        throws Exception
    {
        LOG.info("stopping context...");
        Exception failure = null;
        for(NetworkConnector connector : connectorMap.values())
        {
            try
            { connector.stop(); }
            catch(Exception err)
            {
                // keep going so that one failing connector does not leave the others running
                if(failure == null)
                { failure = err; }
                else
                { LOG.error("ERR: caught unexpected exception", err); }
            }
        }
        connectorMap.clear();
        referenceMap.clear();
        uuidMap.clear();

        // context is null when start() was never called or failed before connecting
        if(context != null)
        {
            try
            { context.close(); }
            catch(Exception err)
            {
                if(failure == null)
                { failure = err; }
                else
                { LOG.error("ERR: caught unexpected exception", err); }
            }
        }

        if(failure != null)
        { throw failure; }
    }

    /**
     * returns the name of the connector; when no name has been set one
     * is derived from the class name and the LDAP URI in use
     *
     * @return connector name
     */
    @Override
    public String getName()
    {
        String name = super.getName();
        if(name == null)
        {
            name = this.getClass().getName() + " [" + ldapURI + "]";
            // the URI is only known once start() has picked one; until then
            // hand out a provisional name without caching it
            if(ldapURI != null)
            { super.setName(name); }
        }
        return name;
    }

    /**
     * add connector for the given search result; an entry whose name is
     * already registered is ignored, an entry that resolves to the URI
     * of an existing connector only increases that connector's
     * reference count
     *
     * @param result
     *            search result of connector to add
     * @throws Exception if the entry cannot be converted to a URI or the
     *         connector cannot be created or started
     */
    protected synchronized void addConnector(SearchResult result)
        throws Exception
    {
        String uuid = toUUID(result);
        if(uuidMap.containsKey(uuid))
        {
            LOG.warn("connector already regsitered for UUID [" + uuid + "]");
            return;
        }

        URI connectorURI = toURI(result);
        if(connectorMap.containsKey(connectorURI))
        {
            int referenceCount = referenceMap.get(connectorURI) + 1;
            LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
            referenceMap.put(connectorURI, referenceCount);
            uuidMap.put(uuid, connectorURI);
            return;
        }

        // FIXME: disable JMX listing of LDAP managed connectors, we will
        // want to map/manage these differently in the future
        // boolean useJMX = getBrokerService().isUseJmx();
        // getBrokerService().setUseJmx(false);
        NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
        // getBrokerService().setUseJmx(useJMX);

        // propogate std connector properties that may have been set via XML
        connector.setDynamicOnly(isDynamicOnly());
        connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
        connector.setNetworkTTL(getNetworkTTL());
        connector.setConduitSubscriptions(isConduitSubscriptions());
        connector.setExcludedDestinations(getExcludedDestinations());
        connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
        connector.setDuplex(isDuplex());

        // XXX: set in the BrokerService.startAllConnectors method and is
        // required to prevent remote broker exceptions upon connection
        connector.setLocalUri(getBrokerService().getVmConnectorURI());
        connector.setBrokerName(getBrokerService().getBrokerName());
        connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());

        // start network connector
        connectorMap.put(connectorURI, connector);
        referenceMap.put(connectorURI, 1);
        uuidMap.put(uuid, connectorURI);
        connector.start();
        LOG.info("connector added with URI [" + connectorURI + "]");
    }

    /**
     * remove the reference held by the given search result; the
     * connector itself is stopped and forgotten once no entry
     * references its URI any more
     *
     * @param result search result of connector to remove
     * @throws Exception if stopping the connector fails
     */
    protected synchronized void removeConnector(SearchResult result)
        throws Exception
    {
        String uuid = toUUID(result);
        if(!uuidMap.containsKey(uuid))
        {
            LOG.warn("connector not regsitered for UUID [" + uuid + "]");
            return;
        }

        URI connectorURI = uuidMap.get(uuid);
        if(!connectorMap.containsKey(connectorURI))
        {
            LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
            return;
        }

        int referenceCount = referenceMap.get(connectorURI) - 1;
        uuidMap.remove(uuid);
        LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");

        if(referenceCount > 0)
        {
            referenceMap.put(connectorURI, referenceCount);
            return;
        }

        // last reference gone: drop the counter as well so no stale zero entry is left behind
        referenceMap.remove(connectorURI);
        NetworkConnector connector = connectorMap.remove(connectorURI);
        connector.stop();
        LOG.info("connector removed with URI [" + connectorURI + "]");
    }

    /**
     * convert search result into URI of the form
     * <code>static:(protocol://address:port)</code> built from the
     * iphostnumber, ipserviceport and ipserviceprotocol attributes
     *
     * @param result search result to convert to URI
     * @return connector URI described by the entry
     * @throws Exception if a required attribute is missing or the
     *         resulting URI is malformed
     */
    protected URI toURI(SearchResult result)
        throws Exception
    {
        Attributes attributes = result.getAttributes();
        String address = requiredAttribute(result, attributes, "iphostnumber");
        String port = requiredAttribute(result, attributes, "ipserviceport");
        String protocol = requiredAttribute(result, attributes, "ipserviceprotocol");
        URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
        LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
        return connectorURI;
    }

    /**
     * reads the first value of a mandatory attribute, failing with a message that names the entry and attribute
     */
    private static String requiredAttribute(SearchResult result, Attributes attributes, String id)
        throws Exception
    {
        if(attributes == null || attributes.get(id) == null)
        { throw new IllegalArgumentException("LDAP entry [" + result.getName() + "] is missing required attribute [" + id + "]"); }
        return (String)attributes.get(id).get();
    }

    /**
     * convert search result into the identifier used to track the
     * entry, which is its full name in the namespace
     *
     * @param result search result to derive the identifier from
     * @return full name of the entry within its namespace
     */
    protected String toUUID(SearchResult result)
    {
        String uuid = result.getNameInNamespace();
        LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
        return uuid;
    }

    /**
     * invoked when an entry has been added during a persistent search
     */
    public void objectAdded(NamingEvent event)
    {
        LOG.debug("entry added");
        try
        { addConnector((SearchResult)event.getNewBinding()); }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * invoked when an entry has been removed during a persistent search
     */
    public void objectRemoved(NamingEvent event)
    {
        LOG.debug("entry removed");
        try
        { removeConnector((SearchResult)event.getOldBinding()); }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * invoked when an entry has been renamed during a persistent search;
     * re-keys the tracked entry from its old name to its new name
     */
    public void objectRenamed(NamingEvent event)
    {
        LOG.debug("entry renamed");
        // JNDI may deliver a null binding when one side of the rename lies
        // outside the scope the listener was registered for
        if(event.getOldBinding() == null || event.getNewBinding() == null)
        {
            LOG.warn("ignoring rename event without both old and new binding");
            return;
        }

        // XXX: getNameInNamespace method does not seem to work properly,
        // but getName seems to provide the result we want
        String uuidOld = event.getOldBinding().getName();
        String uuidNew = event.getNewBinding().getName();
        if(uuidOld == null || uuidNew == null)
        {
            LOG.warn("ignoring rename event with unnamed binding, Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
            return;
        }

        // same monitor as addConnector/removeConnector so the remove + put pair is not interleaved with them
        synchronized(this)
        {
            URI connectorURI = uuidMap.remove(uuidOld);
            if(connectorURI == null)
            {
                // nothing tracked under the old name; the map does not accept null values
                LOG.warn("connector not registered for renamed UUID [" + uuidOld + "]");
                return;
            }
            uuidMap.put(uuidNew, connectorURI);
            LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
        }
    }

    /**
     * invoked when an entry has been changed during a persistent search;
     * the entry is removed and added again so that a changed address,
     * port or protocol results in a connector for the new URI
     */
    public void objectChanged(NamingEvent event)
    {
        LOG.debug("entry changed");
        try
        {
            SearchResult result = (SearchResult)event.getNewBinding();
            removeConnector(result);
            addConnector(result);
        }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * invoked when an exception has occurred during a persistent search
     */
    public void namingExceptionThrown(NamingExceptionEvent event)
    { LOG.error("ERR: caught unexpected exception", event.getException()); }
}