001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.util.Iterator; 020 import java.util.Set; 021 import java.util.concurrent.CopyOnWriteArraySet; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 024 import org.apache.activemq.Service; 025 import org.apache.activemq.ThreadPriorities; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 029 /** 030 * Used to provide information on the status of the Connection 031 * 032 * @version $Revision: 1.5 $ 033 */ 034 public class TransportStatusDetector implements Service, Runnable { 035 private static final Log LOG = LogFactory.getLog(TransportStatusDetector.class); 036 private TransportConnector connector; 037 private Set<TransportConnection> collectionCandidates = new CopyOnWriteArraySet<TransportConnection>(); 038 private AtomicBoolean started = new AtomicBoolean(false); 039 private Thread runner; 040 private int sweepInterval = 5000; 041 042 TransportStatusDetector(TransportConnector connector) { 043 this.connector = connector; 044 } 045 046 /** 047 * @return Returns the sweepInterval. 048 */ 049 public int getSweepInterval() { 050 return sweepInterval; 051 } 052 053 /** 054 * The sweepInterval to set. 055 * 056 * @param sweepInterval 057 */ 058 public void setSweepInterval(int sweepInterval) { 059 this.sweepInterval = sweepInterval; 060 } 061 062 protected void doCollection() { 063 for (Iterator<TransportConnection> i = collectionCandidates.iterator(); i.hasNext();) { 064 TransportConnection tc = i.next(); 065 if (tc.isMarkedCandidate()) { 066 if (tc.isBlockedCandidate()) { 067 collectionCandidates.remove(tc); 068 doCollection(tc); 069 } else { 070 tc.doMark(); 071 } 072 } else { 073 collectionCandidates.remove(tc); 074 } 075 } 076 } 077 078 protected void doSweep() { 079 for (Iterator i = connector.getConnections().iterator(); i.hasNext();) { 080 TransportConnection connection = (TransportConnection)i.next(); 081 if (connection.isMarkedCandidate()) { 082 connection.doMark(); 083 collectionCandidates.add(connection); 084 } 085 } 086 } 087 088 protected void doCollection(TransportConnection tc) { 089 LOG.warn("found a blocked client - stopping: " + tc); 090 try { 091 tc.stop(); 092 } catch (Exception e) { 093 LOG.error("Error stopping " + tc, e); 094 } 095 } 096 097 public void run() { 098 while (started.get()) { 099 try { 100 doCollection(); 101 doSweep(); 102 Thread.sleep(sweepInterval); 103 } catch (Throwable e) { 104 LOG.error("failed to complete a sweep for blocked clients", e); 105 } 106 } 107 } 108 109 public void start() throws Exception { 110 if (started.compareAndSet(false, true)) { 111 runner = new Thread(this, "ActiveMQ Transport Status Monitor: " + connector); 112 runner.setDaemon(true); 113 runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT); 114 runner.start(); 115 } 116 } 117 118 public void stop() throws Exception { 119 started.set(false); 120 if (runner != null) { 121 runner.join(getSweepInterval() * 5); 122 } 123 } 124 }