001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import javax.jms.JMSException; 021 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.activemq.message.ActiveMQMessage; 025 import org.activemq.message.MessageAck; 026 import org.activemq.service.DeadLetterPolicy; 027 import org.activemq.service.MessageContainerAdmin; 028 import org.activemq.service.MessageIdentity; 029 import org.activemq.service.QueueList; 030 import org.activemq.service.QueueListEntry; 031 import org.activemq.service.QueueMessageContainer; 032 import org.activemq.service.TransactionManager; 033 import org.activemq.service.TransactionTask; 034 import org.activemq.store.MessageStore; 035 import org.activemq.store.PersistenceAdapter; 036 import org.activemq.store.RecoveryListener; 037 038 /** 039 * A default implementation of a Durable Queue based 040 * {@link org.activemq.service.MessageContainer} 041 * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager} 042 * requirements and those of the persistent {@link MessageStore} implementations. 043 * 044 * @version $Revision: 1.1.1.1 $ 045 */ 046 public class DurableQueueMessageContainer implements QueueMessageContainer, MessageContainerAdmin { 047 private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class); 048 049 private MessageStore messageStore; 050 private String destinationName; 051 private boolean deadLetterQueue; 052 053 /** 054 * messages to be delivered 055 */ 056 private QueueList messagesToBeDelivered; 057 /** 058 * messages that have been delivered but not acknowledged 059 */ 060 private QueueList deliveredMessages; 061 062 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) { 063 this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList()); 064 } 065 066 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) { 067 this.messageStore = messageStore; 068 this.destinationName = destinationName; 069 this.messagesToBeDelivered = messagesToBeDelivered; 070 this.deliveredMessages = deliveredMessages; 071 this.deadLetterQueue = destinationName.startsWith(DeadLetterPolicy.DEAD_LETTER_PREFIX); 072 } 073 074 public String getDestinationName() { 075 return destinationName; 076 } 077 078 public void addMessage(ActiveMQMessage message) throws JMSException { 079 messageStore.addMessage(message); 080 final MessageIdentity answer = message.getJMSMessageIdentity(); 081 082 // If there is no transaction.. then this executes directly. 083 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 084 public void execute() throws Throwable { 085 synchronized( this ) { 086 messagesToBeDelivered.add(answer); 087 } 088 } 089 }); 090 } 091 092 public synchronized void delete(final MessageIdentity messageID, MessageAck ack) throws JMSException { 093 094 messageStore.removeMessage(ack); 095 096 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 097 public void execute() throws Throwable { 098 doDelete(messageID); 099 } 100 }); 101 102 } 103 104 /** 105 * @param messageID 106 * @param storedIdentity 107 * @throws JMSException 108 */ 109 private void doDelete(MessageIdentity messageID) throws JMSException { 110 MessageIdentity storedIdentity=null; 111 synchronized( this ) { 112 QueueListEntry entry = deliveredMessages.getFirstEntry(); 113 while (entry != null) { 114 MessageIdentity identity = (MessageIdentity) entry.getElement(); 115 if (messageID.equals(identity)) { 116 deliveredMessages.remove(entry); 117 storedIdentity=identity; 118 break; 119 } 120 entry = deliveredMessages.getNextEntry(entry); 121 } 122 123 if (storedIdentity==null) { 124 // maybe the messages have not been delivered yet 125 // as we are recovering from a previous transaction log 126 entry = messagesToBeDelivered.getFirstEntry(); 127 while (entry != null) { 128 MessageIdentity identity = (MessageIdentity) entry.getElement(); 129 if (messageID.equals(identity)) { 130 messagesToBeDelivered.remove(entry); 131 storedIdentity=identity; 132 break; 133 } 134 entry = messagesToBeDelivered.getNextEntry(entry); 135 } 136 } 137 } 138 139 if (storedIdentity==null) { 140 log.error("Attempt to acknowledge unknown messageID: " + messageID); 141 } else { 142 } 143 } 144 145 public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException { 146 return messageStore.getMessage(messageID); 147 } 148 149 150 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException { 151 /** TODO: make more optimal implementation */ 152 return getMessage(messageIdentity) != null; 153 } 154 155 /** 156 * Does nothing since when we receive an acknowledgement on a queue 157 * we can delete the message 158 * 159 * @param messageIdentity 160 */ 161 public void registerMessageInterest(MessageIdentity messageIdentity) { 162 } 163 164 /** 165 * Does nothing since when we receive an acknowledgement on a queue 166 * we can delete the message 167 * 168 * @param messageIdentity 169 * @param ack 170 */ 171 public void unregisterMessageInterest(MessageIdentity ack) { 172 } 173 174 public ActiveMQMessage poll() throws JMSException { 175 ActiveMQMessage message = null; 176 MessageIdentity messageIdentity = null; 177 synchronized (this) { 178 messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst(); 179 if (messageIdentity != null) { 180 deliveredMessages.add(messageIdentity); 181 } 182 } 183 if (messageIdentity != null) { 184 message = messageStore.getMessage(messageIdentity); 185 } 186 return message; 187 } 188 189 public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException { 190 ActiveMQMessage answer = null; 191 MessageIdentity identity = null; 192 synchronized( this ) { 193 if (messageID == null) { 194 identity = (MessageIdentity) messagesToBeDelivered.getFirst(); 195 } 196 else { 197 int index = messagesToBeDelivered.indexOf(messageID); 198 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) { 199 identity = (MessageIdentity) messagesToBeDelivered.get(index + 1); 200 } 201 } 202 203 } 204 if (identity != null) { 205 answer = messageStore.getMessage(identity); 206 } 207 return answer; 208 } 209 210 211 public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException { 212 boolean result = deliveredMessages.remove(messageIdentity); 213 messagesToBeDelivered.addFirst(messageIdentity); 214 } 215 216 /** 217 * called to reset dispatch pointers if a new Message Consumer joins 218 * 219 * @throws javax.jms.JMSException 220 */ 221 public synchronized void reset() throws JMSException { 222 //new Message Consumer - move all filtered/undispatched messages to front of queue 223 int count = 0; 224 MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst(); 225 while (messageIdentity != null) { 226 messagesToBeDelivered.add(count++, messageIdentity); 227 messageIdentity = (MessageIdentity) deliveredMessages.removeFirst(); 228 } 229 } 230 231 public synchronized void start() throws JMSException { 232 final QueueMessageContainer container = this; 233 messageStore.start(); 234 messageStore.recover(new RecoveryListener() { 235 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException { 236 DurableQueueMessageContainer.this.recoverMessageToBeDelivered(messageIdentity); 237 } 238 }); 239 } 240 241 public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException { 242 messagesToBeDelivered.add(messageIdentity); 243 } 244 245 public void stop() throws JMSException { 246 messageStore.stop(); 247 } 248 249 /** 250 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 251 */ 252 public MessageContainerAdmin getMessageContainerAdmin() { 253 return this; 254 } 255 256 /** 257 * @see org.activemq.service.MessageContainerAdmin#empty() 258 */ 259 public void empty() throws JMSException { 260 messageStore.removeAllMessages(); 261 } 262 263 /** 264 * @see org.activemq.service.QueueMessageContainer#isDeadLetterQueue() 265 */ 266 public boolean isDeadLetterQueue() { 267 return deadLetterQueue; 268 } 269 270 /** 271 * @see org.activemq.service.QueueMessageContainer#setDeadLetterQueue(boolean) 272 */ 273 public void setDeadLetterQueue(boolean value) { 274 deadLetterQueue = value; 275 276 } 277 278 }