001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.boundedvm;
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.filter.AndFilter;
034    import org.activemq.filter.DestinationMap;
035    import org.activemq.filter.Filter;
036    import org.activemq.filter.FilterFactory;
037    import org.activemq.filter.FilterFactoryImpl;
038    import org.activemq.filter.NoLocalFilter;
039    import org.activemq.io.util.MemoryBoundedQueue;
040    import org.activemq.io.util.MemoryBoundedQueueManager;
041    import org.activemq.message.ActiveMQDestination;
042    import org.activemq.message.ActiveMQMessage;
043    import org.activemq.message.ConsumerInfo;
044    import org.activemq.message.MessageAck;
045    import org.activemq.service.DeadLetterPolicy;
046    import org.activemq.service.MessageContainer;
047    import org.activemq.service.MessageContainerManager;
048    import org.activemq.service.RedeliveryPolicy;
049    import org.activemq.service.TransactionManager;
050    import org.activemq.service.TransactionTask;
051    
052    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
053    import EDU.oswego.cs.dl.util.concurrent.Executor;
054    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
055    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
056    import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
057    
058    /**
059     * A MessageContainerManager for transient queues
060     * 
061     * @version $Revision: 1.1.1.1 $
062     */
063    /**
064     * A manager of MessageContainer instances
065     */
066    public class TransientQueueBoundedMessageManager implements MessageContainerManager, Runnable {
067        private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10;
068        private static final long DEFAULT_INACTIVE_TIMEOUT = 30 * 1000;
069        private static final Log log = LogFactory.getLog(TransientQueueBoundedMessageManager.class);
070        private MemoryBoundedQueueManager queueManager;
071        private ConcurrentHashMap containers;
072        private ConcurrentHashMap subscriptions;
073        private FilterFactory filterFactory;
074        private SynchronizedBoolean started;
075        private SynchronizedBoolean doingGarbageCollection;
076        private Map destinations;
077        private DestinationMap destinationMap;
078        private PooledExecutor threadPool;
079        private long inactiveTimeout;
080        private int garbageCoolectionCapacityLimit;
081        private RedeliveryPolicy redeliveryPolicy;
082        private DeadLetterPolicy deadLetterPolicy;
083        protected static class TransientQueueThreadFactory implements ThreadFactory {
084            /**
085             * @param command - command to run
086             * @return a new thread
087             */
088            public Thread newThread(Runnable command) {
089                Thread result = new Thread(command);
090                result.setPriority(Thread.NORM_PRIORITY + 1);
091                result.setDaemon(true);
092                return result;
093            }
094        }
095    
096        /**
097         * Constructor for TransientQueueBoundedMessageManager
098         * 
099         * @param mgr
100         * @param redeliveryPolicy
101         * @param deadLetterPolicy
102         */
103        public TransientQueueBoundedMessageManager(MemoryBoundedQueueManager mgr, RedeliveryPolicy redeliveryPolicy,
104                DeadLetterPolicy deadLetterPolicy) {
105            this.queueManager = mgr;
106            this.redeliveryPolicy = redeliveryPolicy;
107            this.deadLetterPolicy = deadLetterPolicy;
108            this.containers = new ConcurrentHashMap();
109            this.destinationMap = new DestinationMap();
110            this.destinations = new ConcurrentHashMap();
111            this.subscriptions = new ConcurrentHashMap();
112            this.filterFactory = new FilterFactoryImpl();
113            this.started = new SynchronizedBoolean(false);
114            this.doingGarbageCollection = new SynchronizedBoolean(false);
115            this.threadPool = new PooledExecutor();
116            this.threadPool.setThreadFactory(new TransientQueueThreadFactory());
117            this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT;
118            this.garbageCoolectionCapacityLimit = DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT;
119        }
120    
121        /**
122         * @return Returns the garbageCoolectionCapacityLimit.
123         */
124        public int getGarbageCoolectionCapacityLimit() {
125            return garbageCoolectionCapacityLimit;
126        }
127    
128        /**
129         * @param garbageCoolectionCapacityLimit The garbageCoolectionCapacityLimit to set.
130         */
131        public void setGarbageCoolectionCapacityLimit(int garbageCoolectionCapacityLimit) {
132            this.garbageCoolectionCapacityLimit = garbageCoolectionCapacityLimit;
133        }
134    
135        /**
136         * @return Returns the inactiveTimeout.
137         */
138        public long getInactiveTimeout() {
139            return inactiveTimeout;
140        }
141    
142        /**
143         * @param inactiveTimeout The inactiveTimeout to set.
144         */
145        public void setInactiveTimeout(long inactiveTimeout) {
146            this.inactiveTimeout = inactiveTimeout;
147        }
148    
149        /**
150         * start the manager
151         * 
152         * @throws JMSException
153         */
154        public void start() throws JMSException {
155            if (started.commit(false, true)) {
156                for (Iterator i = containers.values().iterator();i.hasNext();) {
157                    TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
158                    container.start();
159                }
160                try {
161                    threadPool.execute(this);//start garbage collection
162                }
163                catch (InterruptedException e) {
164                    JMSException jmsEx = new JMSException("Garbage collection interupted on start()");
165                    jmsEx.setLinkedException(e);
166                    throw jmsEx;
167                }
168            }
169        }
170    
171        /**
172         * stop the manager
173         * 
174         * @throws JMSException
175         */
176        public void stop() throws JMSException {
177            if (started.commit(true, false)) {
178                for (Iterator i = containers.values().iterator();i.hasNext();) {
179                    TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
180                    container.stop();
181                }
182                threadPool.interruptAll();
183                            threadPool.shutdownNow();
184            }
185        }
186    
187        /**
188         * collect expired messages
189         */
190        public void run() {
191            while (started.get()) {
192                doGarbageCollection();
193                try {
194                    Thread.sleep(2000);
195                }
196                catch (InterruptedException e) {
197                }
198            }
199        }
200    
201        /**
202         * Add a consumer if appropiate
203         * 
204         * @param client
205         * @param info
206         * @throws JMSException
207         */
208        public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
209            ActiveMQDestination destination = info.getDestination();
210            if (destination.isQueue()) {
211                TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers
212                        .get(destination);
213                if (container == null) {
214                    MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
215                    container = new TransientQueueBoundedMessageContainer(threadPool, queueManager, destination,
216                            redeliveryPolicy, deadLetterPolicy);
217                    addContainer(container);
218                    if (started.get()) {
219                        container.start();
220                    }
221                }
222                if (log.isDebugEnabled()) {
223                    log.debug("Adding consumer: " + info);
224                }
225                
226                TransientQueueSubscription ts = container.addConsumer(createFilter(info), info, client);
227                if (ts != null) {
228                    subscriptions.put(info.getConsumerId(), ts);
229                }
230                String name = destination.getPhysicalName();
231                if (!destinations.containsKey(name)) {
232                    destinations.put(name, destination);
233                }
234            }
235        }
236    
237        /**
238         * @param client
239         * @param info
240         * @throws JMSException
241         */
242        public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
243            ActiveMQDestination destination = info.getDestination();
244            if (destination.isQueue()) {
245                for (Iterator i = containers.values().iterator();i.hasNext();) {
246                    TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
247                    if (container != null) {
248                        container.removeConsumer(info);
249                    }
250                }
251                subscriptions.remove(info.getConsumerId());
252            }
253        }
254    
255        /**
256         * Delete a durable subscriber
257         * 
258         * @param clientId
259         * @param subscriberName
260         * @throws JMSException if the subscriber doesn't exist or is still active
261         */
262        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
263        }
264    
265        /**
266         * @param client
267         * @param message
268         * @throws JMSException
269         */
270        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
271            ActiveMQDestination destination = message.getJMSActiveMQDestination();
272            if ( !destination.isQueue() || !message.isTemporary() )
273                return;
274    
275            // If there is no transaction.. then this executes directly.
276            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
277                public void execute() throws Throwable {
278                    doSendMessage(client, message);
279                }
280            });
281        }
282    
283        /**
284         * @param client
285         * @param message
286         * @throws JMSException
287         */
288        private void doSendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
289    
290            ActiveMQDestination destination = message.getJMSActiveMQDestination();
291            if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
292                doGarbageCollection();
293            }
294            TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers
295                    .get(destination);
296            if (container == null) {
297                MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
298                container = new TransientQueueBoundedMessageContainer(threadPool, queueManager, destination,
299                        redeliveryPolicy, deadLetterPolicy);
300                addContainer(container);
301                if (started.get()) {
302                    container.start();
303                }
304            }
305            Set set = destinationMap.get(message.getJMSActiveMQDestination());
306            for (Iterator i = set.iterator();i.hasNext();) {
307                container = (TransientQueueBoundedMessageContainer) i.next();
308                container.enqueue(message);
309            }
310        }
311    
312        /**
313         * @param client
314         * @param ack
315         * @throws JMSException
316         */
317        public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException {
318            if (!ack.getDestination().isQueue() || !ack.isTemporary())
319                return;
320    
321            final TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId());
322            if (ts == null)
323                return;
324    
325            final ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID());
326            if (message == null )
327                return;
328            
329            if( ts.isBrowser() ) {
330                ts.addAckedMessage(message);
331                return;
332            }
333    
334            if (!ack.isMessageRead() || ack.isExpired()) {
335                redeliverMessage(ts, ack, message);
336            } else if (TransactionManager.isCurrentTransaction()) {
337                // Hook in a callbacks on first acked message
338                if (!ts.hasAckedMessage()) {
339                    TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() {
340                        public void execute() throws Throwable {
341                            
342                            List ackList = ts.listAckedMessages();
343                            HashMap redeliverMap = new HashMap();
344                            for (Iterator iter = ackList.iterator(); iter.hasNext();) {
345                                
346                                ActiveMQMessage message = (ActiveMQMessage) iter.next();
347                                message.setJMSRedelivered(true);
348                                if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) {
349                                    if (log.isDebugEnabled()){
350                                        log.debug("Message: " + message + " has exceeded its retry count");
351                                    }
352                                    deadLetterPolicy.sendToDeadLetter(message);
353                                }
354                                else if (ack.isExpired()) {
355                                    if (log.isDebugEnabled()){
356                                        log.debug("Message: " + message + " has expired");
357                                    }
358                                    deadLetterPolicy.sendToDeadLetter(message);
359                                }
360                                else {
361                                    Set containers = destinationMap.get(message.getJMSActiveMQDestination());
362                                    for (Iterator i = containers.iterator();i.hasNext();) {
363                                        TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
364                                        LinkedList l = (LinkedList) redeliverMap.get(container);
365                                        if( l==null ) {
366                                            l = new LinkedList();
367                                            redeliverMap.put(container, l);
368                                        }
369                                        l.add(message);
370                                    }
371                                }
372                            }
373                            
374                            for (Iterator i = redeliverMap.keySet().iterator();i.hasNext();) {
375                                TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
376                                List l = (List) redeliverMap.get(container);
377                                container.redeliver(l);
378                            }
379                            
380                            ts.removeAllAckedMessages();
381                        }
382                    });
383                    TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
384                        public void execute() throws Throwable {
385                            ts.removeAllAckedMessages();
386                        }
387                    });
388                }
389                // We need to keep track of messages that were acked. If we
390                // rollback.
391                ts.addAckedMessage(message);
392            }
393        }
394    
395        /**
396         * @param client
397         * @param ack
398         * @param message 
399         * @throws JMSException
400         */
401        private void redeliverMessage(TransientQueueSubscription ts, MessageAck ack, ActiveMQMessage message) throws JMSException {
402            message.setJMSRedelivered(true);
403            if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) {
404                if (log.isDebugEnabled()){
405                    log.debug("Message: " + message + " has exceeded its retry count");
406                }
407                deadLetterPolicy.sendToDeadLetter(message);
408            }
409            else if (ack.isExpired()) {
410                if (log.isDebugEnabled()){
411                    log.debug("Message: " + message + " has expired");
412                }
413                deadLetterPolicy.sendToDeadLetter(message);
414            }
415            else {
416                Set set = destinationMap.get(message.getJMSActiveMQDestination());
417                for (Iterator i = set.iterator();i.hasNext();) {
418                    TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
419                    container.redeliver(message);
420                    break;
421                }
422            }
423        }
424        
425        /**
426         * @throws JMSException
427         */
428        public void poll() throws JMSException {
429        }
430    
431        /**
432         * @param physicalName
433         * @return MessageContainer
434         * @throws JMSException
435         */
436        public MessageContainer getContainer(String physicalName) throws JMSException {
437            Object key = destinations.get(physicalName);
438            if (key != null) {
439                return (MessageContainer) containers.get(key);
440            }
441            return null;
442        }
443    
444        /**
445         * @return a map of destinations
446         */
447        public Map getDestinations() {
448            return Collections.unmodifiableMap(destinations);
449        }
450    
451        /**
452         * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
453         * objects used by non-broker consumers directly connected to this container
454         *
455         * @return
456         */
457        public Map getLocalDestinations() {
458            Map localDestinations = new HashMap();
459            for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
460                TransientSubscription sub = (TransientSubscription) iter.next();
461                if (sub.isLocalSubscription()) {
462                    final ActiveMQDestination dest = sub.getDestination();
463                    localDestinations.put(dest.getPhysicalName(), dest);
464                }
465            }
466            return Collections.unmodifiableMap(localDestinations);
467        }
468    
469        /**
470         * @return the DeadLetterPolicy for this Container Manager
471         */
472        public DeadLetterPolicy getDeadLetterPolicy() {
473            return deadLetterPolicy;
474        }
475    
476        /**
477         * Set the DeadLetterPolicy for this Container Manager
478         * 
479         * @param policy
480         */
481        public void setDeadLetterPolicy(DeadLetterPolicy policy) {
482            this.deadLetterPolicy = policy;
483        }
484    
485        /**
486         * Create filter for a Consumer
487         * 
488         * @param info
489         * @return the Fitler
490         * @throws javax.jms.JMSException
491         */
492        protected Filter createFilter(ConsumerInfo info) throws JMSException {
493            Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
494            if (info.isNoLocal()) {
495                filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
496            }
497            return filter;
498        }
499    
500        private void doGarbageCollection() {
501            if (doingGarbageCollection.commit(false, true)) {
502                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
503                    for (Iterator i = containers.values().iterator();i.hasNext();) {
504                        TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
505                        container.removeExpiredMessages();
506                        log.warn("memory limit low - forced to remove expired messages: "
507                                + container.getDestinationName());
508                    }
509                }
510                //if still below the limit - clear queues with no subscribers
511                //which have been inactive for a while
512                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
513                    for (Iterator i = containers.values().iterator();i.hasNext();) {
514                        TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
515                        if (!container.isActive()
516                                && (container.getIdleTimestamp() < (System.currentTimeMillis() - inactiveTimeout))) {
517                            removeContainer(container);
518                            log.warn("memory limit low - forced to remove inactive and idle queue: "
519                                    + container.getDestinationName());
520                        }
521                    }
522                }
523                //if still now below limit - clear temporary queues with no subscribers
524                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
525                    for (Iterator i = containers.values().iterator();i.hasNext();) {
526                        TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
527                        if (!container.isActive() && container.getDestination().isTemporary()) {
528                            removeContainer(container);
529                            log.warn("memory limit low - forced to remove inactive temporary queue: "
530                                    + container.getDestinationName());
531                        }
532                    }
533                }
534                //if still now below limit - clear inactive queues
535                if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) {
536                        for (Iterator i = containers.values().iterator();i.hasNext();) {
537                            TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
538                            if (!container.isActive() && !container.isEmpty()) {
539                                removeContainer(container);
540                            log.warn("memory limit low - forced to remove inactive queue: "
541                                    + container.getDestinationName());
542                            }
543                        }
544                }
545                doingGarbageCollection.set(false);
546            }
547        }
548    
549        private synchronized void addContainer(TransientQueueBoundedMessageContainer container) {
550            containers.put(container.getDestination(), container);
551            destinationMap.put(container.getDestination(), container);
552        }
553    
554        private synchronized void removeContainer(TransientQueueBoundedMessageContainer container) {
555            try {
556                container.close();
557                log.info("closed inactive transient queue container: " + container.getDestinationName());
558            }
559            catch (JMSException e) {
560                log.warn("failure closing container", e);
561            }
562            containers.remove(container.getDestination());
563            destinationMap.remove(container.getDestination(), container);
564        }
565    
566        protected Executor getThreadPool() {
567            return threadPool;
568        }
569    
570        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
571        }
572    
573        public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
574            if ( !dest.isQueue() )
575                return;
576    
577            TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers.remove(dest);
578            if( container != null ) {
579                container.stop();
580            }
581            destinationMap.removeAll(dest);
582        }
583    
584        public Map getMessageContainerAdmins() throws JMSException {
585            return Collections.EMPTY_MAP;
586        }
587    }