/**
 *
 * Copyright 2004 Protique Ltd
 * Copyright 2005 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
019    
020    package org.activemq.service.boundedvm;
021    import java.util.Collections;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.LinkedList;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import javax.jms.JMSException;
030    
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    import org.activemq.broker.BrokerClient;
034    import org.activemq.filter.AndFilter;
035    import org.activemq.filter.DestinationMap;
036    import org.activemq.filter.Filter;
037    import org.activemq.filter.FilterFactory;
038    import org.activemq.filter.FilterFactoryImpl;
039    import org.activemq.filter.NoLocalFilter;
040    import org.activemq.io.util.MemoryBoundedQueueManager;
041    import org.activemq.message.ActiveMQDestination;
042    import org.activemq.message.ActiveMQMessage;
043    import org.activemq.message.ActiveMQQueue;
044    import org.activemq.message.ConsumerInfo;
045    import org.activemq.message.MessageAck;
046    import org.activemq.service.DeadLetterPolicy;
047    import org.activemq.service.MessageContainer;
048    import org.activemq.service.MessageContainerManager;
049    import org.activemq.service.RedeliveryPolicy;
050    import org.activemq.service.TransactionManager;
051    import org.activemq.service.TransactionTask;
052    import org.activemq.store.MessageStore;
053    import org.activemq.store.PersistenceAdapter;
054    
055    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
056    import EDU.oswego.cs.dl.util.concurrent.Executor;
057    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
058    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
059    import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
060    
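/**
 * A {@link MessageContainerManager} for durable (persistent, non-temporary) queues.
 * It also implements {@link Runnable}: the <code>run()</code> loop periodically
 * attempts garbage collection of expired messages and inactive queue containers.
 *
 * @version $Revision: 1.1.1.1 $
 */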
066    public class DurableQueueBoundedMessageManager implements MessageContainerManager, Runnable {
    // Default for garbageCoolectionCapacityLimit: garbage collection is attempted when
    // queueManager.getCurrentCapacity() is at or below this value.
    private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10;
    // Default for inactiveTimeout, in milliseconds (it is compared against System.currentTimeMillis()).
    private static final long DEFAULT_INACTIVE_TIMEOUT = 30 * 1000;
    private static final Log log = LogFactory.getLog(DurableQueueBoundedMessageManager.class);
    // Source of the current memory capacity figure; also handed to every container that is created.
    private MemoryBoundedQueueManager queueManager;
    // ActiveMQDestination -> DurableQueueBoundedMessageContainer
    private ConcurrentHashMap containers;
    // consumer id -> DurableQueueSubscription
    private ConcurrentHashMap subscriptions;
    // Builds the destination/selector filter for each consumer.
    private FilterFactory filterFactory;
    // Lifecycle flag toggled by start()/stop(); also the loop condition of run().
    private SynchronizedBoolean started;
    // Guard flag intended to keep garbage collection passes from overlapping.
    private SynchronizedBoolean doingGarbageCollection;
    // physical destination name -> ActiveMQDestination
    private Map destinations;
    // Destination-keyed lookup of containers, used to find the containers a message should go to.
    private DestinationMap destinationMap;
    // Runs the garbage collection loop; also passed to every container that is created.
    private PooledExecutor threadPool;
    // How long (ms) an inactive container may sit idle before garbage collection may remove it.
    private long inactiveTimeout;
    // Capacity threshold that triggers garbage collection (the "Coolection" spelling is part of the public accessor names).
    private int garbageCoolectionCapacityLimit;
    // Supplies the maximum retry count; passed to non-dead-letter containers.
    private RedeliveryPolicy redeliveryPolicy;
    // Receives messages that expired or exceeded their retry count.
    private DeadLetterPolicy deadLetterPolicy;
    // Creates the queue message store backing each container.
    private final PersistenceAdapter persistenceAdapter;
084        
085        protected static class DurableQueueThreadFactory implements ThreadFactory {
086            /**
087             * @param command - command to run
088             * @return a new thread
089             */
090            public Thread newThread(Runnable command) {
091                Thread result = new Thread(command, "Durable Queue Worker");
092                result.setPriority(Thread.NORM_PRIORITY + 1);
093                result.setDaemon(true);
094                return result;
095            }
096        }
097    
098        /**
099         * Constructor for DurableQueueBoundedMessageManager
100         * 
101         * @param mgr
102         * @param redeliveryPolicy
103         * @param deadLetterPolicy
104         */
105        public DurableQueueBoundedMessageManager(PersistenceAdapter persistenceAdapter, MemoryBoundedQueueManager mgr, RedeliveryPolicy redeliveryPolicy,
106                DeadLetterPolicy deadLetterPolicy) {
107            this.persistenceAdapter = persistenceAdapter;
108            this.queueManager = mgr;
109            this.redeliveryPolicy = redeliveryPolicy;
110            this.deadLetterPolicy = deadLetterPolicy;
111            this.containers = new ConcurrentHashMap();
112            this.destinationMap = new DestinationMap();
113            this.destinations = new ConcurrentHashMap();
114            this.subscriptions = new ConcurrentHashMap();
115            this.filterFactory = new FilterFactoryImpl();
116            this.started = new SynchronizedBoolean(false);
117            this.doingGarbageCollection = new SynchronizedBoolean(false);
118            this.threadPool = new PooledExecutor();
119            this.threadPool.setThreadFactory(new DurableQueueThreadFactory());
120            this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT;
121            this.garbageCoolectionCapacityLimit = DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT;
122        }
123    
124        /**
125         * @return Returns the garbageCoolectionCapacityLimit.
126         */
127        public int getGarbageCoolectionCapacityLimit() {
128            return garbageCoolectionCapacityLimit;
129        }
130    
131        /**
132         * @param garbageCoolectionCapacityLimit The garbageCoolectionCapacityLimit to set.
133         */
134        public void setGarbageCoolectionCapacityLimit(int garbageCoolectionCapacityLimit) {
135            this.garbageCoolectionCapacityLimit = garbageCoolectionCapacityLimit;
136        }
137    
138        /**
139         * @return Returns the inactiveTimeout.
140         */
141        public long getInactiveTimeout() {
142            return inactiveTimeout;
143        }
144    
145        /**
146         * @param inactiveTimeout The inactiveTimeout to set.
147         */
148        public void setInactiveTimeout(long inactiveTimeout) {
149            this.inactiveTimeout = inactiveTimeout;
150        }
151    
152        /**
153         * start the manager
154         * 
155         * @throws JMSException
156         */
157        public void start() throws JMSException {
158            if (started.commit(false, true)) {
159                for (Iterator i = containers.values().iterator();i.hasNext();) {
160                    DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
161                    container.start();
162                }
163                try {
164                    threadPool.execute(this);//start garbage collection
165                }
166                catch (InterruptedException e) {
167                    JMSException jmsEx = new JMSException("Garbage collection interupted on start()");
168                    jmsEx.setLinkedException(e);
169                    throw jmsEx;
170                }
171            }
172        }
173    
174        /**
175         * stop the manager
176         * 
177         * @throws JMSException
178         */
179        public void stop() throws JMSException {
180            if (started.commit(true, false)) {
181                for (Iterator i = containers.values().iterator();i.hasNext();) {
182                    DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
183                    container.stop();
184                }
185                threadPool.interruptAll();
186                            threadPool.shutdownNow();
187            }
188        }
189    
190        /**
191         * collect expired messages
192         */
193        public void run() {
194            while (started.get()) {
195                doGarbageCollection();
196                try {
197                    Thread.sleep(2000);
198                }
199                catch (InterruptedException e) {
200                }
201            }
202        }
203    
204        /**
205         * Add a consumer if appropiate
206         * 
207         * @param client
208         * @param info
209         * @throws JMSException
210         */
211        public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
212            ActiveMQDestination destination = info.getDestination();
213            if ( !destination.isQueue() || destination.isTemporary() )
214                return;
215                
216            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers
217                    .get(destination);
218            if (container == null) {
219                container = createContainer(destination,false);
220            }
221            if (log.isDebugEnabled()) {
222                log.debug("Adding consumer: " + info);
223            }
224            
225            DurableQueueSubscription ts = container.addConsumer(createFilter(info), info, client);
226            if (ts != null) {
227                subscriptions.put(info.getConsumerId(), ts);
228            }
229            String name = destination.getPhysicalName();
230            if (!destinations.containsKey(name)) {
231                destinations.put(name, destination);
232            }
233        }
234    
235        /**
236         * @param client
237         * @param destination
238         * @param isDeadLetterQueue is this queue a dead letter queue 
239         * @return the container
240         * @throws JMSException
241         */
242        private DurableQueueBoundedMessageContainer createContainer(ActiveMQDestination destination, boolean isDeadLetterQueue) throws JMSException {
243            MessageStore messageStore = persistenceAdapter.createQueueMessageStore(destination.getPhysicalName());
244            DurableQueueBoundedMessageContainer container = new DurableQueueBoundedMessageContainer(messageStore, threadPool, queueManager, destination, isDeadLetterQueue ? null : redeliveryPolicy, deadLetterPolicy);
245            addContainer(container);
246            if (started.get()) {
247                container.start();            
248            }
249            return container;
250        }
251    
252        /**
253         * @param client
254         * @param info
255         * @throws JMSException
256         */
257        public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
258            ActiveMQDestination destination = info.getDestination();
259            if ( !destination.isQueue() || destination.isTemporary() )
260                return;
261            
262            for (Iterator i = containers.values().iterator();i.hasNext();) {
263                DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
264                if (container != null) {
265                    container.removeConsumer(info);
266                }
267            }
268            subscriptions.remove(info.getConsumerId());
269        }
270    
    /**
     * Delete a durable subscriber. This implementation has an empty body and
     * therefore does nothing.
     * 
     * @param clientId id of the client owning the subscription (unused here)
     * @param subscriberName name of the subscription (unused here)
     * @throws JMSException declared by the signature; this implementation never throws it
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }
280    
281        /**
282         * @param client
283         * @param message
284         * @throws JMSException
285         */
286        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
287            ActiveMQDestination destination = message.getJMSActiveMQDestination();
288            if (!destination.isQueue() || destination.isTemporary() || !message.isPersistent())
289                return;
290    
291            if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
292                doGarbageCollection();
293            }
294            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.get(destination);
295            if (container == null) {
296                container = createContainer(destination, false);
297            }
298            
299            Set set = destinationMap.get(message.getJMSActiveMQDestination());
300            for (Iterator i = set.iterator();i.hasNext();) {
301                container = (DurableQueueBoundedMessageContainer) i.next();
302                container.enqueue(message);
303            }
304        }
305    
    /**
     * Processes a consumer's acknowledgement of a message on a durable queue.
     * <p>
     * Acks are ignored when the destination is null (a warning is logged), is not a
     * queue, is a temporary queue, when the ack is not persistent, when the consumer
     * has no subscription here, or when the subscription does not know the message.
     * For a browser subscription the message is only recorded as acked. An ack that
     * is unread or expired is handed to redeliverMessage(). Otherwise the message is
     * removed from its message store and, inside a transaction, tracked on the
     * subscription so it can be redelivered if the transaction rolls back.
     *
     * @param client the broker client sending the ack (not used here)
     * @param ack the acknowledgement to process
     * @throws JMSException if the store, the redelivery or the dead letter handling fails
     */
    public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException {
        
        ActiveMQDestination destination = ack.getDestination();
        if (destination == null) {
            log.warn("Ignoring acknowledgeMessage() on null destination: " + ack);
            return;
        }
        // Only persistent acks on non-temporary queues are handled by this manager.
        if (!destination.isQueue() || destination.isTemporary() || !ack.isPersistent())
            return;

        final DurableQueueSubscription ts = (DurableQueueSubscription) subscriptions.get(ack.getConsumerId());
        if (ts == null)
            return;

        final DurableMessagePointer messagePointer = ts.acknowledgeMessage(ack.getMessageID());
        if (messagePointer == null )
            return;
        
        // Browsers only record the ack; nothing is removed from the store.
        if( ts.isBrowser() ) {
            ts.addAckedMessage(messagePointer);
            return;
        }
        
        // Unread or expired: redeliver or dead-letter instead of removing.
        if (!ack.isMessageRead() || ack.isExpired()) {
            redeliverMessage(ts, ack, messagePointer);
            return;
        } 
        
        // Let the message store ack the message.
        messagePointer.getMessageStore().removeMessage(ack);        
        if (TransactionManager.isCurrentTransaction()) {
            // Hook in a callback on first acked message
            if (!ts.hasAckedMessage()) {
                // On rollback: every message acked through this subscription is either
                // dead-lettered or queued for redelivery, then the acked list is cleared.
                TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() {
                    public void execute() throws Throwable {
                        
                        List ackList = ts.listAckedMessages();
                        // container -> LinkedList of message pointers to redeliver
                        HashMap redeliverMap = new HashMap();
                       //for (int x = ackList.size()-1; x >= 0 ; x--){    
                       for (int x = 0; x < ackList.size(); x++){    
                            DurableMessagePointer messagePointer = (DurableMessagePointer) ackList.get(x);
                            ActiveMQMessage message = messagePointer.getMessage();
                            message.setJMSRedelivered(true);
                            // NOTE(review): this uses message.incrementRedeliveryCount() with '>',
                            // whereas redeliverMessage() uses the pointer's incrementDeliveryCount()
                            // with '>=' - confirm whether the difference is intended.
                            if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) {
                                if (log.isDebugEnabled()){
                                    log.debug("Message: " + message + " has exceeded its retry count");
                                }
                                // TODO: see if we can use the deadLetterPolicy of the container that dispatched the message.
                                deadLetterPolicy.sendToDeadLetter(message);
                            }
                            // 'ack' is the acknowledgement that registered this task, not one per message.
                            // NOTE(review): an expired ack returns earlier in acknowledgeMessage(), so this
                            // branch is presumably unreachable - confirm.
                            else if (ack.isExpired()) {
                                if (log.isDebugEnabled()){
                                    log.debug("Message: " + message + " has expired");
                                }
                                // TODO: see if we can use the deadLetterPolicy of the container that dispatched the message.
                                deadLetterPolicy.sendToDeadLetter(message);
                            }
                            else {
                                // Group the pointer under every container matching the message's destination.
                                Set containers = destinationMap.get(message.getJMSActiveMQDestination());
                                for (Iterator i = containers.iterator();i.hasNext();) {
                                    DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
                                    LinkedList l = (LinkedList) redeliverMap.get(container);
                                    if( l==null ) {
                                        l = new LinkedList();
                                        redeliverMap.put(container, l);
                                    }
                                    l.add(messagePointer);
                                }
                            }
                        }
                        
                        // Hand each container its batch of messages in one call.
                        for (Iterator i = redeliverMap.keySet().iterator();i.hasNext();) {
                            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
                            List l = (List) redeliverMap.get(container);
                            container.redeliver(l);
                        }
                        
                        ts.removeAllAckedMessages();
                    }
                });
                // On commit: the acked messages no longer need tracking.
                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
                    public void execute() throws Throwable {
                        ts.removeAllAckedMessages();
                    }
                });
            }
            // Keep track of the messages acked in this transaction so that the
            // rollback task above can redeliver them.
            ts.addAckedMessage(messagePointer);
        }
    }
402    
403        /**
404         * @param client
405         * @param ack
406         * @param message 
407         * @throws JMSException
408         */
409        private void redeliverMessage(DurableQueueSubscription ts, MessageAck ack, DurableMessagePointer message) throws JMSException {
410            message.getMessage().setJMSRedelivered(true);
411            if (message.incrementDeliveryCount() >= redeliveryPolicy.getMaximumRetryCount()) {
412                if (log.isDebugEnabled()){
413                    log.debug("Message: " + message + " has exceeded its retry count");
414                }
415                deadLetterPolicy.sendToDeadLetter(message.getMessage());
416            }
417            else if (ack.isExpired()) {
418                if (log.isDebugEnabled()){
419                    log.debug("Message: " + message + " has expired");
420                }
421                deadLetterPolicy.sendToDeadLetter(message.getMessage());
422            }
423            else {
424                Set set = destinationMap.get(message.getMessage().getJMSActiveMQDestination());
425                for (Iterator i = set.iterator();i.hasNext();) {
426                    DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
427                    container.redeliver(message);
428                    break;
429                }
430            }
431        }
432        
    /**
     * Polling hook. This implementation has an empty body and therefore does nothing.
     *
     * @throws JMSException declared by the signature; this implementation never throws it
     */
    public void poll() throws JMSException {
    }
438    
439        /**
440         * @param physicalName
441         * @return MessageContainer
442         * @throws JMSException
443         */
444        public MessageContainer getContainer(String physicalName) throws JMSException {
445            ActiveMQDestination key = (ActiveMQDestination) destinations.get(physicalName);
446            if (key != null) {
447                return (MessageContainer) containers.get(key);
448            }
449            return null;
450        }
451    
452        /**
453         * @return a map of destinations
454         */
455        public Map getDestinations() {
456            return Collections.unmodifiableMap(containers);
457        }
458    
459        /**
460         * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
461         * objects used by non-broker consumers directly connected to this container
462         *
463         * @return
464         */
465        public Map getLocalDestinations() {
466            Map localDestinations = new HashMap();
467            for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
468                DurableSubscription sub = (DurableSubscription) iter.next();
469                if (sub.isLocalSubscription()) {
470                    final ActiveMQDestination dest = sub.getDestination();
471                    localDestinations.put(dest.getPhysicalName(), dest);
472                }
473            }
474            return Collections.unmodifiableMap(localDestinations);
475        }
476    
477        /**
478         * @return the DeadLetterPolicy for this Container Manager
479         */
480        public DeadLetterPolicy getDeadLetterPolicy() {
481            return deadLetterPolicy;
482        }
483    
484        /**
485         * Set the DeadLetterPolicy for this Container Manager
486         * 
487         * @param policy
488         */
489        public void setDeadLetterPolicy(DeadLetterPolicy policy) {
490            this.deadLetterPolicy = policy;
491        }
492    
493        /**
494         * Create filter for a Consumer
495         * 
496         * @param info
497         * @return the Fitler
498         * @throws javax.jms.JMSException
499         */
500        protected Filter createFilter(ConsumerInfo info) throws JMSException {
501            Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
502            if (info.isNoLocal()) {
503                filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
504            }
505            return filter;
506        }
507    
508        private void doGarbageCollection() {
509            
510            if (doingGarbageCollection.commit(true, false)) {
511                
512                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
513                    for (Iterator i = containers.values().iterator();i.hasNext();) {
514                        DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
515                        container.removeExpiredMessages();
516                        log.warn("memory limit low - forced to remove expired messages: "
517                                + container.getDestinationName());
518                    }
519                }
520                
521                //if still below the limit - clear queues with no subscribers
522                //which have been inactive for a while
523                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
524                    for (Iterator i = containers.values().iterator();i.hasNext();) {
525                        DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
526                        if (!container.isActive() && (container.getIdleTimestamp() < (System.currentTimeMillis() - inactiveTimeout))) {
527                            removeContainer(container);
528                            log.warn("memory limit low - forced to remove inactive and idle queue: "
529                                    + container.getDestinationName());
530                        }
531                    }
532                }
533                //if still now below limit - clear inactive queues
534                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
535                        for (Iterator i = containers.values().iterator();i.hasNext();) {
536                            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next();
537                            if (!container.isActive() && !container.isEmpty()) {
538                                removeContainer(container);
539                            log.warn("memory limit low - forced to remove inactive queue: "
540                                    + container.getDestinationName());
541                            }
542                        }
543                }
544                doingGarbageCollection.set(false);
545            }
546        }
547    
548        private synchronized void addContainer(DurableQueueBoundedMessageContainer container) {
549            containers.put(container.getDestination(), container);
550            destinationMap.put(container.getDestination(), container);
551        }
552    
553        private synchronized void removeContainer(DurableQueueBoundedMessageContainer container) {
554            try {
555                container.close();
556                log.info("closed inactive Durable queue container: " + container.getDestinationName());
557            }
558            catch (JMSException e) {
559                log.warn("failure closing container", e);
560            }
561            containers.remove(container.getDestination());
562            destinationMap.remove(container.getDestination(), container);
563        }
564    
565        protected Executor getThreadPool() {
566            return threadPool;
567        }
568    
569        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
570            createContainer(dest, false);
571        }
572        
573        public Map getMessageContainerAdmins() throws JMSException {
574            return Collections.unmodifiableMap(containers);
575        }
576    
577    
578        public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
579            if ( !dest.isQueue() )
580                return;
581    
582            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.remove(dest);
583            if( container != null ) {
584                container.empty();
585                container.stop();
586            }
587            destinationMap.removeAll(dest);
588        }
589    
590        public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException {
591            
592            ActiveMQQueue destination = new ActiveMQQueue(deadLetterName);
593            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.get(destination);
594            if (container == null) {
595                container = createContainer(destination, true);
596            }        
597            container.enqueue(message);
598            
599        }
600    }