001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.Timer;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    import org.apache.activemq.command.KeepAliveInfo;
029    import org.apache.activemq.command.WireFormatInfo;
030    import org.apache.activemq.thread.SchedulerTimerTask;
031    import org.apache.activemq.wireformat.WireFormat;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * Used to make sure that commands are arriving periodically from the peer of
037     * the transport.
038     *
039     * @version $Revision$
040     */
041    public class InactivityMonitor extends TransportFilter {
042    
043        private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
044        private static final ThreadPoolExecutor ASYNC_TASKS;
045    
046        private static int CHECKER_COUNTER;
047        private static Timer  READ_CHECK_TIMER;
048        private static Timer  WRITE_CHECK_TIMER;
049    
050        private WireFormatInfo localWireFormatInfo;
051        private WireFormatInfo remoteWireFormatInfo;
052        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053    
054        private final AtomicBoolean commandSent = new AtomicBoolean(false);
055        private final AtomicBoolean inSend = new AtomicBoolean(false);
056        private final AtomicBoolean failed = new AtomicBoolean(false);
057    
058        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059        private final AtomicBoolean inReceive = new AtomicBoolean(false);
060        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061    
062        private SchedulerTimerTask writeCheckerTask;
063        private SchedulerTimerTask readCheckerTask;
064    
065        private boolean ignoreRemoteWireFormat = false;
066        private long readCheckTime;
067        private long writeCheckTime;
068        private long initialDelayTime;
069        private boolean keepAliveResponseRequired;
070        private WireFormat wireFormat;
071    
072        private final Runnable readChecker = new Runnable() {
073            long lastRunTime;
074            public void run() {
075                long now = System.currentTimeMillis();
076                long elapsed = (now-lastRunTime);
077    
078                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
079                    LOG.debug(""+elapsed+" ms elapsed since last read check.");
080                }
081    
082                // Perhaps the timer executed a read check late.. and then executes
083                // the next read check on time which causes the time elapsed between
084                // read checks to be small..
085    
086                // If less than 90% of the read check Time elapsed then abort this readcheck.
087                if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
088                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
089                    return;
090                }
091    
092                lastRunTime = now;
093                readCheck();
094            }
095        };
096    
097        private boolean allowReadCheck(long elapsed) {
098            return elapsed > (readCheckTime * 9 / 10);
099        }
100    
101        private final Runnable writeChecker = new Runnable() {
102            long lastRunTime;
103            public void run() {
104                long now = System.currentTimeMillis();
105                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
106                    LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
107    
108                }
109                lastRunTime = now;
110                writeCheck();
111            }
112        };
113    
114        public InactivityMonitor(Transport next, WireFormat wireFormat) {
115            super(next);
116            this.wireFormat = wireFormat;
117        }
118    
119        public void stop() throws Exception {
120            stopMonitorThreads();
121            next.stop();
122        }
123    
124        final void writeCheck() {
125            if (inSend.get()) {
126                if (LOG.isTraceEnabled()) {
127                    LOG.trace("A send is in progress");
128                }
129                return;
130            }
131    
132            if (!commandSent.get()) {
133                if (LOG.isTraceEnabled()) {
134                    LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
135                }
136                ASYNC_TASKS.execute(new Runnable() {
137                    public void run() {
138                        if (monitorStarted.get()) {
139                            try {
140    
141                                KeepAliveInfo info = new KeepAliveInfo();
142                                info.setResponseRequired(keepAliveResponseRequired);
143                                oneway(info);
144                            } catch (IOException e) {
145                                onException(e);
146                            }
147                        }
148                    };
149                });
150            } else {
151                if (LOG.isTraceEnabled()) {
152                    LOG.trace("Message sent since last write check, resetting flag");
153                }
154            }
155    
156            commandSent.set(false);
157        }
158    
159        final void readCheck() {
160            int currentCounter = next.getReceiveCounter();
161            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
162            if (inReceive.get() || currentCounter!=previousCounter ) {
163                if (LOG.isTraceEnabled()) {
164                    LOG.trace("A receive is in progress");
165                }
166                return;
167            }
168            if (!commandReceived.get()) {
169                if (LOG.isDebugEnabled()) {
170                    LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
171                }
172                ASYNC_TASKS.execute(new Runnable() {
173                    public void run() {
174                        onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
175                    };
176    
177                });
178            } else {
179                if (LOG.isTraceEnabled()) {
180                    LOG.trace("Message received since last read check, resetting flag: ");
181                }
182            }
183            commandReceived.set(false);
184        }
185    
186        public void onCommand(Object command) {
187            commandReceived.set(true);
188            inReceive.set(true);
189            try {
190                if (command.getClass() == KeepAliveInfo.class) {
191                    KeepAliveInfo info = (KeepAliveInfo) command;
192                    if (info.isResponseRequired()) {
193                        try {
194                            info.setResponseRequired(false);
195                            oneway(info);
196                        } catch (IOException e) {
197                            onException(e);
198                        }
199                    }
200                } else {
201                    if (command.getClass() == WireFormatInfo.class) {
202                        synchronized (this) {
203                            IOException error = null;
204                            remoteWireFormatInfo = (WireFormatInfo) command;
205                            try {
206                                startMonitorThreads();
207                            } catch (IOException e) {
208                                error = e;
209                            }
210                            if (error != null) {
211                                onException(error);
212                            }
213                        }
214                    }
215                    synchronized (readChecker) {
216                        transportListener.onCommand(command);
217                    }
218                }
219            } finally {
220    
221                inReceive.set(false);
222            }
223        }
224    
225        public void oneway(Object o) throws IOException {
226            // Disable inactivity monitoring while processing a command.
227            //synchronize this method - its not synchronized
228            //further down the transport stack and gets called by more
229            //than one thread  by this class
230            synchronized(inSend) {
231                inSend.set(true);
232                try {
233    
234                    if( failed.get() ) {
235                        throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
236                    }
237                    if (o.getClass() == WireFormatInfo.class) {
238                        synchronized (this) {
239                            localWireFormatInfo = (WireFormatInfo)o;
240                            startMonitorThreads();
241                        }
242                    }
243                    next.oneway(o);
244                } finally {
245                    commandSent.set(true);
246                    inSend.set(false);
247                }
248            }
249        }
250    
251        public void onException(IOException error) {
252            if (failed.compareAndSet(false, true)) {
253                stopMonitorThreads();
254                transportListener.onException(error);
255            }
256        }
257    
258        public void setKeepAliveResponseRequired(boolean val) {
259            keepAliveResponseRequired = val;
260        }
261    
262        public void setIgnoreRemoteWireFormat(boolean val) {
263            ignoreRemoteWireFormat = val;
264        }
265    
266        private synchronized void startMonitorThreads() throws IOException {
267            if (monitorStarted.get()) {
268                return;
269            }
270            if (localWireFormatInfo == null) {
271                return;
272            }
273            if (remoteWireFormatInfo == null) {
274                return;
275            }
276    
277            if (!ignoreRemoteWireFormat) {
278                readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
279                initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
280            } else {
281                readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
282                initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
283            }
284    
285            if (readCheckTime > 0) {
286                monitorStarted.set(true);
287                writeCheckerTask = new SchedulerTimerTask(writeChecker);
288                readCheckerTask = new  SchedulerTimerTask(readChecker);
289                writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
290                synchronized( InactivityMonitor.class ) {
291                    if( CHECKER_COUNTER == 0 ) {
292                        READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
293                        WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
294                    }
295                    CHECKER_COUNTER++;
296                    WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
297                    READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
298                }
299            }
300        }
301    
302        /**
303         *
304         */
305        private synchronized void stopMonitorThreads() {
306            if (monitorStarted.compareAndSet(true, false)) {
307                readCheckerTask.cancel();
308                writeCheckerTask.cancel();
309                synchronized( InactivityMonitor.class ) {
310                    WRITE_CHECK_TIMER.purge();
311                    READ_CHECK_TIMER.purge();
312                    CHECKER_COUNTER--;
313                    if(CHECKER_COUNTER==0) {
314                        WRITE_CHECK_TIMER.cancel();
315                        READ_CHECK_TIMER.cancel();
316                        WRITE_CHECK_TIMER = null;
317                        READ_CHECK_TIMER = null;
318                    }
319                }
320            }
321        }
322    
323    
324        static {
325            ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
326                public Thread newThread(Runnable runnable) {
327                    Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
328                    thread.setDaemon(true);
329                    return thread;
330                }
331            });
332        }
333    
334    }