001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.usage; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.List; 023 import java.util.concurrent.CopyOnWriteArrayList; 024 import java.util.concurrent.Executor; 025 import java.util.concurrent.LinkedBlockingQueue; 026 import java.util.concurrent.ThreadFactory; 027 import java.util.concurrent.ThreadPoolExecutor; 028 import java.util.concurrent.TimeUnit; 029 import java.util.concurrent.atomic.AtomicBoolean; 030 031 import org.apache.activemq.Service; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * Used to keep track of how much of something is being used so that a 037 * productive working set usage can be controlled. Main use case is manage 038 * memory usage. 039 * 040 * @org.apache.xbean.XBean 041 * @version $Revision: 1.3 $ 042 */ 043 public abstract class Usage<T extends Usage> implements Service { 044 045 private static final Log LOG = LogFactory.getLog(Usage.class); 046 private static ThreadPoolExecutor executor; 047 protected final Object usageMutex = new Object(); 048 protected int percentUsage; 049 protected T parent; 050 private UsageCapacity limiter = new DefaultUsageCapacity(); 051 private int percentUsageMinDelta = 1; 052 private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>(); 053 private final boolean debug = LOG.isDebugEnabled(); 054 private String name; 055 private float usagePortion = 1.0f; 056 private List<T> children = new CopyOnWriteArrayList<T>(); 057 private final List<Runnable> callbacks = new LinkedList<Runnable>(); 058 private int pollingTime = 100; 059 060 private AtomicBoolean started=new AtomicBoolean(); 061 062 public Usage(T parent, String name, float portion) { 063 this.parent = parent; 064 this.usagePortion = portion; 065 if (parent != null) { 066 this.limiter.setLimit((long)(parent.getLimit() * portion)); 067 name = parent.name + ":" + name; 068 } 069 this.name = name; 070 } 071 072 protected abstract long retrieveUsage(); 073 074 /** 075 * @throws InterruptedException 076 */ 077 public void waitForSpace() throws InterruptedException { 078 waitForSpace(0); 079 } 080 081 /** 082 * @param timeout 083 * @throws InterruptedException 084 * @return true if space 085 */ 086 public boolean waitForSpace(long timeout) throws InterruptedException { 087 if (parent != null) { 088 if (!parent.waitForSpace(timeout)) { 089 return false; 090 } 091 } 092 synchronized (usageMutex) { 093 percentUsage=caclPercentUsage(); 094 if (percentUsage >= 100) { 095 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; 096 long timeleft = deadline; 097 while (timeleft > 0) { 098 percentUsage=caclPercentUsage(); 099 if (percentUsage >= 100) { 100 usageMutex.wait(pollingTime); 101 timeleft = deadline - System.currentTimeMillis(); 102 } else { 103 break; 104 } 105 } 106 } 107 return percentUsage < 100; 108 } 109 } 110 111 public boolean isFull() { 112 if (parent != null && parent.isFull()) { 113 return true; 114 } 115 synchronized (usageMutex) { 116 percentUsage=caclPercentUsage(); 117 return percentUsage >= 100; 118 } 119 } 120 121 public void addUsageListener(UsageListener listener) { 122 listeners.add(listener); 123 } 124 125 public void removeUsageListener(UsageListener listener) { 126 listeners.remove(listener); 127 } 128 129 public long getLimit() { 130 synchronized (usageMutex) { 131 return limiter.getLimit(); 132 } 133 } 134 135 /** 136 * Sets the memory limit in bytes. Setting the limit in bytes will set the 137 * usagePortion to 0 since the UsageManager is not going to be portion based 138 * off the parent. 139 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 140 * 141 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 142 */ 143 public void setLimit(long limit) { 144 if (percentUsageMinDelta < 0) { 145 throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0"); 146 } 147 synchronized (usageMutex) { 148 this.limiter.setLimit(limit); 149 this.usagePortion = 0; 150 } 151 onLimitChange(); 152 } 153 154 protected void onLimitChange() { 155 // We may need to calculate the limit 156 if (usagePortion > 0 && parent != null) { 157 synchronized (usageMutex) { 158 this.limiter.setLimit((long)(parent.getLimit() * usagePortion)); 159 } 160 } 161 // Reset the percent currently being used. 162 int percentUsage; 163 synchronized (usageMutex) { 164 percentUsage = caclPercentUsage(); 165 } 166 setPercentUsage(percentUsage); 167 // Let the children know that the limit has changed. They may need to 168 // set 169 // their limits based on ours. 170 for (T child : children) { 171 child.onLimitChange(); 172 } 173 } 174 175 public float getUsagePortion() { 176 synchronized (usageMutex) { 177 return usagePortion; 178 } 179 } 180 181 public void setUsagePortion(float usagePortion) { 182 synchronized (usageMutex) { 183 this.usagePortion = usagePortion; 184 } 185 onLimitChange(); 186 } 187 188 public int getPercentUsage() { 189 synchronized (usageMutex) { 190 return percentUsage; 191 } 192 } 193 194 public int getPercentUsageMinDelta() { 195 synchronized (usageMutex) { 196 return percentUsageMinDelta; 197 } 198 } 199 200 /** 201 * Sets the minimum number of percentage points the usage has to change 202 * before a UsageListener event is fired by the manager. 203 * 204 * @param percentUsageMinDelta 205 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 206 */ 207 public void setPercentUsageMinDelta(int percentUsageMinDelta) { 208 if (percentUsageMinDelta < 1) { 209 throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0"); 210 } 211 int percentUsage; 212 synchronized (usageMutex) { 213 this.percentUsageMinDelta = percentUsageMinDelta; 214 percentUsage = caclPercentUsage(); 215 } 216 setPercentUsage(percentUsage); 217 } 218 219 public long getUsage() { 220 synchronized (usageMutex) { 221 return retrieveUsage(); 222 } 223 } 224 225 protected void setPercentUsage(int value) { 226 synchronized (usageMutex) { 227 int oldValue = percentUsage; 228 percentUsage = value; 229 if (oldValue != value) { 230 fireEvent(oldValue, value); 231 } 232 } 233 } 234 235 protected int caclPercentUsage() { 236 if (limiter.getLimit() == 0) { 237 return 0; 238 } 239 return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta); 240 } 241 242 private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { 243 if (debug) { 244 LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 245 + newPercentUsage + "% of available memory"); 246 } 247 if (started.get()) { 248 // Switching from being full to not being full.. 249 if (oldPercentUsage >= 100 && newPercentUsage < 100) { 250 synchronized (usageMutex) { 251 usageMutex.notifyAll(); 252 if (!callbacks.isEmpty()) { 253 for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) { 254 Runnable callback = iter.next(); 255 getExecutor().execute(callback); 256 } 257 callbacks.clear(); 258 } 259 } 260 } 261 if (!listeners.isEmpty()) { 262 // Let the listeners know on a separate thread 263 Runnable listenerNotifier = new Runnable() { 264 public void run() { 265 for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) { 266 UsageListener l = iter.next(); 267 l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); 268 } 269 } 270 }; 271 if (started.get()) { 272 getExecutor().execute(listenerNotifier); 273 } else { 274 LOG.warn("Not notifying memory usage change to listeners on shutdown"); 275 } 276 } 277 } 278 } 279 280 public String getName() { 281 return name; 282 } 283 284 public String toString() { 285 return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%"; 286 } 287 288 @SuppressWarnings("unchecked") 289 public void start() { 290 if (started.compareAndSet(false, true)){ 291 if (parent != null) { 292 parent.addChild(this); 293 } 294 for (T t:children) { 295 t.start(); 296 } 297 } 298 } 299 300 @SuppressWarnings("unchecked") 301 public void stop() { 302 if (started.compareAndSet(true, false)){ 303 if (parent != null) { 304 parent.removeChild(this); 305 } 306 307 //clear down any callbacks 308 synchronized (usageMutex) { 309 usageMutex.notifyAll(); 310 for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) { 311 Runnable callback = iter.next(); 312 callback.run(); 313 } 314 this.callbacks.clear(); 315 } 316 for (T t:children) { 317 t.stop(); 318 } 319 } 320 } 321 322 private void addChild(T child) { 323 children.add(child); 324 if (started.get()) { 325 child.start(); 326 } 327 } 328 329 private void removeChild(T child) { 330 children.remove(child); 331 } 332 333 /** 334 * @param callback 335 * @return true if the UsageManager was full. The callback will only be 336 * called if this method returns true. 337 */ 338 public boolean notifyCallbackWhenNotFull(final Runnable callback) { 339 if (parent != null) { 340 Runnable r = new Runnable() { 341 342 public void run() { 343 synchronized (usageMutex) { 344 if (percentUsage >= 100) { 345 callbacks.add(callback); 346 } else { 347 callback.run(); 348 } 349 } 350 } 351 }; 352 if (parent.notifyCallbackWhenNotFull(r)) { 353 return true; 354 } 355 } 356 synchronized (usageMutex) { 357 if (percentUsage >= 100) { 358 callbacks.add(callback); 359 return true; 360 } else { 361 return false; 362 } 363 } 364 } 365 366 /** 367 * @return the limiter 368 */ 369 public UsageCapacity getLimiter() { 370 return this.limiter; 371 } 372 373 /** 374 * @param limiter the limiter to set 375 */ 376 public void setLimiter(UsageCapacity limiter) { 377 this.limiter = limiter; 378 } 379 380 /** 381 * @return the pollingTime 382 */ 383 public int getPollingTime() { 384 return this.pollingTime; 385 } 386 387 /** 388 * @param pollingTime the pollingTime to set 389 */ 390 public void setPollingTime(int pollingTime) { 391 this.pollingTime = pollingTime; 392 } 393 394 public void setName(String name) { 395 this.name = name; 396 } 397 398 public T getParent() { 399 return parent; 400 } 401 402 public void setParent(T parent) { 403 this.parent = parent; 404 } 405 406 protected Executor getExecutor() { 407 return executor; 408 } 409 410 static { 411 executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 412 public Thread newThread(Runnable runnable) { 413 Thread thread = new Thread(runnable, "Usage Async Task"); 414 thread.setDaemon(true); 415 return thread; 416 } 417 }); 418 } 419 420 }