001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.Destination;
025    import org.apache.activemq.broker.region.MessageReference;
026    import org.apache.activemq.broker.region.QueueMessageReference;
027    
028    /**
029     * hold pending messages in a linked list (messages awaiting disptach to a
030     * consumer) cursor
031     * 
032     * @version $Revision: 915914 $
033     */
034    public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
035        private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
036        private Iterator<MessageReference> iter;
037        public VMPendingMessageCursor() {
038            this.useCache = false;
039        }
040    
041        @Override
042        public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
043                throws Exception {
044            List<MessageReference> rc = new ArrayList<MessageReference>();
045            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
046                MessageReference r = iterator.next();
047                if (r.getRegionDestination() == destination) {
048                    r.decrementReferenceCount();
049                    rc.add(r);
050                    iterator.remove();
051                }
052            }
053            return rc;
054        }
055    
056        /**
057         * @return true if there are no pending messages
058         */
059        @Override
060        public synchronized boolean isEmpty() {
061            if (list.isEmpty()) {
062                return true;
063            } else {
064                for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
065                    MessageReference node = iterator.next();
066                    if (node == QueueMessageReference.NULL_MESSAGE) {
067                        continue;
068                    }
069                    if (!node.isDropped()) {
070                        return false;
071                    }
072                    // We can remove dropped references.
073                    iterator.remove();
074                }
075                return true;
076            }
077        }
078    
079        /**
080         * reset the cursor
081         */
082        @Override
083        public synchronized void reset() {
084            iter = list.listIterator();
085            last = null;
086        }
087    
088        /**
089         * add message to await dispatch
090         * 
091         * @param node
092         */
093        @Override
094        public synchronized void addMessageLast(MessageReference node) {
095            node.incrementReferenceCount();
096            list.addLast(node);
097        }
098    
099        /**
100         * add message to await dispatch
101         * 
102         * @param position
103         * @param node
104         */
105        @Override
106        public synchronized void addMessageFirst(MessageReference node) {
107            node.incrementReferenceCount();
108            list.addFirst(node);
109        }
110    
111        /**
112         * @return true if there pending messages to dispatch
113         */
114        @Override
115        public synchronized boolean hasNext() {
116            return iter.hasNext();
117        }
118    
119        /**
120         * @return the next pending message
121         */
122        @Override
123        public synchronized MessageReference next() {
124            last = iter.next();
125            if (last != null) {
126                last.incrementReferenceCount();
127            }
128            return last;
129        }
130    
131        /**
132         * remove the message at the cursor position
133         */
134        @Override
135        public synchronized void remove() {
136            if (last != null) {
137                last.decrementReferenceCount();
138            }
139            iter.remove();
140        }
141    
142        /**
143         * @return the number of pending messages
144         */
145        @Override
146        public synchronized int size() {
147            return list.size();
148        }
149    
150        /**
151         * clear all pending messages
152         */
153        @Override
154        public synchronized void clear() {
155            for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
156                MessageReference ref = i.next();
157                ref.decrementReferenceCount();
158            }
159            list.clear();
160        }
161    
162        @Override
163        public synchronized void remove(MessageReference node) {
164            for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
165                MessageReference ref = i.next();
166                if (node.getMessageId().equals(ref.getMessageId())) {
167                    ref.decrementReferenceCount();
168                    i.remove();
169                    break;
170                }
171            }
172        }
173    
174        /**
175         * Page in a restricted number of messages
176         * 
177         * @param maxItems
178         * @return a list of paged in messages
179         */
180        @Override
181        public LinkedList<MessageReference> pageInList(int maxItems) {
182            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
183            for (MessageReference ref: list) {
184                ref.incrementReferenceCount();
185                result.add(ref);
186                if (result.size() >= maxItems) {
187                    break;
188                }
189            }
190            return result;
191        }
192    
193        @Override
194        public boolean isTransient() {
195            return true;
196        }
197    
198        @Override
199        public void destroy() throws Exception {
200            super.destroy();
201            clear();
202        }
203    }