001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.net.MalformedURLException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.HashMap;
025    import java.util.Map;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.Executor;
028    
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.BrokerServiceAware;
031    import org.apache.activemq.broker.SslContext;
032    import org.apache.activemq.util.FactoryFinder;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.util.IntrospectionSupport;
035    import org.apache.activemq.util.URISupport;
036    import org.apache.activemq.wireformat.WireFormat;
037    import org.apache.activemq.wireformat.WireFormatFactory;
038    
039    public abstract class TransportFactory {
040    
041        private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
042        private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
043        private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
044    
045        private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
046        private static final String THREAD_NAME_FILTER = "threadName";
047        
048        public abstract TransportServer doBind(URI location) throws IOException;
049    
050        public Transport doConnect(URI location, Executor ex) throws Exception {
051            return doConnect(location);
052        }
053    
054        public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
055            return doCompositeConnect(location);
056        }
057    
058        /**
059         * Creates a normal transport.
060         * 
061         * @param location
062         * @return the transport
063         * @throws Exception
064         */
065        public static Transport connect(URI location) throws Exception {
066            TransportFactory tf = findTransportFactory(location);
067            return tf.doConnect(location);
068        }
069    
070        /**
071         * Creates a normal transport.
072         * 
073         * @param location
074         * @param ex
075         * @return the transport
076         * @throws Exception
077         */
078        public static Transport connect(URI location, Executor ex) throws Exception {
079            TransportFactory tf = findTransportFactory(location);
080            return tf.doConnect(location, ex);
081        }
082    
083        /**
084         * Creates a slimmed down transport that is more efficient so that it can be
085         * used by composite transports like reliable and HA.
086         * 
087         * @param location
088         * @return the Transport
089         * @throws Exception
090         */
091        public static Transport compositeConnect(URI location) throws Exception {
092            TransportFactory tf = findTransportFactory(location);
093            return tf.doCompositeConnect(location);
094        }
095    
096        /**
097         * Creates a slimmed down transport that is more efficient so that it can be
098         * used by composite transports like reliable and HA.
099         * 
100         * @param location
101         * @param ex
102         * @return the Transport
103         * @throws Exception
104         */
105        public static Transport compositeConnect(URI location, Executor ex) throws Exception {
106            TransportFactory tf = findTransportFactory(location);
107            return tf.doCompositeConnect(location, ex);
108        }
109    
110        public static TransportServer bind(URI location) throws IOException {
111            TransportFactory tf = findTransportFactory(location);
112            return tf.doBind(location);
113        }
114    
115        /**
116         * @deprecated 
117         */
118        public static TransportServer bind(String brokerId, URI location) throws IOException {
119            return bind(location);
120        }
121        
122        public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
123            TransportFactory tf = findTransportFactory(location);
124            if( brokerService!=null && tf instanceof BrokerServiceAware ) {
125                ((BrokerServiceAware)tf).setBrokerService(brokerService);
126            }
127            try {
128                if( brokerService!=null ) {
129                    SslContext.setCurrentSslContext(brokerService.getSslContext());
130                }
131                return tf.doBind(location);
132            } finally {
133                SslContext.setCurrentSslContext(null);
134            }
135        }    
136    
137        public Transport doConnect(URI location) throws Exception {
138            try {
139                Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
140                WireFormat wf = createWireFormat(options);
141                Transport transport = createTransport(location, wf);
142                Transport rc = configure(transport, wf, options);
143                if (!options.isEmpty()) {
144                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
145                }
146                return rc;
147            } catch (URISyntaxException e) {
148                throw IOExceptionSupport.create(e);
149            }
150        }
151    
152        public Transport doCompositeConnect(URI location) throws Exception {
153            try {
154                Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
155                WireFormat wf = createWireFormat(options);
156                Transport transport = createTransport(location, wf);
157                Transport rc = compositeConfigure(transport, wf, options);
158                if (!options.isEmpty()) {
159                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
160                }
161                return rc;
162    
163            } catch (URISyntaxException e) {
164                throw IOExceptionSupport.create(e);
165            }
166        }
167        
168         /**
169          * Allow registration of a transport factory without wiring via META-INF classes
170         * @param scheme
171         * @param tf
172         */
173        public static void registerTransportFactory(String scheme, TransportFactory tf) {
174            TRANSPORT_FACTORYS.put(scheme, tf);
175          }
176    
177        /**
178         * Factory method to create a new transport
179         * 
180         * @throws IOException
181         * @throws UnknownHostException
182         */
183        protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
184            throw new IOException("createTransport() method not implemented!");
185        }
186    
187        /**
188         * @param location
189         * @return
190         * @throws IOException
191         */
192        private static TransportFactory findTransportFactory(URI location) throws IOException {
193            String scheme = location.getScheme();
194            if (scheme == null) {
195                throw new IOException("Transport not scheme specified: [" + location + "]");
196            }
197            TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
198            if (tf == null) {
199                // Try to load if from a META-INF property.
200                try {
201                    tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
202                    TRANSPORT_FACTORYS.put(scheme, tf);
203                } catch (Throwable e) {
204                    throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
205                }
206            }
207            return tf;
208        }
209    
210        protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
211            WireFormatFactory factory = createWireFormatFactory(options);
212            WireFormat format = factory.createWireFormat();
213            return format;
214        }
215    
216        protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
217            String wireFormat = (String)options.remove("wireFormat");
218            if (wireFormat == null) {
219                wireFormat = getDefaultWireFormatType();
220            }
221    
222            try {
223                WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
224                IntrospectionSupport.setProperties(wff, options, "wireFormat.");
225                return wff;
226            } catch (Throwable e) {
227                throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
228            }
229        }
230    
231        protected String getDefaultWireFormatType() {
232            return "default";
233        }
234    
235        /**
236         * Fully configures and adds all need transport filters so that the
237         * transport can be used by the JMS client.
238         * 
239         * @param transport
240         * @param wf
241         * @param options
242         * @return
243         * @throws Exception
244         */
245        public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
246            transport = compositeConfigure(transport, wf, options);
247    
248            transport = new MutexTransport(transport);
249            transport = new ResponseCorrelator(transport);
250    
251            return transport;
252        }
253    
254        /**
255         * Fully configures and adds all need transport filters so that the
256         * transport can be used by the ActiveMQ message broker. The main difference
257         * between this and the configure() method is that the broker does not issue
258         * requests to the client so the ResponseCorrelator is not needed.
259         * 
260         * @param transport
261         * @param format
262         * @param options
263         * @return
264         * @throws Exception
265         */
266        public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
267            if (options.containsKey(THREAD_NAME_FILTER)) {
268                transport = new ThreadNameFilter(transport);
269            }
270            transport = compositeConfigure(transport, format, options);
271            transport = new MutexTransport(transport);
272            return transport;
273        }
274    
275        /**
276         * Similar to configure(...) but this avoid adding in the MutexTransport and
277         * ResponseCorrelator transport layers so that the resulting transport can
278         * more efficiently be used as part of a composite transport.
279         * 
280         * @param transport
281         * @param format
282         * @param options
283         * @return
284         */
285        public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
286            if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
287                transport = new WriteTimeoutFilter(transport);
288                String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
289                if (soWriteTimeout!=null) {
290                    ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
291                }
292            }
293            IntrospectionSupport.setProperties(transport, options);
294            return transport;
295        }
296    
297    }