001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import org.apache.activemq.broker.BrokerService;
020    import org.apache.activemq.broker.ConnectionContext;
021    import org.apache.activemq.broker.ProducerBrokerExchange;
022    import org.apache.activemq.broker.region.policy.DispatchPolicy;
023    import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
024    import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
025    import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.ExceptionResponse;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.command.ProducerAck;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.command.Response;
034    import org.apache.activemq.command.SubscriptionInfo;
035    import org.apache.activemq.filter.MessageEvaluationContext;
036    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037    import org.apache.activemq.store.MessageRecoveryListener;
038    import org.apache.activemq.store.TopicMessageStore;
039    import org.apache.activemq.thread.Task;
040    import org.apache.activemq.thread.TaskRunner;
041    import org.apache.activemq.thread.TaskRunnerFactory;
042    import org.apache.activemq.thread.Valve;
043    import org.apache.activemq.transaction.Synchronization;
044    import org.apache.activemq.usage.Usage;
045    import org.apache.activemq.util.SubscriptionKey;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    import java.io.IOException;
049    import java.util.ArrayList;
050    import java.util.LinkedList;
051    import java.util.List;
052    import java.util.Set;
053    import java.util.concurrent.ConcurrentHashMap;
054    import java.util.concurrent.CopyOnWriteArrayList;
055    import java.util.concurrent.CopyOnWriteArraySet;
056    
057    /**
058     * The Topic is a destination that sends a copy of a message to every active
059     * Subscription registered.
060     * 
061     * @version $Revision: 1.21 $
062     */
063    public class Topic extends BaseDestination implements Task {
064        protected static final Log LOG = LogFactory.getLog(Topic.class);
065        private final TopicMessageStore topicStore;
066        protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
067        protected final Valve dispatchValve = new Valve(true);
068        private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
069        private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
070        private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
071        private final TaskRunner taskRunner;
072        private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
073        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
074            public void run() {
075                try {
076                    Topic.this.taskRunner.wakeup();
077                } catch (InterruptedException e) {
078                }
079            };
080        };
081    
082        public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
083            super(brokerService, store, destination, parentStats);
084            this.topicStore = store;
085            //set default subscription recovery policy
086            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
087            this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
088        }
089    
090        public void initialize() throws Exception {
091            super.initialize();
092            if (store != null) {
093                // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
094                // int messageCount = store.getMessageCount();
095                // destinationStatistics.getMessages().setCount(messageCount);
096            }
097        }
098    
099        public List<Subscription> getConsumers() {
100            synchronized (consumers) {
101                return new ArrayList<Subscription>(consumers);
102            }
103        }
104    
105        public boolean lock(MessageReference node, LockOwner sub) {
106            return true;
107        }
108    
109        public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
110    
111            destinationStatistics.getConsumers().increment();
112    
113            if (!sub.getConsumerInfo().isDurable()) {
114    
115                // Do a retroactive recovery if needed.
116                if (sub.getConsumerInfo().isRetroactive()) {
117    
118                    // synchronize with dispatch method so that no new messages are
119                    // sent
120                    // while we are recovering a subscription to avoid out of order
121                    // messages.
122                    dispatchValve.turnOff();
123                    try {
124    
125                        synchronized (consumers) {
126                            sub.add(context, this);
127                            consumers.add(sub);
128                        }
129                        subscriptionRecoveryPolicy.recover(context, this, sub);
130    
131                    } finally {
132                        dispatchValve.turnOn();
133                    }
134    
135                } else {
136                    synchronized (consumers) {
137                        sub.add(context, this);
138                        consumers.add(sub);
139                    }
140                }
141            } else {
142                sub.add(context, this);
143                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
144                durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
145            }
146        }
147    
148        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
149            if (!sub.getConsumerInfo().isDurable()) {
150                destinationStatistics.getConsumers().decrement();
151                synchronized (consumers) {
152                    consumers.remove(sub);
153                }
154            }
155            sub.remove(context, this);
156        }
157    
158        public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
159            if (topicStore != null) {
160                topicStore.deleteSubscription(key.clientId, key.subscriptionName);
161                Object removed = durableSubcribers.remove(key);
162                if (removed != null) {
163                    destinationStatistics.getConsumers().decrement();
164                }
165            }
166        }
167    
168        public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
169            // synchronize with dispatch method so that no new messages are sent
170            // while
171            // we are recovering a subscription to avoid out of order messages.
172            dispatchValve.turnOff();
173            try {
174    
175                if (topicStore == null) {
176                    return;
177                }
178    
179                // Recover the durable subscription.
180                String clientId = subscription.getSubscriptionKey().getClientId();
181                String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
182                String selector = subscription.getConsumerInfo().getSelector();
183                SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
184                if (info != null) {
185                    // Check to see if selector changed.
186                    String s1 = info.getSelector();
187                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
188                        // Need to delete the subscription
189                        topicStore.deleteSubscription(clientId, subscriptionName);
190                        info = null;
191                    } else {
192                        synchronized (consumers) {
193                            consumers.add(subscription);
194                        }
195                    }
196                }
197                // Do we need to create the subscription?
198                if (info == null) {
199                    info = new SubscriptionInfo();
200                    info.setClientId(clientId);
201                    info.setSelector(selector);
202                    info.setSubscriptionName(subscriptionName);
203                    info.setDestination(getActiveMQDestination());
204                    // This destination is an actual destination id.
205                    info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
206                    // This destination might be a pattern
207                    synchronized (consumers) {
208                        consumers.add(subscription);
209                        topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
210                    }
211                }
212    
213                final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
214                msgContext.setDestination(destination);
215                if (subscription.isRecoveryRequired()) {
216                    topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
217                        public boolean recoverMessage(Message message) throws Exception {
218                            message.setRegionDestination(Topic.this);
219                            try {
220                                msgContext.setMessageReference(message);
221                                if (subscription.matches(message, msgContext)) {
222                                    subscription.add(message);
223                                }
224                            } catch (IOException e) {
225                                LOG.error("Failed to recover this message " + message);
226                            }
227                            return true;
228                        }
229    
230                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
231                            throw new RuntimeException("Should not be called.");
232                        }
233    
234                        public boolean hasSpace() {
235                            return true;
236                        }
237    
238                        public boolean isDuplicate(MessageId id) {
239                            return false;
240                        }
241                    });
242                }
243            } finally {
244                dispatchValve.turnOn();
245            }
246        }
247    
248        public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
249            synchronized (consumers) {
250                consumers.remove(sub);
251            }
252            sub.remove(context, this);
253        }
254    
255        protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
256            if (subscription.getConsumerInfo().isRetroactive()) {
257                subscriptionRecoveryPolicy.recover(context, this, subscription);
258            }
259        }
260    
261        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
262            final ConnectionContext context = producerExchange.getConnectionContext();
263    
264            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
265            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
266    
267            // There is delay between the client sending it and it arriving at the
268            // destination.. it may have expired.
269            if (message.isExpired()) {
270                broker.messageExpired(context, message);
271                getDestinationStatistics().getExpired().increment();
272                if (sendProducerAck) {
273                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
274                    context.getConnection().dispatchAsync(ack);
275                }
276                return;
277            }
278    
279            if (memoryUsage.isFull()) {
280                isFull(context, memoryUsage);
281                fastProducer(context, producerInfo);
282    
283                if (isProducerFlowControl() && context.isProducerFlowControl()) {
284    
285                    if (warnOnProducerFlowControl) {
286                        warnOnProducerFlowControl = false;
287                        LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName()
288                                + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
289                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
290                    }
291    
292                    if (systemUsage.isSendFailIfNoSpace()) {
293                        throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
294                                + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
295                    }
296    
297                    // We can avoid blocking due to low usage if the producer is sending
298                    // a sync message or
299                    // if it is using a producer window
300                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
301                        synchronized (messagesWaitingForSpace) {
302                            messagesWaitingForSpace.add(new Runnable() {
303                                public void run() {
304    
305                                    try {
306    
307                                        // While waiting for space to free up... the
308                                        // message may have expired.
309                                        if (message.isExpired()) {
310                                            broker.messageExpired(context, message);
311                                            getDestinationStatistics().getExpired().increment();
312                                        } else {
313                                            doMessageSend(producerExchange, message);
314                                        }
315    
316                                        if (sendProducerAck) {
317                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
318                                            context.getConnection().dispatchAsync(ack);
319                                        } else {
320                                            Response response = new Response();
321                                            response.setCorrelationId(message.getCommandId());
322                                            context.getConnection().dispatchAsync(response);
323                                        }
324    
325                                    } catch (Exception e) {
326                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
327                                            ExceptionResponse response = new ExceptionResponse(e);
328                                            response.setCorrelationId(message.getCommandId());
329                                            context.getConnection().dispatchAsync(response);
330                                        }
331                                    }
332    
333                                }
334                            });
335    
336                            // If the user manager is not full, then the task will not
337                            // get called..
338                            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
339                                // so call it directly here.
340                                sendMessagesWaitingForSpaceTask.run();
341                            }
342                            context.setDontSendReponse(true);
343                            return;
344                        }
345    
346                    } else {
347                        // Producer flow control cannot be used, so we have do the flow
348                        // control at the broker
349                        // by blocking this thread until there is space available.
350                        
351                        if (memoryUsage.isFull()) {
352                            if (context.isInTransaction()) {
353    
354                                int count = 0;
355                                while (!memoryUsage.waitForSpace(1000)) {
356                                    if (context.getStopping().get()) {
357                                        throw new IOException("Connection closed, send aborted.");
358                                    }
359                                    if (count > 2 && context.isInTransaction()) {
360                                        count = 0;
361                                        int size = context.getTransaction().size();
362                                        LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
363                                    }
364                                }
365                            } else {
366                                waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
367                                        + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
368                            }
369                        }
370    
371                        // The usage manager could have delayed us by the time
372                        // we unblock the message could have expired..
373                        if (message.isExpired()) {
374                            getDestinationStatistics().getExpired().increment();
375                            if (LOG.isDebugEnabled()) {
376                                LOG.debug("Expired message: " + message);
377                            }
378                            return;
379                        }
380                    }
381                }
382            }
383    
384            doMessageSend(producerExchange, message);
385            messageDelivered(context, message);
386            if (sendProducerAck) {
387                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
388                context.getConnection().dispatchAsync(ack);
389            }
390        }
391    
392        /**
393         * do send the message - this needs to be synchronized to ensure messages
394         * are stored AND dispatched in the right order
395         * 
396         * @param producerExchange
397         * @param message
398         * @throws IOException
399         * @throws Exception
400         */
401        synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
402            final ConnectionContext context = producerExchange.getConnectionContext();
403            message.setRegionDestination(this);
404            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
405    
406            if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
407                if (systemUsage.getStoreUsage().isFull()) {
408                    final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
409                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
410                    if (systemUsage.isSendFailIfNoSpace()) {
411                        throw new javax.jms.ResourceAllocationException(logMessage);
412                    }
413    
414                    waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
415                }
416                topicStore.addMessage(context, message);
417            }
418    
419            message.incrementReferenceCount();
420    
421            if (context.isInTransaction()) {
422                context.getTransaction().addSynchronization(new Synchronization() {
423                    public void afterCommit() throws Exception {
424                        // It could take while before we receive the commit
425                        // operration.. by that time the message could have
426                        // expired..
427                        if (broker.isExpired(message)) {
428                            getDestinationStatistics().getExpired().increment();
429                            broker.messageExpired(context, message);
430                            message.decrementReferenceCount();
431                            return;
432                        }
433                        try {
434                            dispatch(context, message);
435                        } finally {
436                            message.decrementReferenceCount();
437                        }
438                    }
439                });
440    
441            } else {
442                try {
443                    dispatch(context, message);
444                } finally {
445                    message.decrementReferenceCount();
446                }
447            }
448    
449        }
450    
451        private boolean canOptimizeOutPersistence() {
452            return durableSubcribers.size() == 0;
453        }
454    
455        public String toString() {
456            return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
457        }
458    
459        public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
460            if (topicStore != null && node.isPersistent()) {
461                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
462                SubscriptionKey key = dsub.getSubscriptionKey();
463                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
464            }
465            messageConsumed(context, node);
466        }
467    
468        public void gc() {
469        }
470    
471        public Message loadMessage(MessageId messageId) throws IOException {
472            return topicStore != null ? topicStore.getMessage(messageId) : null;
473        }
474    
475        public void start() throws Exception {
476            this.subscriptionRecoveryPolicy.start();
477            if (memoryUsage != null) {
478                memoryUsage.start();
479            }
480    
481        }
482    
483        public void stop() throws Exception {
484            if (taskRunner != null) {
485                taskRunner.shutdown();
486            }
487            this.subscriptionRecoveryPolicy.stop();
488            if (memoryUsage != null) {
489                memoryUsage.stop();
490            }
491            if (this.topicStore != null) {
492                this.topicStore.stop();
493            }
494        }
495    
496        public Message[] browse() {
497            final Set<Message> result = new CopyOnWriteArraySet<Message>();
498            try {
499                if (topicStore != null) {
500                    topicStore.recover(new MessageRecoveryListener() {
501                        public boolean recoverMessage(Message message) throws Exception {
502                            result.add(message);
503                            return true;
504                        }
505    
506                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
507                            return true;
508                        }
509    
510                        public boolean hasSpace() {
511                            return true;
512                        }
513    
514                        public boolean isDuplicate(MessageId id) {
515                            return false;
516                        }
517                    });
518                    Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
519                    if (msgs != null) {
520                        for (int i = 0; i < msgs.length; i++) {
521                            result.add(msgs[i]);
522                        }
523                    }
524                }
525            } catch (Throwable e) {
526                LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
527            }
528            return result.toArray(new Message[result.size()]);
529        }
530    
531        public boolean iterate() {
532            synchronized (messagesWaitingForSpace) {
533                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
534                    Runnable op = messagesWaitingForSpace.removeFirst();
535                    op.run();
536                }
537            }
538            return false;
539        }
540    
541        // Properties
542        // -------------------------------------------------------------------------
543    
544        public DispatchPolicy getDispatchPolicy() {
545            return dispatchPolicy;
546        }
547    
548        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
549            this.dispatchPolicy = dispatchPolicy;
550        }
551    
552        public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
553            return subscriptionRecoveryPolicy;
554        }
555    
556        public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
557            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
558        }
559    
560        // Implementation methods
561        // -------------------------------------------------------------------------
562    
563        public final void wakeup() {
564        }
565    
566        protected void dispatch(final ConnectionContext context, Message message) throws Exception {
567            // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
568            // destinationStatistics.getMessages().increment();
569            destinationStatistics.getEnqueues().increment();
570            dispatchValve.increment();
571            MessageEvaluationContext msgContext = null;
572            try {
573                if (!subscriptionRecoveryPolicy.add(context, message)) {
574                    return;
575                }
576                synchronized (consumers) {
577                    if (consumers.isEmpty()) {
578                        onMessageWithNoConsumers(context, message);
579                        return;
580                    }
581                }
582                msgContext = context.getMessageEvaluationContext();
583                msgContext.setDestination(destination);
584                msgContext.setMessageReference(message);
585                if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
586                    onMessageWithNoConsumers(context, message);
587                }
588    
589            } finally {
590                dispatchValve.decrement();
591                if (msgContext != null) {
592                    msgContext.clear();
593                }
594            }
595        }
596    
597        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
598            broker.messageExpired(context, reference);
599            // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
600            // destinationStatistics.getMessages().decrement();
601            destinationStatistics.getEnqueues().decrement();
602            destinationStatistics.getExpired().increment();
603            MessageAck ack = new MessageAck();
604            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
605            ack.setDestination(destination);
606            ack.setMessageID(reference.getMessageId());
607            try {
608                acknowledge(context, subs, ack, reference);
609            } catch (IOException e) {
610                LOG.error("Failed to remove expired Message from the store ", e);
611            }
612        }
613    
614        private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
615            long start = System.currentTimeMillis();
616            long nextWarn = start + blockedProducerWarningInterval;
617            while (!usage.waitForSpace(1000)) {
618                if (context.getStopping().get()) {
619                    throw new IOException("Connection closed, send aborted.");
620                }
621    
622                long now = System.currentTimeMillis();
623                if (now >= nextWarn) {
624                    LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
625                    nextWarn = now + blockedProducerWarningInterval;
626                }
627            }
628        }
629    
630    }