001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import javax.jms.JMSException;
021    
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.activemq.message.ActiveMQMessage;
025    import org.activemq.message.MessageAck;
026    import org.activemq.service.DeadLetterPolicy;
027    import org.activemq.service.MessageContainerAdmin;
028    import org.activemq.service.MessageIdentity;
029    import org.activemq.service.QueueList;
030    import org.activemq.service.QueueListEntry;
031    import org.activemq.service.QueueMessageContainer;
032    import org.activemq.service.TransactionManager;
033    import org.activemq.service.TransactionTask;
034    import org.activemq.store.MessageStore;
035    import org.activemq.store.PersistenceAdapter;
036    import org.activemq.store.RecoveryListener;
037    
038    /**
039     * A default implementation of a Durable Queue based
040     * {@link org.activemq.service.MessageContainer}
041     * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager}
042     * requirements and those of the persistent {@link MessageStore} implementations.
043     *
044     * @version $Revision: 1.1.1.1 $
045     */
046    public class DurableQueueMessageContainer implements QueueMessageContainer, MessageContainerAdmin {
047        private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
048    
049        private MessageStore messageStore;
050        private String destinationName;
051        private boolean deadLetterQueue;
052    
053        /**
054         * messages to be delivered
055         */
056        private QueueList messagesToBeDelivered;
057        /**
058         * messages that have been delivered but not acknowledged
059         */
060        private QueueList deliveredMessages;
061    
062        public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
063            this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
064        }
065    
066        public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
067            this.messageStore = messageStore;
068            this.destinationName = destinationName;
069            this.messagesToBeDelivered = messagesToBeDelivered;
070            this.deliveredMessages = deliveredMessages;
071            this.deadLetterQueue = destinationName.startsWith(DeadLetterPolicy.DEAD_LETTER_PREFIX);
072        }
073    
074        public String getDestinationName() {
075            return destinationName;
076        }
077    
078        public void addMessage(ActiveMQMessage message) throws JMSException {
079            messageStore.addMessage(message);
080            final MessageIdentity answer = message.getJMSMessageIdentity();
081            
082            // If there is no transaction.. then this executes directly.
083            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
084                public void execute() throws Throwable {
085                    synchronized( this ) {
086                            messagesToBeDelivered.add(answer);
087                    }
088                }
089            });     
090        }
091    
092        public synchronized void delete(final MessageIdentity messageID, MessageAck ack) throws JMSException {
093            
094            messageStore.removeMessage(ack);                
095    
096            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
097                public void execute() throws Throwable {
098                    doDelete(messageID);
099                }
100            });     
101            
102        }
103    
104        /**
105         * @param messageID
106         * @param storedIdentity
107         * @throws JMSException
108         */
109        private void doDelete(MessageIdentity messageID) throws JMSException {
110            MessageIdentity storedIdentity=null;
111            synchronized( this ) {
112                    QueueListEntry entry = deliveredMessages.getFirstEntry();
113                    while (entry != null) {
114                        MessageIdentity identity = (MessageIdentity) entry.getElement();
115                        if (messageID.equals(identity)) {
116                            deliveredMessages.remove(entry);
117                            storedIdentity=identity;
118                            break;
119                        }
120                        entry = deliveredMessages.getNextEntry(entry);
121                    }
122                    
123                    if (storedIdentity==null) {
124                        // maybe the messages have not been delivered yet
125                        // as we are recovering from a previous transaction log
126                        entry = messagesToBeDelivered.getFirstEntry();
127                        while (entry != null) {
128                            MessageIdentity identity = (MessageIdentity) entry.getElement();
129                            if (messageID.equals(identity)) {                           
130                                messagesToBeDelivered.remove(entry);
131                                storedIdentity=identity;
132                                break;
133                            }
134                            entry = messagesToBeDelivered.getNextEntry(entry);
135                        }
136                    }
137            }
138            
139            if (storedIdentity==null) {
140                log.error("Attempt to acknowledge unknown messageID: " + messageID);
141            } else {
142            }
143        }
144    
145        public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
146            return messageStore.getMessage(messageID);
147        }
148    
149    
150        public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
151            /** TODO: make more optimal implementation */
152            return getMessage(messageIdentity) != null;
153        }
154    
155        /**
156         * Does nothing since when we receive an acknowledgement on a queue
157         * we can delete the message
158         *
159         * @param messageIdentity
160         */
161        public void registerMessageInterest(MessageIdentity messageIdentity) {
162        }
163    
164        /**
165         * Does nothing since when we receive an acknowledgement on a queue
166         * we can delete the message
167         *
168         * @param messageIdentity
169         * @param ack
170         */
171        public void unregisterMessageInterest(MessageIdentity ack) {
172        }
173    
174        public ActiveMQMessage poll() throws JMSException {
175            ActiveMQMessage message = null;
176            MessageIdentity messageIdentity = null;
177            synchronized (this) {
178                messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
179                if (messageIdentity != null) {
180                    deliveredMessages.add(messageIdentity);
181                }
182            }
183            if (messageIdentity != null) {
184                message = messageStore.getMessage(messageIdentity);
185            }
186            return message;
187        }
188    
189        public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
190            ActiveMQMessage answer = null;
191            MessageIdentity identity = null;
192            synchronized( this ) {
193                if (messageID == null) {
194                    identity = (MessageIdentity) messagesToBeDelivered.getFirst();
195                }
196                else {
197                    int index = messagesToBeDelivered.indexOf(messageID);
198                    if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
199                            identity = (MessageIdentity) messagesToBeDelivered.get(index + 1);
200                    }
201                }
202                
203            }
204            if (identity != null) {
205                answer = messageStore.getMessage(identity);
206            }
207            return answer;
208        }
209    
210    
211        public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
212            boolean result = deliveredMessages.remove(messageIdentity);
213            messagesToBeDelivered.addFirst(messageIdentity);
214        }
215    
216        /**
217         * called to reset dispatch pointers if a new Message Consumer joins
218         *
219         * @throws javax.jms.JMSException
220         */
221        public synchronized void reset() throws JMSException {
222            //new Message Consumer - move all filtered/undispatched messages to front of queue
223            int count = 0;
224            MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
225            while (messageIdentity != null) {
226                messagesToBeDelivered.add(count++, messageIdentity);
227                messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
228            }
229        }
230    
231        public synchronized void start() throws JMSException {
232            final QueueMessageContainer container = this;
233            messageStore.start();
234            messageStore.recover(new RecoveryListener() {
235                public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
236                    DurableQueueMessageContainer.this.recoverMessageToBeDelivered(messageIdentity);
237                }
238            });
239        }
240    
241        public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
242            messagesToBeDelivered.add(messageIdentity);
243        }
244    
245        public void stop() throws JMSException {
246            messageStore.stop();
247        }
248    
249        /**
250         * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
251         */
252        public MessageContainerAdmin getMessageContainerAdmin() {
253            return this;
254        }
255    
256        /**
257         * @see org.activemq.service.MessageContainerAdmin#empty()
258         */
259        public void empty() throws JMSException {
260            messageStore.removeAllMessages();
261        }
262    
263        /**
264         * @see org.activemq.service.QueueMessageContainer#isDeadLetterQueue()
265         */
266        public boolean isDeadLetterQueue() {
267            return deadLetterQueue;
268        }
269    
270        /**
271         * @see org.activemq.service.QueueMessageContainer#setDeadLetterQueue(boolean)
272         */
273        public void setDeadLetterQueue(boolean value) {
274            deadLetterQueue = value;
275            
276        }
277        
278    }