001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.usage;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.LinkedBlockingQueue;
026    import java.util.concurrent.ThreadFactory;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import org.apache.activemq.Service;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * Used to keep track of how much of something is being used so that a
037     * productive working set usage can be controlled. Main use case is manage
038     * memory usage.
039     * 
040     * @org.apache.xbean.XBean
041     * @version $Revision: 1.3 $
042     */
043    public abstract class Usage<T extends Usage> implements Service {
044    
045        private static final Log LOG = LogFactory.getLog(Usage.class);
046        private static ThreadPoolExecutor executor;
047        protected final Object usageMutex = new Object();
048        protected int percentUsage;
049        protected T parent;
050        private UsageCapacity limiter = new DefaultUsageCapacity();
051        private int percentUsageMinDelta = 1;
052        private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
053        private final boolean debug = LOG.isDebugEnabled();
054        private String name;
055        private float usagePortion = 1.0f;
056        private List<T> children = new CopyOnWriteArrayList<T>();
057        private final List<Runnable> callbacks = new LinkedList<Runnable>();
058        private int pollingTime = 100;
059        
060        private AtomicBoolean started=new AtomicBoolean();
061    
062        public Usage(T parent, String name, float portion) {
063            this.parent = parent;
064            this.usagePortion = portion;
065            if (parent != null) {
066                this.limiter.setLimit((long)(parent.getLimit() * portion));
067                name = parent.name + ":" + name;
068            }
069            this.name = name;
070        }
071    
072        protected abstract long retrieveUsage();
073    
074        /**
075         * @throws InterruptedException
076         */
077        public void waitForSpace() throws InterruptedException {
078            waitForSpace(0);
079        }
080    
081        /**
082         * @param timeout
083         * @throws InterruptedException
084         * @return true if space
085         */
086        public boolean waitForSpace(long timeout) throws InterruptedException {
087            if (parent != null) {
088                if (!parent.waitForSpace(timeout)) {
089                    return false;
090                }
091            }
092            synchronized (usageMutex) {
093                percentUsage=caclPercentUsage();
094                if (percentUsage >= 100) {
095                    long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
096                    long timeleft = deadline;
097                    while (timeleft > 0) {
098                        percentUsage=caclPercentUsage();
099                        if (percentUsage >= 100) {
100                            usageMutex.wait(pollingTime);
101                            timeleft = deadline - System.currentTimeMillis();
102                        } else {
103                            break;
104                        }
105                    }
106                }
107                return percentUsage < 100;
108            }
109        }
110    
111        public boolean isFull() {
112            if (parent != null && parent.isFull()) {
113                return true;
114            }
115            synchronized (usageMutex) {
116                percentUsage=caclPercentUsage();
117                return percentUsage >= 100;
118            }
119        }
120    
121        public void addUsageListener(UsageListener listener) {
122            listeners.add(listener);
123        }
124    
125        public void removeUsageListener(UsageListener listener) {
126            listeners.remove(listener);
127        }
128    
129        public long getLimit() {
130            synchronized (usageMutex) {
131                return limiter.getLimit();
132            }
133        }
134    
135        /**
136         * Sets the memory limit in bytes. Setting the limit in bytes will set the
137         * usagePortion to 0 since the UsageManager is not going to be portion based
138         * off the parent.
139         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
140         * 
141         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
142         */
143        public void setLimit(long limit) {
144            if (percentUsageMinDelta < 0) {
145                throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
146            }
147            synchronized (usageMutex) {
148                this.limiter.setLimit(limit);
149                this.usagePortion = 0;
150            }
151            onLimitChange();
152        }
153    
154        protected void onLimitChange() {
155            // We may need to calculate the limit
156            if (usagePortion > 0 && parent != null) {
157                synchronized (usageMutex) {
158                    this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
159                }
160            }
161            // Reset the percent currently being used.
162            int percentUsage;
163            synchronized (usageMutex) {
164                percentUsage = caclPercentUsage();
165            }
166            setPercentUsage(percentUsage);
167            // Let the children know that the limit has changed. They may need to
168            // set
169            // their limits based on ours.
170            for (T child : children) {
171                child.onLimitChange();
172            }
173        }
174    
175        public float getUsagePortion() {
176            synchronized (usageMutex) {
177                return usagePortion;
178            }
179        }
180    
181        public void setUsagePortion(float usagePortion) {
182            synchronized (usageMutex) {
183                this.usagePortion = usagePortion;
184            }
185            onLimitChange();
186        }
187    
188        public int getPercentUsage() {
189            synchronized (usageMutex) {
190                return percentUsage;
191            }
192        }
193    
194        public int getPercentUsageMinDelta() {
195            synchronized (usageMutex) {
196                return percentUsageMinDelta;
197            }
198        }
199    
200        /**
201         * Sets the minimum number of percentage points the usage has to change
202         * before a UsageListener event is fired by the manager.
203         * 
204         * @param percentUsageMinDelta
205         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
206         */
207        public void setPercentUsageMinDelta(int percentUsageMinDelta) {
208            if (percentUsageMinDelta < 1) {
209                throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
210            }
211            int percentUsage;
212            synchronized (usageMutex) {
213                this.percentUsageMinDelta = percentUsageMinDelta;
214                percentUsage = caclPercentUsage();
215            }
216            setPercentUsage(percentUsage);
217        }
218    
219        public long getUsage() {
220            synchronized (usageMutex) {
221                return retrieveUsage();
222            }
223        }
224    
225        protected void setPercentUsage(int value) {
226            synchronized (usageMutex) {
227                int oldValue = percentUsage;
228                percentUsage = value;
229                if (oldValue != value) {
230                    fireEvent(oldValue, value);
231                }
232            }
233        }
234    
235        protected int caclPercentUsage() {
236            if (limiter.getLimit() == 0) {
237                return 0;
238            }
239            return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
240        }
241    
242        private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
243            if (debug) {
244                LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 
245                    + newPercentUsage + "% of available memory");
246            }   
247            if (started.get()) {
248                // Switching from being full to not being full..
249                if (oldPercentUsage >= 100 && newPercentUsage < 100) {
250                    synchronized (usageMutex) {
251                        usageMutex.notifyAll();
252                        if (!callbacks.isEmpty()) {
253                            for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
254                                Runnable callback = iter.next();
255                                getExecutor().execute(callback);
256                            }
257                            callbacks.clear();
258                        }
259                    }
260                }
261                if (!listeners.isEmpty()) {
262                    // Let the listeners know on a separate thread
263                    Runnable listenerNotifier = new Runnable() {
264                        public void run() {
265                            for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
266                                UsageListener l = iter.next();
267                                l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
268                            }
269                        }
270                    };
271                    if (started.get()) {
272                        getExecutor().execute(listenerNotifier);
273                    } else {
274                        LOG.warn("Not notifying memory usage change to listeners on shutdown");
275                    }
276                }
277            }
278        }
279    
280        public String getName() {
281            return name;
282        }
283    
284        public String toString() {
285            return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
286        }
287    
288        @SuppressWarnings("unchecked")
289        public void start() {
290            if (started.compareAndSet(false, true)){
291                if (parent != null) {
292                    parent.addChild(this);
293                }
294                for (T t:children) {
295                    t.start();
296                }
297            }
298        }
299    
300        @SuppressWarnings("unchecked")
301        public void stop() {
302            if (started.compareAndSet(true, false)){
303                if (parent != null) {
304                    parent.removeChild(this);
305                }
306                
307                //clear down any callbacks
308                synchronized (usageMutex) {
309                    usageMutex.notifyAll();
310                    for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
311                        Runnable callback = iter.next();
312                        callback.run();
313                    }
314                    this.callbacks.clear();
315                }
316                for (T t:children) {
317                    t.stop();
318                }
319            }
320        }
321    
322        private void addChild(T child) {
323            children.add(child);
324            if (started.get()) {
325                child.start();
326            }
327        }
328    
329        private void removeChild(T child) {
330            children.remove(child);
331        }
332    
333        /**
334         * @param callback
335         * @return true if the UsageManager was full. The callback will only be
336         *         called if this method returns true.
337         */
338        public boolean notifyCallbackWhenNotFull(final Runnable callback) {
339            if (parent != null) {
340                Runnable r = new Runnable() {
341    
342                    public void run() {
343                        synchronized (usageMutex) {
344                            if (percentUsage >= 100) {
345                                callbacks.add(callback);
346                            } else {
347                                callback.run();
348                            }
349                        }
350                    }
351                };
352                if (parent.notifyCallbackWhenNotFull(r)) {
353                    return true;
354                }
355            }
356            synchronized (usageMutex) {
357                if (percentUsage >= 100) {
358                    callbacks.add(callback);
359                    return true;
360                } else {
361                    return false;
362                }
363            }
364        }
365    
366        /**
367         * @return the limiter
368         */
369        public UsageCapacity getLimiter() {
370            return this.limiter;
371        }
372    
373        /**
374         * @param limiter the limiter to set
375         */
376        public void setLimiter(UsageCapacity limiter) {
377            this.limiter = limiter;
378        }
379    
380        /**
381         * @return the pollingTime
382         */
383        public int getPollingTime() {
384            return this.pollingTime;
385        }
386    
387        /**
388         * @param pollingTime the pollingTime to set
389         */
390        public void setPollingTime(int pollingTime) {
391            this.pollingTime = pollingTime;
392        }
393    
394        public void setName(String name) {
395            this.name = name;
396        }
397    
398        public T getParent() {
399            return parent;
400        }
401    
402        public void setParent(T parent) {
403            this.parent = parent;
404        }
405        
406        protected Executor getExecutor() {
407            return executor;
408        }
409        
410        static {
411            executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
412                public Thread newThread(Runnable runnable) {
413                    Thread thread = new Thread(runnable, "Usage Async Task");
414                    thread.setDaemon(true);
415                    return thread;
416                }
417            });
418        }
419    
420    }