001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.tcp; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.net.UnknownHostException; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import javax.net.ServerSocketFactory; 027 import javax.net.SocketFactory; 028 029 import org.apache.activemq.openwire.OpenWireFormat; 030 import org.apache.activemq.transport.InactivityMonitor; 031 import org.apache.activemq.transport.Transport; 032 import org.apache.activemq.transport.TransportFactory; 033 import org.apache.activemq.transport.TransportLoggerFactory; 034 import org.apache.activemq.transport.TransportServer; 035 import org.apache.activemq.transport.WireFormatNegotiator; 036 import org.apache.activemq.util.IOExceptionSupport; 037 import org.apache.activemq.util.IntrospectionSupport; 038 import org.apache.activemq.util.URISupport; 039 import org.apache.activemq.wireformat.WireFormat; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 /** 044 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 045 * @version $Revision$ 046 */ 047 public class TcpTransportFactory extends TransportFactory { 048 private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class); 049 050 public TransportServer doBind(final URI location) throws IOException { 051 try { 052 Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location)); 053 054 ServerSocketFactory serverSocketFactory = createServerSocketFactory(); 055 TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); 056 server.setWireFormatFactory(createWireFormatFactory(options)); 057 IntrospectionSupport.setProperties(server, options); 058 Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport."); 059 server.setTransportOption(transportOptions); 060 server.bind(); 061 062 return server; 063 } catch (URISyntaxException e) { 064 throw IOExceptionSupport.create(e); 065 } 066 } 067 068 /** 069 * Allows subclasses of TcpTransportFactory to create custom instances of 070 * TcpTransportServer. 071 * 072 * @param location 073 * @param serverSocketFactory 074 * @return 075 * @throws IOException 076 * @throws URISyntaxException 077 */ 078 protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { 079 return new TcpTransportServer(this, location, serverSocketFactory); 080 } 081 082 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 083 084 TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class); 085 IntrospectionSupport.setProperties(tcpTransport, options); 086 087 Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket."); 088 tcpTransport.setSocketOptions(socketOptions); 089 090 if (tcpTransport.isTrace()) { 091 try { 092 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(), 093 tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort()); 094 } catch (Throwable e) { 095 LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e); 096 } 097 } 098 099 boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); 100 if (useInactivityMonitor && isUseInactivityMonitor(transport)) { 101 transport = new InactivityMonitor(transport, format); 102 IntrospectionSupport.setProperties(transport, options); 103 } 104 105 106 // Only need the WireFormatNegotiator if using openwire 107 if (format instanceof OpenWireFormat) { 108 transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); 109 } 110 111 return super.compositeConfigure(transport, format, options); 112 } 113 114 private String getOption(Map options, String key, String def) { 115 String rc = (String) options.remove(key); 116 if( rc == null ) { 117 rc = def; 118 } 119 return rc; 120 } 121 122 /** 123 * Returns true if the inactivity monitor should be used on the transport 124 */ 125 protected boolean isUseInactivityMonitor(Transport transport) { 126 return true; 127 } 128 129 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { 130 URI localLocation = null; 131 String path = location.getPath(); 132 // see if the path is a local URI location 133 if (path != null && path.length() > 0) { 134 int localPortIndex = path.indexOf(':'); 135 try { 136 Integer.parseInt(path.substring(localPortIndex + 1, path.length())); 137 String localString = location.getScheme() + ":/" + path; 138 localLocation = new URI(localString); 139 } catch (Exception e) { 140 LOG.warn("path isn't a valid local location for TcpTransport to use", e); 141 } 142 } 143 SocketFactory socketFactory = createSocketFactory(); 144 return createTcpTransport(wf, socketFactory, location, localLocation); 145 } 146 147 /** 148 * Allows subclasses of TcpTransportFactory to provide a create custom 149 * TcpTransport intances. 150 * 151 * @param location 152 * @param wf 153 * @param socketFactory 154 * @param localLocation 155 * @return 156 * @throws UnknownHostException 157 * @throws IOException 158 */ 159 protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { 160 return new TcpTransport(wf, socketFactory, location, localLocation); 161 } 162 163 protected ServerSocketFactory createServerSocketFactory() throws IOException { 164 return ServerSocketFactory.getDefault(); 165 } 166 167 protected SocketFactory createSocketFactory() throws IOException { 168 return SocketFactory.getDefault(); 169 } 170 }