001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.broker.SslContext;
027    import org.apache.activemq.command.DiscoveryEvent;
028    import org.apache.activemq.transport.Transport;
029    import org.apache.activemq.transport.TransportFactory;
030    import org.apache.activemq.transport.discovery.DiscoveryAgent;
031    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
032    import org.apache.activemq.transport.discovery.DiscoveryListener;
033    import org.apache.activemq.util.IntrospectionSupport;
034    import org.apache.activemq.util.ServiceStopper;
035    import org.apache.activemq.util.ServiceSupport;
036    import org.apache.activemq.util.URISupport;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A network connector which uses a discovery agent to detect the remote brokers
042     * available and setup a connection to each available remote broker
043     * 
044     * @org.apache.xbean.XBean element="networkConnector"
045     * @version $Revision: 824823 $
046     */
047    public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
048        private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class);
049    
050        private DiscoveryAgent discoveryAgent;
051        
052        private Map<String, String> parameters;
053        
054        public DiscoveryNetworkConnector() {
055        }
056    
057        public DiscoveryNetworkConnector(URI discoveryURI) throws IOException {
058            setUri(discoveryURI);
059        }
060    
061        public void setUri(URI discoveryURI) throws IOException {
062            setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
063            try {
064                parameters = URISupport.parseParamters(discoveryURI);
065                // allow discovery agent to grab it's parameters
066                IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
067            } catch (URISyntaxException e) {
068                LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
069            }  
070            
071        }
072    
073        public void onServiceAdd(DiscoveryEvent event) {
074            String localURIName = localURI.getScheme() + "://" + localURI.getHost();
075            // Ignore events once we start stopping.
076            if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
077                return;
078            }
079            String url = event.getServiceName();
080            if (url != null) {
081                URI uri;
082                try {
083                    uri = new URI(url);
084                } catch (URISyntaxException e) {
085                    LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
086                    return;
087                }
088                // Should we try to connect to that URI?
089                if( bridges.containsKey(uri) ) {
090                    LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
091                    return;
092                }
093                if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
094                    LOG.debug("not connecting loopback: " + uri);
095                    return;
096                }
097                URI connectUri = uri;
098                try {
099                    connectUri = URISupport.applyParameters(connectUri, parameters);
100                } catch (URISyntaxException e) {
101                    LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
102                }
103                LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
104    
105                Transport remoteTransport;
106                Transport localTransport;
107                try {
108                    // Allows the transport to access the broker's ssl configuration.
109                    SslContext.setCurrentSslContext(getBrokerService().getSslContext());
110                    try {
111                        remoteTransport = TransportFactory.connect(connectUri);
112                    } catch (Exception e) {
113                        LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
114                        LOG.debug("Connection failure exception: " + e, e);
115                        return;
116                    }
117                    try {
118                        localTransport = createLocalTransport();
119                    } catch (Exception e) {
120                        ServiceSupport.dispose(remoteTransport);
121                        LOG.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
122                        LOG.debug("Connection failure exception: " + e, e);
123                        return;
124                    }
125                } finally {
126                    SslContext.setCurrentSslContext(null);
127                }
128                NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
129                try {
130                    bridge.start();
131                    bridges.put(uri, bridge);
132                } catch (Exception e) {
133                    ServiceSupport.dispose(localTransport);
134                    ServiceSupport.dispose(remoteTransport);
135                    LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e);
136                    LOG.debug("Start failure exception: " + e, e);
137                    try {
138                        discoveryAgent.serviceFailed(event);
139                    } catch (IOException e1) {
140                        LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
141                    }
142                    return;
143                }
144            }
145        }
146    
147        public void onServiceRemove(DiscoveryEvent event) {
148            String url = event.getServiceName();
149            if (url != null) {
150                URI uri;
151                try {
152                    uri = new URI(url);
153                } catch (URISyntaxException e) {
154                    LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
155                    return;
156                }
157    
158                NetworkBridge bridge = bridges.remove(uri);
159                if (bridge == null) {
160                    return;
161                }
162    
163                ServiceSupport.dispose(bridge);
164            }
165        }
166    
167        public DiscoveryAgent getDiscoveryAgent() {
168            return discoveryAgent;
169        }
170    
171        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
172            this.discoveryAgent = discoveryAgent;
173            if (discoveryAgent != null) {
174                this.discoveryAgent.setDiscoveryListener(this);
175            }
176        }
177    
178        protected void handleStart() throws Exception {
179            if (discoveryAgent == null) {
180                throw new IllegalStateException("You must configure the 'discoveryAgent' property");
181            }
182            this.discoveryAgent.start();
183            super.handleStart();
184        }
185    
186        protected void handleStop(ServiceStopper stopper) throws Exception {
187            for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) {
188                NetworkBridge bridge = i.next();
189                try {
190                    bridge.stop();
191                } catch (Exception e) {
192                    stopper.onException(this, e);
193                }
194            }
195            try {
196                this.discoveryAgent.stop();
197            } catch (Exception e) {
198                stopper.onException(this, e);
199            }
200    
201            super.handleStop(stopper);
202        }
203    
204        protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
205            NetworkBridgeListener listener = new NetworkBridgeListener() {
206    
207                public void bridgeFailed() {
208                    if (!serviceSupport.isStopped()) {
209                        try {
210                            discoveryAgent.serviceFailed(event);
211                        } catch (IOException e) {
212                        }
213                    }
214    
215                }
216    
217                public void onStart(NetworkBridge bridge) {
218                    registerNetworkBridgeMBean(bridge);
219                }
220    
221                public void onStop(NetworkBridge bridge) {
222                    unregisterNetworkBridgeMBean(bridge);
223                }
224    
225            };
226            DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
227            result.setBrokerService(getBrokerService());
228            return configureBridge(result);
229        }
230    
231        public String getName() {
232            String name = super.getName();
233            if (name == null) {
234                name = discoveryAgent.toString();
235                super.setName(name);
236            }
237            return name;
238        }
239    
240        @Override
241        public String toString() {
242            return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
243        }
244    }