001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * Copyright 2005 Hiram Chirino
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    
020    package org.activemq.service.boundedvm;
021    import org.activemq.broker.BrokerClient;
022    import org.activemq.filter.Filter;
023    import org.activemq.io.util.MemoryBoundedQueue;
024    import org.activemq.io.util.MemoryBoundedQueueManager;
025    import org.activemq.io.util.MemoryManageable;
026    import org.activemq.message.ActiveMQDestination;
027    import org.activemq.message.ActiveMQMessage;
028    import org.activemq.message.ConsumerInfo;
029    import org.activemq.service.DeadLetterPolicy;
030    import org.activemq.service.MessageContainerAdmin;
031    import org.activemq.service.MessageIdentity;
032    import org.activemq.service.QueueListEntry;
033    import org.activemq.service.RedeliveryPolicy;
034    import org.activemq.service.Service;
035    import org.activemq.service.TransactionManager;
036    import org.activemq.service.TransactionTask;
037    import org.activemq.service.impl.DefaultQueueList;
038    import org.activemq.store.MessageStore;
039    import org.activemq.store.RecoveryListener;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    import EDU.oswego.cs.dl.util.concurrent.Executor;
044    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045    
046    import javax.jms.JMSException;
047    
048    import java.util.HashMap;
049    import java.util.List;
050    import java.util.Map;
051    
052    /**
053     * A MessageContainer for Durable queues
054     * 
055     * @version $Revision: 1.1.1.1 $
056     */
057    public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin {
058    
059        private final MessageStore messageStore;
060        private final MemoryBoundedQueueManager queueManager;
061        private final ActiveMQDestination destination;
062        private final Executor threadPool;
063        private final DeadLetterPolicy deadLetterPolicy;
064        private final Log log;
065        private final MemoryBoundedQueue queue;
066        
067        private final DefaultQueueList subscriptions = new DefaultQueueList();
068        private final SynchronizedBoolean started = new SynchronizedBoolean(false);
069        private final SynchronizedBoolean running = new SynchronizedBoolean(false);
070        private final Object dispatchMutex = new Object();
071        private final Object subscriptionsMutex = new Object();
072    
073        private long idleTimestamp; //length of time (ms) there have been no active subscribers
074            
075        /**
076         * Construct this beast
077         * 
078         * @param threadPool
079         * @param queueManager
080         * @param destination
081         * @param redeliveryPolicy
082         * @param deadLetterPolicy
083         */
084        public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager,
085                ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
086            this.messageStore = messageStore;        
087            this.threadPool = threadPool;
088            this.queueManager = queueManager;
089            this.destination = destination;
090            this.deadLetterPolicy = deadLetterPolicy;
091            
092            this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName());
093            this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination);
094        }
095    
096        
097        /**
098         * @return true if there are subscribers waiting for messages
099         */
100        public boolean isActive(){
101            return !subscriptions.isEmpty();
102        }
103        
104        /**
105         * @return true if no messages are enqueued
106         */
107        public boolean isEmpty(){
108            return queue.isEmpty();
109        }
110        
111        /**
112         * @return the timestamp (ms) from the when the last active subscriber stopped
113         */
114        public long getIdleTimestamp(){
115            return idleTimestamp;
116        }
117        
118        
119    
120        /**
121         * Add a consumer to dispatch messages to
122         * 
123         * @param filter
124         * @param info
125         * @param client
126         * @return DurableQueueSubscription
127         * @throws JMSException
128         */
129        public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
130                throws JMSException {
131            DurableQueueSubscription ts = findMatch(info);
132            if (ts == null) {
133                MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-"+info.getConsumerId());
134                MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-"+info.getConsumerId());
135                ts = new DurableQueueSubscription(client, queue, ackQueue, filter, info);
136                synchronized (subscriptionsMutex) {
137                    idleTimestamp = 0;
138                    subscriptions.add(ts);
139                    checkRunning();
140                }
141            }
142            return ts;
143        }
144    
145        /**
146         * Remove a consumer
147         * 
148         * @param info
149         * @throws JMSException
150         */
151        public void removeConsumer(ConsumerInfo info) throws JMSException {
152            DurableQueueSubscription ts = null;
153            synchronized (subscriptionsMutex) {
154                ts = findMatch(info);
155                if (ts != null) {
156    
157                    subscriptions.remove(ts);
158                    if (subscriptions.isEmpty()) {
159                        running.commit(true, false);
160                        idleTimestamp = System.currentTimeMillis();
161                    }
162                }
163            }
164            if (ts != null) {
165    
166                // get unacknowledged messages and re-enqueue them
167                List list = ts.getUndeliveredMessages();
168                for (int i = list.size() - 1; i >= 0; i--) {
169                    queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
170                }
171    
172                // If it is a queue browser, then re-enqueue the browsed
173                // messages.
174                if (ts.isBrowser()) {
175                    list = ts.listAckedMessages();
176                    for (int i = list.size() - 1; i >= 0; i--) {
177                        queue.enqueueFirstNoBlock((MemoryManageable) list
178                                .get(i));
179                    }
180                    ts.removeAllAckedMessages();
181                }
182    
183                ts.close();
184            }
185        }
186    
187        /**
188         * start working
189         * 
190         * @throws JMSException
191         */
192        public void start() throws JMSException {
193            if (started.commit(false, true)) {
194                messageStore.start();
195                
196                // Avoid recovery failing due to memory constraints.
197                this.queueManager.setMemoryLimitEnforced(false);
198                try {
199                    messageStore.recover(new RecoveryListener() {
200                        public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
201                            recoverMessageToBeDelivered(messageIdentity);
202                        }
203                    });
204                } finally {
205                    this.queueManager.setMemoryLimitEnforced(true);
206                }
207                
208                checkRunning();
209            }
210        }
211    
212        private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException {
213            DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), messageStore.getMessage(msgId));
214            queue.enqueue(pointer);
215        }
216    
217        /**
218         * enqueue a message for dispatching
219         * 
220         * @param message
221         * @throws JMSException 
222         */
223        public void enqueue(final ActiveMQMessage message) throws JMSException {
224            final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
225            if (message.isAdvisory()) {
226                doAdvisoryDispatchMessage(pointer);
227            }
228            else {            
229                messageStore.addMessage(message);            
230                // If there is no transaction.. then this executes directly.
231                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
232                    public void execute() throws Throwable {
233                        queue.enqueue(pointer);
234                        checkRunning();
235                    }
236                });            
237            }
238        }
239    
240        public void redeliver(DurableMessagePointer message) {
241            queue.enqueueFirstNoBlock(message);
242            checkRunning();
243        }
244    
245        public void redeliver(List messages) {
246            queue.enqueueAllFirstNoBlock(messages);
247            checkRunning();
248        }
249    
250        /**
251         * stop working
252         */
253        public void stop() {
254            started.set(false);
255            running.set(false);
256            queue.clear();
257        }
258    
259        /**
260         * close down this container
261         * 
262         * @throws JMSException
263         */
264        public void close() throws JMSException {
265            if (started.get()) {
266                stop();
267            }
268            synchronized(subscriptionsMutex){
269                QueueListEntry entry = subscriptions.getFirstEntry();
270                while (entry != null) {
271                    DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
272                    ts.close();
273                    entry = subscriptions.getNextEntry(entry);
274                }
275                subscriptions.clear();
276            }
277        }
278    
279        /**
280         * do some dispatching
281         */
282        public void run() {
283            // Only allow one thread at a time to dispatch.
284            synchronized (dispatchMutex) {
285                boolean dispatched = false;
286                boolean targeted = false;
287                DurableMessagePointer messagePointer = null;
288                int notDispatchedCount = 0;
289                int sleepTime = 250;
290                int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10
291                                                                                    // seconds
292                Map messageParts = new HashMap();
293                try {
294                    while (started.get() && running.get()) {
295                        dispatched = false;
296                        targeted = false;
297                        synchronized (subscriptionsMutex) {
298                            if (!subscriptions.isEmpty()) {
299                                messagePointer = (DurableMessagePointer) queue
300                                        .dequeue(sleepTime);
301                                if (messagePointer != null) {
302                                    ActiveMQMessage message = messagePointer
303                                            .getMessage();
304                                    if (!message.isExpired()) {
305    
306                                        QueueListEntry entry = subscriptions
307                                                .getFirstEntry();
308                                        while (entry != null) {
309                                            DurableQueueSubscription ts = (DurableQueueSubscription) entry
310                                                    .getElement();
311                                            if (ts.isTarget(message)) {
312                                                targeted = true;
313                                                if (message.isMessagePart()) {
314                                                    DurableQueueSubscription sameTarget = (DurableQueueSubscription) messageParts
315                                                            .get(message
316                                                                    .getParentMessageID());
317                                                    if (sameTarget == null) {
318                                                        sameTarget = ts;
319                                                        messageParts
320                                                                .put(
321                                                                        message
322                                                                                .getParentMessageID(),
323                                                                        sameTarget);
324                                                    }
325                                                    sameTarget
326                                                            .doDispatch(messagePointer);
327                                                    if (message.isLastMessagePart()) {
328                                                        messageParts
329                                                                .remove(message
330                                                                        .getParentMessageID());
331                                                    }
332                                                    messagePointer = null;
333                                                    dispatched = true;
334                                                    notDispatchedCount = 0;
335                                                    break;
336                                                } else if (ts.canAcceptMessages()) {
337                                                    ts.doDispatch(messagePointer);
338                                                    messagePointer = null;
339                                                    dispatched = true;
340                                                    notDispatchedCount = 0;
341                                                    subscriptions.rotate();
342                                                    break;
343                                                }
344                                            }
345                                            entry = subscriptions
346                                                    .getNextEntry(entry);
347                                        }
348    
349                                    } else {
350                                        // expire message
351                                        if (log.isDebugEnabled()) {
352                                            log.debug("expired message: "
353                                                    + messagePointer);
354                                        }
355                                        if (deadLetterPolicy != null) {
356                                            deadLetterPolicy
357                                                    .sendToDeadLetter(messagePointer
358                                                            .getMessage());
359                                        }
360                                        messagePointer = null;
361                                    }
362                                }
363                            }
364                        }
365                        if (!dispatched) {
366                            if (messagePointer != null) {
367                                if (targeted) {
368                                    queue.enqueueFirstNoBlock(messagePointer);
369                                } else {
370                                    //no matching subscribers - dump to end and hope one shows up ...
371                                    queue.enqueueNoBlock(messagePointer);
372    
373                                }
374                            }
375                            if (running.get()) {
376                                if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping
377                                        && queue.isEmpty()) {
378                                    synchronized (running) {
379                                        running.commit(true, false);
380                                    }
381                                } else {
382                                    Thread.sleep(sleepTime);
383                                }
384                            }
385                        }
386                    }
387                } catch (InterruptedException ie) {
388                    //someone is stopping us from another thread
389                } catch (Throwable e) {
390                    log.warn("stop dispatching", e);
391                    stop();
392                }
393            }
394        }
395    
396        private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
397            DurableQueueSubscription result = null;
398            synchronized (subscriptionsMutex) {
399                QueueListEntry entry = subscriptions.getFirstEntry();
400                while (entry != null) {
401                    DurableQueueSubscription ts = (DurableQueueSubscription) entry
402                            .getElement();
403                    if (ts.getConsumerInfo().equals(info)) {
404                        result = ts;
405                        break;
406                    }
407                    entry = subscriptions.getNextEntry(entry);
408                }
409            }
410            return result;
411        }
412    
413        /**
414         * @return the destination associated with this container
415         */
416        public ActiveMQDestination getDestination() {
417            return destination;
418        }
419    
420        /**
421         * @return the destination name
422         */
423        public String getDestinationName() {
424            return destination.getPhysicalName();
425        }
426    
427        protected void clear() {
428            queue.clear();
429        }
430    
431        protected void removeExpiredMessages() {
432            long currentTime = System.currentTimeMillis();
433            List list = queue.getContents();
434            for (int i = 0;i < list.size();i++) {
435                DurableMessagePointer msgPointer = (DurableMessagePointer) list.get(i);
436                ActiveMQMessage message = msgPointer.getMessage();
437                if (message.isExpired(currentTime)) {
438                    // TODO: remove message from message store.
439                    queue.remove(msgPointer);
440                    if (log.isDebugEnabled()) {
441                        log.debug("expired message: " + msgPointer);
442                    }
443                }
444            }
445        }
446        
447        protected void checkRunning(){
448            if (!running.get() && started.get() && !subscriptions.isEmpty()) {
449                synchronized (running) {
450                    if (running.commit(false, true)) {
451                        try {
452                            threadPool.execute(this);
453                        }
454                        catch (InterruptedException e) {
455                           log.error(this + " Couldn't start executing ",e);
456                        }
457                    }
458                }
459            }
460        }
461    
462    
463        /**
464         * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
465         */
466        public MessageContainerAdmin getMessageContainerAdmin() {
467            return this;
468        }
469    
470        /**
471         * @see org.activemq.service.MessageContainerAdmin#empty()
472         */
473        public void empty() throws JMSException {
474            if( subscriptions.isEmpty() ) {
475                messageStore.removeAllMessages();
476                queue.clear();
477            } else {
478                throw new JMSException("Cannot empty a queue while it is use.");
479            }
480        }
481        
482        /**
483         * Dispatch an Advisory Message
484         * @param messagePointer
485         */
486        private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) {
487            ActiveMQMessage message = messagePointer.getMessage();
488            try {
489    
490                if (message.isAdvisory() && !message.isExpired()) {
491                    synchronized (subscriptionsMutex) {
492                        QueueListEntry entry = subscriptions.getFirstEntry();
493                        while (entry != null) {
494                            DurableQueueSubscription ts = (DurableQueueSubscription) entry
495                                    .getElement();
496                            if (ts.isTarget(message)) {
497                                ts.doDispatch(messagePointer);
498                                break;
499                            }
500                            entry = subscriptions.getNextEntry(entry);
501                        }
502                    }
503                }
504            } catch (JMSException jmsEx) {
505                log.warn("Failed to dispatch advisory", jmsEx);
506            }
507        }
508    
509    }