001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.broker.BrokerFactory;
027    import org.apache.activemq.broker.BrokerFactoryHandler;
028    import org.apache.activemq.broker.BrokerRegistry;
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.TransportConnector;
031    import org.apache.activemq.transport.MarshallingTransportFilter;
032    import org.apache.activemq.transport.Transport;
033    import org.apache.activemq.transport.TransportFactory;
034    import org.apache.activemq.transport.TransportServer;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.IntrospectionSupport;
037    import org.apache.activemq.util.ServiceSupport;
038    import org.apache.activemq.util.URISupport;
039    import org.apache.activemq.util.URISupport.CompositeData;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    public class VMTransportFactory extends TransportFactory {
044        
045        public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
046        public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
047        public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
048        private static final Log LOG = LogFactory.getLog(VMTransportFactory.class);
049        
050        BrokerFactoryHandler brokerFactoryHandler;
051    
052        public Transport doConnect(URI location) throws Exception {
053            return VMTransportServer.configure(doCompositeConnect(location));
054        }
055    
056        public Transport doCompositeConnect(URI location) throws Exception {
057            URI brokerURI;
058            String host;
059            Map<String, String> options;
060            boolean create = true;
061            int waitForStart = -1;
062            CompositeData data = URISupport.parseComposite(location);
063            if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
064                brokerURI = data.getComponents()[0];
065                CompositeData brokerData = URISupport.parseComposite(brokerURI);
066                host = (String)brokerData.getParameters().get("brokerName");
067                if (host == null) {
068                    host = "localhost";
069                }
070                if (brokerData.getPath() != null) {
071                    host = brokerData.getPath();
072                }
073                options = data.getParameters();
074                location = new URI("vm://" + host);
075            } else {
076                // If using the less complex vm://localhost?broker.persistent=true
077                // form
078                try {
079                    host = location.getHost();
080                    options = URISupport.parseParamters(location);
081                    String config = (String)options.remove("brokerConfig");
082                    if (config != null) {
083                        brokerURI = new URI(config);
084                    } else {
085                        Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
086                        brokerURI = new URI("broker://()/" + host + "?"
087                                            + URISupport.createQueryString(brokerOptions));
088                    }
089                    if ("false".equals(options.remove("create"))) {
090                        create = false;
091                    }
092                    String waitForStartString = options.remove("waitForStart");
093                    if (waitForStartString != null) {
094                        waitForStart = Integer.parseInt(waitForStartString);
095                    }
096                } catch (URISyntaxException e1) {
097                    throw IOExceptionSupport.create(e1);
098                }
099                location = new URI("vm://" + host);
100            }
101            if (host == null) {
102                host = "localhost";
103            }
104            VMTransportServer server = SERVERS.get(host);
105            // validate the broker is still active
106            if (!validateBroker(host) || server == null) {
107                BrokerService broker = null;
108                // Synchronize on the registry so that multiple concurrent threads
109                // doing this do not think that the broker has not been created and
110                // cause multiple brokers to be started.
111                synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
112                    broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
113                    if (broker == null) {
114                        if (!create) {
115                            throw new IOException("Broker named '" + host + "' does not exist.");
116                        }
117                        try {
118                            if (brokerFactoryHandler != null) {
119                                broker = brokerFactoryHandler.createBroker(brokerURI);
120                            } else {
121                                broker = BrokerFactory.createBroker(brokerURI);
122                            }
123                            broker.start();
124                        } catch (URISyntaxException e) {
125                            throw IOExceptionSupport.create(e);
126                        }
127                        BROKERS.put(host, broker);
128                        BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
129                    }
130    
131                    server = SERVERS.get(host);
132                    if (server == null) {
133                        server = (VMTransportServer)bind(location, true);
134                        TransportConnector connector = new TransportConnector(server);
135                        connector.setBrokerService(broker);
136                        connector.setUri(location);
137                        connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
138                        connector.start();
139                        CONNECTORS.put(host, connector);
140                    }
141    
142                }
143            }
144    
145            VMTransport vmtransport = server.connect();
146            IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
147            IntrospectionSupport.setProperties(vmtransport, options);
148            Transport transport = vmtransport;
149            if (vmtransport.isMarshal()) {
150                Map<String, String> optionsCopy = new HashMap<String, String>(options);
151                transport = new MarshallingTransportFilter(transport, createWireFormat(options),
152                                                           createWireFormat(optionsCopy));
153            }
154            if (!options.isEmpty()) {
155                throw new IllegalArgumentException("Invalid connect parameters: " + options);
156            }
157            return transport;
158        }
159    
160       /**
161        * @param registry
162        * @param brokerName
163        * @param waitForStart - time in milliseconds to wait for a broker to appear
164        * @return
165        */
166        private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
167            BrokerService broker = null;
168            synchronized(registry.getRegistryMutext()) {
169                broker = registry.lookup(brokerName);
170                if (broker == null && waitForStart > 0) {
171                    final long expiry = System.currentTimeMillis() + waitForStart;
172                    while (broker == null  && expiry > System.currentTimeMillis()) {
173                        long timeout = Math.max(0, expiry - System.currentTimeMillis());
174                        try {
175                            LOG.debug("waiting for broker named: " + brokerName + " to start");
176                            registry.getRegistryMutext().wait(timeout);
177                        } catch (InterruptedException ignored) {
178                        }
179                        broker = registry.lookup(brokerName);
180                    }
181                }
182            }
183            return broker;
184        }
185    
186        public TransportServer doBind(URI location) throws IOException {
187            return bind(location, false);
188        }
189    
190        /**
191         * @param location
192         * @return the TransportServer
193         * @throws IOException
194         */
195        private TransportServer bind(URI location, boolean dispose) throws IOException {
196            String host = location.getHost();
197            LOG.debug("binding to broker: " + host);
198            VMTransportServer server = new VMTransportServer(location, dispose);
199            Object currentBoundValue = SERVERS.get(host);
200            if (currentBoundValue != null) {
201                throw new IOException("VMTransportServer already bound at: " + location);
202            }
203            SERVERS.put(host, server);
204            return server;
205        }
206    
207        public static void stopped(VMTransportServer server) {
208            String host = server.getBindURI().getHost();
209            SERVERS.remove(host);
210            TransportConnector connector = CONNECTORS.remove(host);
211            if (connector != null) {
212                LOG.debug("Shutting down VM connectors for broker: " + host);
213                ServiceSupport.dispose(connector);
214                BrokerService broker = BROKERS.remove(host);
215                if (broker != null) {
216                    ServiceSupport.dispose(broker);
217                }
218            }
219        }
220    
221        public static void stopped(String host) {
222            SERVERS.remove(host);
223            TransportConnector connector = CONNECTORS.remove(host);
224            if (connector != null) {
225                LOG.debug("Shutting down VM connectors for broker: " + host);
226                ServiceSupport.dispose(connector);
227                BrokerService broker = BROKERS.remove(host);
228                if (broker != null) {
229                    ServiceSupport.dispose(broker);
230                }
231            }
232        }
233    
234        public BrokerFactoryHandler getBrokerFactoryHandler() {
235            return brokerFactoryHandler;
236        }
237    
238        public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
239            this.brokerFactoryHandler = brokerFactoryHandler;
240        }
241    
242        private boolean validateBroker(String host) {
243            boolean result = true;
244            if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) {
245                // check the broker is still in the BrokerRegistry
246                TransportConnector connector = CONNECTORS.get(host);
247                if (BrokerRegistry.getInstance().lookup(host) == null
248                    || (connector != null && connector.getBroker().isStopped())) {
249                    result = false;
250                    // clean-up
251                    BROKERS.remove(host);
252                    SERVERS.remove(host);
253                    if (connector != null) {
254                        CONNECTORS.remove(host);
255                        if (connector != null) {
256                            ServiceSupport.dispose(connector);
257                        }
258                    }
259                }
260            }
261            return result;
262        }
263    }