001    package org.activemq.ra;
002    
003    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
004    
005    /**
006     */
007    public class CircularQueue {
008    
009        private final int size;
010    
011        private final SynchronizedBoolean stopping;
012    
013    
014        // For pooling objects
015        private final Object[] contents;
016        final private Object mutex = new Object();
017        //where the next worker to be supplied currently is.
018        private int start=0;
019        //where the next worker to be inserted will go
020        private int end=0;
021    
022        public CircularQueue(int size, SynchronizedBoolean stopping) {
023            this.size = size;
024            contents = new Object[size];
025            this.stopping = stopping;
026        }
027    
028        public Object get() {
029            synchronized(mutex) {
030                    while( true ) {
031                                    Object ew = contents[start];
032                    if (ew != null) {
033                        start++;
034                                    if(start == contents.length) {
035                                            start=0;
036                                    }
037                                    return ew;
038                            } else {
039                                    try {
040                                                    mutex.wait();
041                                                    if(stopping.get()) {
042                                                            return null;
043                                                    }
044                                            } catch (InterruptedException e) {
045                                                    return null;
046                                            }
047                            }
048                    }
049            }
050        }
051    
052        public void returnObject(Object worker) {
053            synchronized(mutex) {
054                    contents[end++] = worker;
055                if( end == contents.length) {
056                    end=0;
057                }
058                    mutex.notify();
059            }
060        }
061    
062        public int size() {
063            return contents.length;
064        }
065    
066        public void drain() {
067            int i = 0;
068            while (i < size) {
069                if (get() != null) {
070                    i++;
071                }
072            }
073        }
074    
075    
076        public void notifyWaiting() {
077            synchronized(mutex) {
078                    mutex.notifyAll();
079            }
080        }
081    
082    }