001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport; 018 019 import java.io.IOException; 020 import java.util.Timer; 021 import java.util.concurrent.SynchronousQueue; 022 import java.util.concurrent.ThreadFactory; 023 import java.util.concurrent.ThreadPoolExecutor; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.atomic.AtomicBoolean; 026 import java.util.concurrent.atomic.AtomicInteger; 027 028 import org.apache.activemq.command.KeepAliveInfo; 029 import org.apache.activemq.command.WireFormatInfo; 030 import org.apache.activemq.thread.SchedulerTimerTask; 031 import org.apache.activemq.wireformat.WireFormat; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * Used to make sure that commands are arriving periodically from the peer of 037 * the transport. 038 * 039 * @version $Revision$ 040 */ 041 public class InactivityMonitor extends TransportFilter { 042 043 private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); 044 private static final ThreadPoolExecutor ASYNC_TASKS; 045 046 private static int CHECKER_COUNTER; 047 private static Timer READ_CHECK_TIMER; 048 private static Timer WRITE_CHECK_TIMER; 049 050 private WireFormatInfo localWireFormatInfo; 051 private WireFormatInfo remoteWireFormatInfo; 052 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 053 054 private final AtomicBoolean commandSent = new AtomicBoolean(false); 055 private final AtomicBoolean inSend = new AtomicBoolean(false); 056 private final AtomicBoolean failed = new AtomicBoolean(false); 057 058 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 059 private final AtomicBoolean inReceive = new AtomicBoolean(false); 060 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 061 062 private SchedulerTimerTask writeCheckerTask; 063 private SchedulerTimerTask readCheckerTask; 064 065 private boolean ignoreRemoteWireFormat = false; 066 private long readCheckTime; 067 private long writeCheckTime; 068 private long initialDelayTime; 069 private boolean keepAliveResponseRequired; 070 private WireFormat wireFormat; 071 072 private final Runnable readChecker = new Runnable() { 073 long lastRunTime; 074 public void run() { 075 long now = System.currentTimeMillis(); 076 long elapsed = (now-lastRunTime); 077 078 if( lastRunTime != 0 && LOG.isDebugEnabled() ) { 079 LOG.debug(""+elapsed+" ms elapsed since last read check."); 080 } 081 082 // Perhaps the timer executed a read check late.. and then executes 083 // the next read check on time which causes the time elapsed between 084 // read checks to be small.. 085 086 // If less than 90% of the read check Time elapsed then abort this readcheck. 087 if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression. 088 LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); 089 return; 090 } 091 092 lastRunTime = now; 093 readCheck(); 094 } 095 }; 096 097 private boolean allowReadCheck(long elapsed) { 098 return elapsed > (readCheckTime * 9 / 10); 099 } 100 101 private final Runnable writeChecker = new Runnable() { 102 long lastRunTime; 103 public void run() { 104 long now = System.currentTimeMillis(); 105 if( lastRunTime != 0 && LOG.isDebugEnabled() ) { 106 LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); 107 108 } 109 lastRunTime = now; 110 writeCheck(); 111 } 112 }; 113 114 public InactivityMonitor(Transport next, WireFormat wireFormat) { 115 super(next); 116 this.wireFormat = wireFormat; 117 } 118 119 public void stop() throws Exception { 120 stopMonitorThreads(); 121 next.stop(); 122 } 123 124 final void writeCheck() { 125 if (inSend.get()) { 126 if (LOG.isTraceEnabled()) { 127 LOG.trace("A send is in progress"); 128 } 129 return; 130 } 131 132 if (!commandSent.get()) { 133 if (LOG.isTraceEnabled()) { 134 LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); 135 } 136 ASYNC_TASKS.execute(new Runnable() { 137 public void run() { 138 if (monitorStarted.get()) { 139 try { 140 141 KeepAliveInfo info = new KeepAliveInfo(); 142 info.setResponseRequired(keepAliveResponseRequired); 143 oneway(info); 144 } catch (IOException e) { 145 onException(e); 146 } 147 } 148 }; 149 }); 150 } else { 151 if (LOG.isTraceEnabled()) { 152 LOG.trace("Message sent since last write check, resetting flag"); 153 } 154 } 155 156 commandSent.set(false); 157 } 158 159 final void readCheck() { 160 int currentCounter = next.getReceiveCounter(); 161 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 162 if (inReceive.get() || currentCounter!=previousCounter ) { 163 if (LOG.isTraceEnabled()) { 164 LOG.trace("A receive is in progress"); 165 } 166 return; 167 } 168 if (!commandReceived.get()) { 169 if (LOG.isDebugEnabled()) { 170 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); 171 } 172 ASYNC_TASKS.execute(new Runnable() { 173 public void run() { 174 onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); 175 }; 176 177 }); 178 } else { 179 if (LOG.isTraceEnabled()) { 180 LOG.trace("Message received since last read check, resetting flag: "); 181 } 182 } 183 commandReceived.set(false); 184 } 185 186 public void onCommand(Object command) { 187 commandReceived.set(true); 188 inReceive.set(true); 189 try { 190 if (command.getClass() == KeepAliveInfo.class) { 191 KeepAliveInfo info = (KeepAliveInfo) command; 192 if (info.isResponseRequired()) { 193 try { 194 info.setResponseRequired(false); 195 oneway(info); 196 } catch (IOException e) { 197 onException(e); 198 } 199 } 200 } else { 201 if (command.getClass() == WireFormatInfo.class) { 202 synchronized (this) { 203 IOException error = null; 204 remoteWireFormatInfo = (WireFormatInfo) command; 205 try { 206 startMonitorThreads(); 207 } catch (IOException e) { 208 error = e; 209 } 210 if (error != null) { 211 onException(error); 212 } 213 } 214 } 215 synchronized (readChecker) { 216 transportListener.onCommand(command); 217 } 218 } 219 } finally { 220 221 inReceive.set(false); 222 } 223 } 224 225 public void oneway(Object o) throws IOException { 226 // Disable inactivity monitoring while processing a command. 227 //synchronize this method - its not synchronized 228 //further down the transport stack and gets called by more 229 //than one thread by this class 230 synchronized(inSend) { 231 inSend.set(true); 232 try { 233 234 if( failed.get() ) { 235 throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); 236 } 237 if (o.getClass() == WireFormatInfo.class) { 238 synchronized (this) { 239 localWireFormatInfo = (WireFormatInfo)o; 240 startMonitorThreads(); 241 } 242 } 243 next.oneway(o); 244 } finally { 245 commandSent.set(true); 246 inSend.set(false); 247 } 248 } 249 } 250 251 public void onException(IOException error) { 252 if (failed.compareAndSet(false, true)) { 253 stopMonitorThreads(); 254 transportListener.onException(error); 255 } 256 } 257 258 public void setKeepAliveResponseRequired(boolean val) { 259 keepAliveResponseRequired = val; 260 } 261 262 public void setIgnoreRemoteWireFormat(boolean val) { 263 ignoreRemoteWireFormat = val; 264 } 265 266 private synchronized void startMonitorThreads() throws IOException { 267 if (monitorStarted.get()) { 268 return; 269 } 270 if (localWireFormatInfo == null) { 271 return; 272 } 273 if (remoteWireFormatInfo == null) { 274 return; 275 } 276 277 if (!ignoreRemoteWireFormat) { 278 readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); 279 initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); 280 } else { 281 readCheckTime = localWireFormatInfo.getMaxInactivityDuration(); 282 initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay(); 283 } 284 285 if (readCheckTime > 0) { 286 monitorStarted.set(true); 287 writeCheckerTask = new SchedulerTimerTask(writeChecker); 288 readCheckerTask = new SchedulerTimerTask(readChecker); 289 writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; 290 synchronized( InactivityMonitor.class ) { 291 if( CHECKER_COUNTER == 0 ) { 292 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); 293 WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); 294 } 295 CHECKER_COUNTER++; 296 WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); 297 READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); 298 } 299 } 300 } 301 302 /** 303 * 304 */ 305 private synchronized void stopMonitorThreads() { 306 if (monitorStarted.compareAndSet(true, false)) { 307 readCheckerTask.cancel(); 308 writeCheckerTask.cancel(); 309 synchronized( InactivityMonitor.class ) { 310 WRITE_CHECK_TIMER.purge(); 311 READ_CHECK_TIMER.purge(); 312 CHECKER_COUNTER--; 313 if(CHECKER_COUNTER==0) { 314 WRITE_CHECK_TIMER.cancel(); 315 READ_CHECK_TIMER.cancel(); 316 WRITE_CHECK_TIMER = null; 317 READ_CHECK_TIMER = null; 318 } 319 } 320 } 321 } 322 323 324 static { 325 ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 326 public Thread newThread(Runnable runnable) { 327 Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); 328 thread.setDaemon(true); 329 return thread; 330 } 331 }); 332 } 333 334 }