001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.List; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.region.Destination; 025 import org.apache.activemq.broker.region.MessageReference; 026 import org.apache.activemq.broker.region.QueueMessageReference; 027 028 /** 029 * hold pending messages in a linked list (messages awaiting disptach to a 030 * consumer) cursor 031 * 032 * @version $Revision: 915914 $ 033 */ 034 public class VMPendingMessageCursor extends AbstractPendingMessageCursor { 035 private final LinkedList<MessageReference> list = new LinkedList<MessageReference>(); 036 private Iterator<MessageReference> iter; 037 public VMPendingMessageCursor() { 038 this.useCache = false; 039 } 040 041 @Override 042 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) 043 throws Exception { 044 List<MessageReference> rc = new ArrayList<MessageReference>(); 045 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 046 MessageReference r = iterator.next(); 047 if (r.getRegionDestination() == destination) { 048 r.decrementReferenceCount(); 049 rc.add(r); 050 iterator.remove(); 051 } 052 } 053 return rc; 054 } 055 056 /** 057 * @return true if there are no pending messages 058 */ 059 @Override 060 public synchronized boolean isEmpty() { 061 if (list.isEmpty()) { 062 return true; 063 } else { 064 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 065 MessageReference node = iterator.next(); 066 if (node == QueueMessageReference.NULL_MESSAGE) { 067 continue; 068 } 069 if (!node.isDropped()) { 070 return false; 071 } 072 // We can remove dropped references. 073 iterator.remove(); 074 } 075 return true; 076 } 077 } 078 079 /** 080 * reset the cursor 081 */ 082 @Override 083 public synchronized void reset() { 084 iter = list.listIterator(); 085 last = null; 086 } 087 088 /** 089 * add message to await dispatch 090 * 091 * @param node 092 */ 093 @Override 094 public synchronized void addMessageLast(MessageReference node) { 095 node.incrementReferenceCount(); 096 list.addLast(node); 097 } 098 099 /** 100 * add message to await dispatch 101 * 102 * @param position 103 * @param node 104 */ 105 @Override 106 public synchronized void addMessageFirst(MessageReference node) { 107 node.incrementReferenceCount(); 108 list.addFirst(node); 109 } 110 111 /** 112 * @return true if there pending messages to dispatch 113 */ 114 @Override 115 public synchronized boolean hasNext() { 116 return iter.hasNext(); 117 } 118 119 /** 120 * @return the next pending message 121 */ 122 @Override 123 public synchronized MessageReference next() { 124 last = iter.next(); 125 if (last != null) { 126 last.incrementReferenceCount(); 127 } 128 return last; 129 } 130 131 /** 132 * remove the message at the cursor position 133 */ 134 @Override 135 public synchronized void remove() { 136 if (last != null) { 137 last.decrementReferenceCount(); 138 } 139 iter.remove(); 140 } 141 142 /** 143 * @return the number of pending messages 144 */ 145 @Override 146 public synchronized int size() { 147 return list.size(); 148 } 149 150 /** 151 * clear all pending messages 152 */ 153 @Override 154 public synchronized void clear() { 155 for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) { 156 MessageReference ref = i.next(); 157 ref.decrementReferenceCount(); 158 } 159 list.clear(); 160 } 161 162 @Override 163 public synchronized void remove(MessageReference node) { 164 for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) { 165 MessageReference ref = i.next(); 166 if (node.getMessageId().equals(ref.getMessageId())) { 167 ref.decrementReferenceCount(); 168 i.remove(); 169 break; 170 } 171 } 172 } 173 174 /** 175 * Page in a restricted number of messages 176 * 177 * @param maxItems 178 * @return a list of paged in messages 179 */ 180 @Override 181 public LinkedList<MessageReference> pageInList(int maxItems) { 182 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 183 for (MessageReference ref: list) { 184 ref.incrementReferenceCount(); 185 result.add(ref); 186 if (result.size() >= maxItems) { 187 break; 188 } 189 } 190 return result; 191 } 192 193 @Override 194 public boolean isTransient() { 195 return true; 196 } 197 198 @Override 199 public void destroy() throws Exception { 200 super.destroy(); 201 clear(); 202 } 203 }