001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport; 019 020 import java.io.IOException; 021 import java.net.Socket; 022 import java.util.Iterator; 023 import java.util.concurrent.ConcurrentLinkedQueue; 024 import java.util.concurrent.atomic.AtomicInteger; 025 import java.util.concurrent.locks.Condition; 026 import java.util.concurrent.locks.ReentrantLock; 027 028 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 032 /** 033 * This filter implements write timeouts for socket write operations. 034 * When using blocking IO, the Java implementation doesn't have an explicit flag 035 * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions, 036 * which is usually around 13-30 minutes).<br/> 037 * To enable this transport, in the transport URI, simpley add<br/> 038 * <code>transport.soWriteTimeout=<value in millis></code>.<br/> 039 * For example (15 second timeout on write operations to the socket):</br> 040 * <pre><code> 041 * <transportConnector 042 * name="tcp1" 043 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000" 044 * /> 045 * </code></pre><br/> 046 * For example (enable default timeout on the socket):</br> 047 * <pre><code> 048 * <transportConnector 049 * name="tcp1" 050 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000" 051 * /> 052 * </code></pre> 053 * @author Filip Hanik 054 * 055 */ 056 public class WriteTimeoutFilter extends TransportFilter { 057 058 private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class); 059 protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>(); 060 protected static AtomicInteger messageCounter = new AtomicInteger(0); 061 protected static TimeoutThread timeoutThread = new TimeoutThread(); 062 063 protected static long sleep = 5000l; 064 065 protected long writeTimeout = -1; 066 067 public WriteTimeoutFilter(Transport next) { 068 super(next); 069 } 070 071 @Override 072 public void oneway(Object command) throws IOException { 073 try { 074 registerWrite(this); 075 super.oneway(command); 076 } catch (IOException x) { 077 throw x; 078 } finally { 079 deRegisterWrite(this,false,null); 080 } 081 } 082 083 public long getWriteTimeout() { 084 return writeTimeout; 085 } 086 087 public void setWriteTimeout(long writeTimeout) { 088 this.writeTimeout = writeTimeout; 089 } 090 091 public static long getSleep() { 092 return sleep; 093 } 094 095 public static void setSleep(long sleep) { 096 WriteTimeoutFilter.sleep = sleep; 097 } 098 099 100 protected TcpBufferedOutputStream getWriter() { 101 return next.narrow(TcpBufferedOutputStream.class); 102 } 103 104 protected Socket getSocket() { 105 return next.narrow(Socket.class); 106 } 107 108 protected static void registerWrite(WriteTimeoutFilter filter) { 109 writers.add(filter); 110 } 111 112 protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) { 113 boolean result = writers.remove(filter); 114 if (result) { 115 if (fail) { 116 String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress(); 117 LOG.warn(message); 118 Socket sock = filter.getSocket(); 119 if (sock==null) { 120 LOG.error("Destination socket is null, unable to close socket.("+message+")"); 121 } else { 122 try { 123 sock.close(); 124 }catch (IOException ignore) { 125 } 126 } 127 } 128 } 129 return result; 130 } 131 132 @Override 133 public void start() throws Exception { 134 super.start(); 135 } 136 137 @Override 138 public void stop() throws Exception { 139 super.stop(); 140 } 141 142 protected static class TimeoutThread extends Thread { 143 static AtomicInteger instance = new AtomicInteger(0); 144 boolean run = true; 145 public TimeoutThread() { 146 setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet()); 147 setDaemon(true); 148 setPriority(Thread.MIN_PRIORITY); 149 start(); 150 } 151 152 153 public void run() { 154 while (run) { 155 boolean error = false; 156 try { 157 if (!interrupted()) { 158 Iterator<WriteTimeoutFilter> filters = writers.iterator(); 159 while (run && filters.hasNext()) { 160 WriteTimeoutFilter filter = filters.next(); 161 if (filter.getWriteTimeout()<=0) continue; //no timeout set 162 long writeStart = filter.getWriter().getWriteTimestamp(); 163 long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1; 164 if (delta>filter.getWriteTimeout()) { 165 WriteTimeoutFilter.deRegisterWrite(filter, true,null); 166 }//if timeout 167 }//while 168 }//if interrupted 169 try { 170 Thread.sleep(getSleep()); 171 error = false; 172 } catch (InterruptedException x) { 173 //do nothing 174 } 175 }catch (Throwable t) { //make sure this thread never dies 176 if (!error) { //use error flag to avoid filling up the logs 177 LOG.error("WriteTimeout thread unable validate existing sockets.",t); 178 error = true; 179 } 180 } 181 } 182 } 183 } 184 185 }