001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.tcp; 020 021 import java.io.IOException; 022 import java.net.InetAddress; 023 import java.net.InetSocketAddress; 024 import java.net.ServerSocket; 025 import java.net.Socket; 026 import java.net.SocketTimeoutException; 027 import java.net.URI; 028 import java.net.URISyntaxException; 029 import java.net.UnknownHostException; 030 031 import javax.jms.JMSException; 032 033 import org.activemq.io.WireFormat; 034 import org.activemq.transport.TransportServerChannelSupport; 035 import org.activemq.util.JMSExceptionHelper; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 040 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 042 043 /** 044 * Binds to a well known port and listens for Sockets ... 045 * 046 * @version $Revision: 1.1.1.1 $ 047 */ 048 public class TcpTransportServerChannel extends TransportServerChannelSupport implements Runnable { 049 private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class); 050 protected static final int DEFAULT_BACKLOG = 500; 051 private WireFormat wireFormat; 052 private Thread serverSocketThread; 053 private ServerSocket serverSocket; 054 private SynchronizedBoolean closed; 055 private SynchronizedBoolean started; 056 private boolean useAsyncSend = false; 057 private int maxOutstandingMessages = 10; 058 private int backlog = DEFAULT_BACKLOG; 059 060 /** 061 * Default Constructor 062 * 063 * @param bindAddr 064 * @throws JMSException 065 */ 066 public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr) throws JMSException { 067 super(bindAddr); 068 this.wireFormat = wireFormat; 069 closed = new SynchronizedBoolean(false); 070 started = new SynchronizedBoolean(false); 071 try { 072 serverSocket = createServerSocket(bindAddr); 073 serverSocket.setSoTimeout(2000); 074 updatePhysicalUri(bindAddr); 075 } 076 catch (Exception se) { 077 System.out.println(se); 078 se.printStackTrace(); 079 throw JMSExceptionHelper.newJMSException("Bind to " + bindAddr + " failed: " + se.getMessage(), se); 080 } 081 } 082 083 public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) throws JMSException { 084 super(serverSocket.getInetAddress().toString()); 085 this.wireFormat = wireFormat; 086 this.serverSocket = serverSocket; 087 closed = new SynchronizedBoolean(false); 088 started = new SynchronizedBoolean(false); 089 InetAddress address = serverSocket.getInetAddress(); 090 try { 091 updatePhysicalUri(new URI("tcp", "", address.getHostName(), 0, "", "", "")); 092 } 093 catch (URISyntaxException e) { 094 throw JMSExceptionHelper.newJMSException("Failed to extract URI: : " + e.getMessage(), e); 095 } 096 } 097 098 public void start() throws JMSException { 099 super.start(); 100 if (started.commit(false, true)) { 101 log.info("Listening for connections at: " + getUrl()); 102 serverSocketThread = new Thread(this, toString()); 103 serverSocketThread.setDaemon(true); 104 serverSocketThread.start(); 105 } 106 } 107 108 public void stop() throws JMSException { 109 if (closed.commit(false, true)) { 110 super.stop(); 111 try { 112 if (serverSocket != null) { 113 serverSocket.close(); 114 if (serverSocketThread != null) { 115 serverSocketThread.join(); 116 serverSocketThread = null; 117 } 118 } 119 } 120 catch (Throwable e) { 121 throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e); 122 } 123 } 124 } 125 126 public InetSocketAddress getSocketAddress() { 127 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 128 } 129 130 /** 131 * @return pretty print of this 132 */ 133 public String toString() { 134 return "TcpTransportServerChannel@" + getUrl(); 135 } 136 137 /** 138 * pull Sockets from the ServerSocket 139 */ 140 public void run() { 141 while (!closed.get()) { 142 Socket socket = null; 143 try { 144 socket = serverSocket.accept(); 145 if (socket != null) { 146 if (closed.get()) { 147 socket.close(); 148 } 149 else { 150 // have thread per channel for sending messages and a thread for receiving them 151 PooledExecutor executor = null; 152 if (useAsyncSend) { 153 executor = new PooledExecutor(new BoundedBuffer(maxOutstandingMessages), 1); 154 executor.waitWhenBlocked(); 155 executor.setKeepAliveTime(1000); 156 } 157 TcpTransportChannel channel = createTransportChannel(socket, executor); 158 addClient(channel); 159 } 160 } 161 } 162 catch (SocketTimeoutException ste) { 163 //expect this to happen 164 } 165 catch (Throwable e) { 166 if (!closed.get()) { 167 log.warn("run()", e); 168 } 169 } 170 } 171 } 172 173 protected TcpTransportChannel createTransportChannel(Socket socket, PooledExecutor executor) throws JMSException { 174 TcpTransportChannel channel = new TcpTransportChannel(this,wireFormat.copy(), socket, executor); 175 return channel; 176 } 177 178 // Properties 179 //------------------------------------------------------------------------- 180 public boolean isUseAsyncSend() { 181 return useAsyncSend; 182 } 183 184 public void setUseAsyncSend(boolean useAsyncSend) { 185 this.useAsyncSend = useAsyncSend; 186 } 187 188 public int getMaxOutstandingMessages() { 189 return maxOutstandingMessages; 190 } 191 192 public void setMaxOutstandingMessages(int maxOutstandingMessages) { 193 this.maxOutstandingMessages = maxOutstandingMessages; 194 } 195 196 public int getBacklog() { 197 return backlog; 198 } 199 200 public void setBacklog(int backlog) { 201 this.backlog = backlog; 202 } 203 204 public WireFormat getWireFormat() { 205 return wireFormat; 206 } 207 208 public void setWireFormat(WireFormat wireFormat) { 209 this.wireFormat = wireFormat; 210 } 211 212 // Implementation methods 213 //------------------------------------------------------------------------- 214 /** 215 * In cases where we construct ourselves with a zero port we need to regenerate the URI with the real physical port 216 * so that people can connect to us via discovery 217 */ 218 protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException { 219 URI newURI = new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket 220 .getLocalPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment()); 221 setUrl(newURI.toString()); 222 } 223 224 /** 225 * Factory method to create a new ServerSocket 226 * 227 * @throws UnknownHostException 228 * @throws IOException 229 */ 230 protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException { 231 ServerSocket answer = null; 232 String host = bind.getHost(); 233 host = (host == null || host.length() == 0) ? "localhost" : host; 234 InetAddress addr = InetAddress.getByName(host); 235 if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { 236 answer = new ServerSocket(bind.getPort(), backlog); 237 } 238 else { 239 answer = new ServerSocket(bind.getPort(), backlog, addr); 240 } 241 return answer; 242 } 243 }