001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.LinkedList;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import javax.jms.JMSException;
024    
025    import org.apache.activemq.broker.Broker;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
028    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
029    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
030    import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
031    import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
032    import org.apache.activemq.command.ConsumerControl;
033    import org.apache.activemq.command.ConsumerInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.MessageDispatchNotification;
038    import org.apache.activemq.command.MessagePull;
039    import org.apache.activemq.command.Response;
040    import org.apache.activemq.transaction.Synchronization;
041    import org.apache.activemq.usage.SystemUsage;
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    
045    public class TopicSubscription extends AbstractSubscription {
046    
047        private static final Log LOG = LogFactory.getLog(TopicSubscription.class);
048        private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
049        
050        protected PendingMessageCursor matched;
051        protected final SystemUsage usageManager;
052        protected AtomicLong dispatchedCounter = new AtomicLong();
053           
054        boolean singleDestination = true;
055        Destination destination;
056    
057        private int maximumPendingMessages = -1;
058        private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
059        private int discarded;
060        private final Object matchedListMutex = new Object();
061        private final AtomicLong enqueueCounter = new AtomicLong(0);
062        private final AtomicLong dequeueCounter = new AtomicLong(0);
063        private int memoryUsageHighWaterMark = 95;
064        private boolean slowConsumer;
065    
066        public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
067            super(broker, context, info);
068            this.usageManager = usageManager;
069            String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
070            if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
071                this.matched = new VMPendingMessageCursor();
072            } else {
073                this.matched = new FilePendingMessageCursor(broker,matchedName);
074            }
075        }
076    
077        public void init() throws Exception {
078            this.matched.setSystemUsage(usageManager);
079            this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
080            this.matched.start();
081        }
082    
083        public void add(MessageReference node) throws Exception {
084            enqueueCounter.incrementAndGet();
085            if (!isFull() && matched.isEmpty()  && !isSlave()) {
086                // if maximumPendingMessages is set we will only discard messages which
087                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
088                dispatch(node);
089                slowConsumer=false;
090            } else {
091                //we are slow
092                if(!slowConsumer) {
093                    slowConsumer=true;
094                    for (Destination dest: destinations) {
095                        dest.slowConsumer(getContext(), this);
096                    }
097                }
098                if (maximumPendingMessages != 0) {
099                    synchronized(matchedListMutex){
100                            while (matched.isFull()){
101                            if (getContext().getStopping().get()) {
102                                LOG.warn("stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
103                                enqueueCounter.decrementAndGet();
104                                return;
105                            }
106                            matchedListMutex.wait(20);
107                            }
108                            matched.addMessageLast(node);
109                    }
110                    synchronized (matchedListMutex) {
111                        
112                        // NOTE - be careful about the slaveBroker!
113                        if (maximumPendingMessages > 0) {
114                            // calculate the high water mark from which point we
115                            // will eagerly evict expired messages
116                            int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
117                            if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
118                                max = maximumPendingMessages;
119                            }
120                            if (!matched.isEmpty() && matched.size() > max) {
121                                removeExpiredMessages();
122                            }
123                            // lets discard old messages as we are a slow consumer
124                            while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
125                                int pageInSize = matched.size() - maximumPendingMessages;
126                                // only page in a 1000 at a time - else we could
127                                // blow da memory
128                                pageInSize = Math.max(1000, pageInSize);
129                                LinkedList<MessageReference> list = null;
130                                MessageReference[] oldMessages=null;
131                                synchronized(matched){
132                                    list = matched.pageInList(pageInSize);
133                                    oldMessages = messageEvictionStrategy.evictMessages(list);
134                                    for (MessageReference ref : list) {
135                                        ref.decrementReferenceCount();
136                                    }
137                                }
138                                int messagesToEvict = 0;
139                                if (oldMessages != null){
140                                        messagesToEvict = oldMessages.length;
141                                        for (int i = 0; i < messagesToEvict; i++) {
142                                            MessageReference oldMessage = oldMessages[i];
143                                            discard(oldMessage);
144                                        }
145                                }
146                                // lets avoid an infinite loop if we are given a bad
147                                // eviction strategy
148                                // for a bad strategy lets just not evict
149                                if (messagesToEvict == 0) {
150                                    LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
151                                    break;
152                                }
153                            }
154                        }
155                    }
156                    dispatchMatched();
157                }
158            }
159        }
160    
161        /**
162         * Discard any expired messages from the matched list. Called from a
163         * synchronized block.
164         * 
165         * @throws IOException
166         */
167        protected void removeExpiredMessages() throws IOException {
168            try {
169                matched.reset();
170                while (matched.hasNext()) {
171                    MessageReference node = matched.next();
172                    node.decrementReferenceCount();
173                    if (broker.isExpired(node)) {
174                        matched.remove();
175                        dispatchedCounter.incrementAndGet();
176                        node.decrementReferenceCount();
177                        node.getRegionDestination().getDestinationStatistics().getExpired().increment();
178                        broker.messageExpired(getContext(), node);
179                        break;
180                    }
181                }
182            } finally {
183                matched.release();
184            }
185        }
186    
187        public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
188            synchronized (matchedListMutex) {
189                try {
190                    matched.reset();
191                    while (matched.hasNext()) {
192                        MessageReference node = matched.next();
193                        node.decrementReferenceCount();
194                        if (node.getMessageId().equals(mdn.getMessageId())) {
195                            matched.remove();
196                            dispatchedCounter.incrementAndGet();
197                            node.decrementReferenceCount();
198                            break;
199                        }
200                    }
201                } finally {
202                    matched.release();
203                }
204            }
205        }
206    
207        public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
208            // Handle the standard acknowledgment case.
209            if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
210                if (context.isInTransaction()) {
211                    context.getTransaction().addSynchronization(new Synchronization() {
212    
213                        public void afterCommit() throws Exception {
214                           synchronized (TopicSubscription.this) {
215                                if (singleDestination && destination != null) {
216                                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
217                                }
218                            }
219                            dequeueCounter.addAndGet(ack.getMessageCount());
220                            dispatchMatched();
221                        }
222                    });
223                } else {
224                    if (singleDestination && destination != null) {
225                        destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
226                        destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
227                    }
228                    dequeueCounter.addAndGet(ack.getMessageCount());
229                }
230                dispatchMatched();
231                return;
232            } else if (ack.isDeliveredAck()) {
233                // Message was delivered but not acknowledged: update pre-fetch
234                // counters.
235                // also. get these for a consumer expired message.
236                if (destination != null && !ack.isInTransaction()) {
237                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
238                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
239                }
240                dequeueCounter.addAndGet(ack.getMessageCount());
241                dispatchMatched();
242                return;
243            }
244            throw new JMSException("Invalid acknowledgment: " + ack);
245        }
246    
247        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
248            // not supported for topics
249            return null;
250        }
251    
252        public int getPendingQueueSize() {
253            return matched();
254        }
255    
256        public int getDispatchedQueueSize() {
257            return (int)(dispatchedCounter.get() - dequeueCounter.get());
258        }
259    
260        public int getMaximumPendingMessages() {
261            return maximumPendingMessages;
262        }
263    
264        public long getDispatchedCounter() {
265            return dispatchedCounter.get();
266        }
267    
268        public long getEnqueueCounter() {
269            return enqueueCounter.get();
270        }
271    
272        public long getDequeueCounter() {
273            return dequeueCounter.get();
274        }
275    
276        /**
277         * @return the number of messages discarded due to being a slow consumer
278         */
279        public int discarded() {
280            synchronized (matchedListMutex) {
281                return discarded;
282            }
283        }
284    
285        /**
286         * @return the number of matched messages (messages targeted for the
287         *         subscription but not yet able to be dispatched due to the
288         *         prefetch buffer being full).
289         */
290        public int matched() {
291            synchronized (matchedListMutex) {
292                return matched.size();
293            }
294        }
295    
296        /**
297         * Sets the maximum number of pending messages that can be matched against
298         * this consumer before old messages are discarded.
299         */
300        public void setMaximumPendingMessages(int maximumPendingMessages) {
301            this.maximumPendingMessages = maximumPendingMessages;
302        }
303    
304        public MessageEvictionStrategy getMessageEvictionStrategy() {
305            return messageEvictionStrategy;
306        }
307    
308        /**
309         * Sets the eviction strategy used to decide which message to evict when the
310         * slow consumer needs to discard messages
311         */
312        public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
313            this.messageEvictionStrategy = messageEvictionStrategy;
314        }
315    
316        // Implementation methods
317        // -------------------------------------------------------------------------
318        public boolean isFull() {
319            return getDispatchedQueueSize()  >= info.getPrefetchSize();
320        }
321        
322        public int getInFlightSize() {
323            return getDispatchedQueueSize();
324        }
325        
326        
327        /**
328         * @return true when 60% or more room is left for dispatching messages
329         */
330        public boolean isLowWaterMark() {
331            return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
332        }
333    
334        /**
335         * @return true when 10% or less room is left for dispatching messages
336         */
337        public boolean isHighWaterMark() {
338            return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
339        }
340    
341        /**
342         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
343         */
344        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
345            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
346        }
347    
348        /**
349         * @return the memoryUsageHighWaterMark
350         */
351        public int getMemoryUsageHighWaterMark() {
352            return this.memoryUsageHighWaterMark;
353        }
354    
355        /**
356         * @return the usageManager
357         */
358        public SystemUsage getUsageManager() {
359            return this.usageManager;
360        }
361    
362        /**
363         * @return the matched
364         */
365        public PendingMessageCursor getMatched() {
366            return this.matched;
367        }
368    
369        /**
370         * @param matched the matched to set
371         */
372        public void setMatched(PendingMessageCursor matched) {
373            this.matched = matched;
374        }
375    
376        /**
377         * inform the MessageConsumer on the client to change it's prefetch
378         * 
379         * @param newPrefetch
380         */
381        public void updateConsumerPrefetch(int newPrefetch) {
382            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
383                ConsumerControl cc = new ConsumerControl();
384                cc.setConsumerId(info.getConsumerId());
385                cc.setPrefetch(newPrefetch);
386                context.getConnection().dispatchAsync(cc);
387            }
388        }
389    
390        private void dispatchMatched() throws IOException {       
391            synchronized (matchedListMutex) {
392                if (!matched.isEmpty() && !isFull()) {
393                    try {
394                        matched.reset();
395                       
396                        while (matched.hasNext() && !isFull()) {
397                            MessageReference message = (MessageReference) matched.next();
398                            message.decrementReferenceCount();
399                            matched.remove();
400                            // Message may have been sitting in the matched list a
401                            // while
402                            // waiting for the consumer to ak the message.
403                            if (message.isExpired()) {
404                                discard(message);
405                                continue; // just drop it.
406                            }
407                            dispatch(message);
408                        }
409                    } finally {
410                        matched.release();
411                    }
412                }
413            }
414        }
415    
416        private void dispatch(final MessageReference node) throws IOException {
417            Message message = (Message)node;
418            node.incrementReferenceCount();
419            // Make sure we can dispatch a message.
420            MessageDispatch md = new MessageDispatch();
421            md.setMessage(message);
422            md.setConsumerId(info.getConsumerId());
423            md.setDestination(node.getRegionDestination().getActiveMQDestination());
424            dispatchedCounter.incrementAndGet();
425            // Keep track if this subscription is receiving messages from a single
426            // destination.
427            if (singleDestination) {
428                if (destination == null) {
429                    destination = node.getRegionDestination();
430                } else {
431                    if (destination != node.getRegionDestination()) {
432                        singleDestination = false;
433                    }
434                }
435            }
436            if (info.isDispatchAsync()) {
437                md.setTransmitCallback(new Runnable() {
438    
439                    public void run() {
440                        node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
441                        node.getRegionDestination().getDestinationStatistics().getInflight().increment();
442                        node.decrementReferenceCount();
443                    }
444                });
445                context.getConnection().dispatchAsync(md);
446            } else {
447                context.getConnection().dispatchSync(md);
448                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
449                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
450                node.decrementReferenceCount();
451            }
452        }
453    
454        private void discard(MessageReference message) {
455            message.decrementReferenceCount();
456            matched.remove(message);
457            discarded++;
458            if(destination != null) {
459                destination.getDestinationStatistics().getDequeues().increment();
460            }
461            if (LOG.isDebugEnabled()) {
462                LOG.debug("Discarding message " + message);
463            }
464            Destination dest = message.getRegionDestination();
465            if (dest != null) {
466                dest.messageDiscarded(getContext(), message);
467            }
468            broker.getRoot().sendToDeadLetterQueue(getContext(), message);
469        }
470    
471        public String toString() {
472            return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
473                   + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
474        }
475    
476        public void destroy() {
477            synchronized (matchedListMutex) {
478                try {
479                    matched.destroy();
480                } catch (Exception e) {
481                    LOG.warn("Failed to destroy cursor", e);
482                }
483            }
484        }
485    
486        public int getPrefetchSize() {
487            return (int)info.getPrefetchSize();
488        }
489    
490    }