001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import org.apache.activemq.broker.Broker;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
023    import org.apache.activemq.broker.region.Destination;
024    import org.apache.activemq.broker.region.DestinationFactory;
025    import org.apache.activemq.broker.region.DestinationFactoryImpl;
026    import org.apache.activemq.broker.region.DestinationInterceptor;
027    import org.apache.activemq.broker.region.Queue;
028    import org.apache.activemq.broker.region.Region;
029    import org.apache.activemq.broker.region.RegionBroker;
030    import org.apache.activemq.broker.region.Subscription;
031    import org.apache.activemq.broker.region.Topic;
032    import org.apache.activemq.broker.region.TopicSubscription;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ActiveMQTopic;
036    import org.apache.activemq.command.ConsumerInfo;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageId;
039    import org.apache.activemq.command.SubscriptionInfo;
040    import org.apache.activemq.store.MessageRecoveryListener;
041    import org.apache.activemq.store.PersistenceAdapter;
042    import org.apache.activemq.store.TopicMessageStore;
043    import org.apache.activemq.thread.TaskRunnerFactory;
044    import org.apache.activemq.usage.SystemUsage;
045    import org.apache.activemq.util.JMXSupport;
046    import org.apache.activemq.util.ServiceStopper;
047    import org.apache.activemq.util.SubscriptionKey;
048    import org.apache.commons.logging.Log;
049    import org.apache.commons.logging.LogFactory;
050    import java.io.IOException;
051    import java.util.ArrayList;
052    import java.util.HashMap;
053    import java.util.Hashtable;
054    import java.util.Iterator;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Set;
058    import java.util.Map.Entry;
059    import java.util.concurrent.ConcurrentHashMap;
060    import java.util.concurrent.CopyOnWriteArraySet;
061    import javax.management.InstanceNotFoundException;
062    import javax.management.MalformedObjectNameException;
063    import javax.management.ObjectName;
064    import javax.management.openmbean.CompositeData;
065    import javax.management.openmbean.CompositeDataSupport;
066    import javax.management.openmbean.CompositeType;
067    import javax.management.openmbean.OpenDataException;
068    import javax.management.openmbean.TabularData;
069    import javax.management.openmbean.TabularDataSupport;
070    import javax.management.openmbean.TabularType;
071    
072    public class ManagedRegionBroker extends RegionBroker {
073        private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
074        private final ManagementContext managementContext;
075        private final ObjectName brokerObjectName;
076        private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
077        private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
078        private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
079        private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
080        private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
081        private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
082        private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
083        private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
084        private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
085        private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
086        private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
087        private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
088        private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
089        /* This is the first broker in the broker interceptor chain. */
090        private Broker contextBroker;
091    
092        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
093                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
094            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
095            this.managementContext = context;
096            this.brokerObjectName = brokerObjectName;
097        }
098    
099        public void start() throws Exception {
100            super.start();
101            // build all existing durable subscriptions
102            buildExistingSubscriptions();
103        }
104    
105        protected void doStop(ServiceStopper stopper) {
106            super.doStop(stopper);
107            // lets remove any mbeans not yet removed
108            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
109                ObjectName name = iter.next();
110                try {
111                    managementContext.unregisterMBean(name);
112                } catch (InstanceNotFoundException e) {
113                    LOG.warn("The MBean: " + name + " is no longer registered with JMX");
114                } catch (Exception e) {
115                    stopper.onException(this, e);
116                }
117            }
118            registeredMBeans.clear();
119        }
120    
121        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
122            return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
123        }
124    
125        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
126            return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
127        }
128    
129        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
130            return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
131        }
132    
133        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
134            return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
135        }
136    
137        public void register(ActiveMQDestination destName, Destination destination) {
138            // TODO refactor to allow views for custom destinations
139            try {
140                ObjectName objectName = createObjectName(destName);
141                DestinationView view;
142                if (destination instanceof Queue) {
143                    view = new QueueView(this, (Queue)destination);
144                } else if (destination instanceof Topic) {
145                    view = new TopicView(this, (Topic)destination);
146                } else {
147                    view = null;
148                    LOG.warn("JMX View is not supported for custom destination: " + destination);
149                }
150                if (view != null) {
151                    registerDestination(objectName, destName, view);
152                }
153            } catch (Exception e) {
154                LOG.error("Failed to register destination " + destName, e);
155            }
156        }
157    
158        public void unregister(ActiveMQDestination destName) {
159            try {
160                ObjectName objectName = createObjectName(destName);
161                unregisterDestination(objectName);
162            } catch (Exception e) {
163                LOG.error("Failed to unregister " + destName, e);
164            }
165        }
166    
167        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
168            String connectionClientId = context.getClientId();
169            ObjectName brokerJmxObjectName = brokerObjectName;
170            String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
171    
172            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
173            try {
174                ObjectName objectName = new ObjectName(objectNameStr);
175                SubscriptionView view;
176                if (sub.getConsumerInfo().isDurable()) {
177                    view = new DurableSubscriptionView(this, context.getClientId(), sub);
178                } else {
179                    if (sub instanceof TopicSubscription) {
180                        view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription)sub);
181                    } else {
182                        view = new SubscriptionView(context.getClientId(), sub);
183                    }
184                }
185                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
186                subscriptionMap.put(sub, objectName);
187                return objectName;
188            } catch (Exception e) {
189                LOG.error("Failed to register subscription " + sub, e);
190                return null;
191            }
192        }
193    
194        public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
195            Hashtable map = brokerJmxObjectName.getKeyPropertyList();
196            String brokerDomain = brokerJmxObjectName.getDomain();
197            String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
198            String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
199            String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
200            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
201            String persistentMode = "persistentMode=";
202            String consumerId = "";
203            if (sub.getConsumerInfo().isDurable()) {
204                persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
205            } else {
206                persistentMode += "Non-Durable";
207                if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
208                    consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
209                }
210            }
211            objectNameStr += persistentMode + ",";
212            objectNameStr += destinationType + ",";
213            objectNameStr += destinationName + ",";
214            objectNameStr += clientId;
215            objectNameStr += consumerId;
216            return objectNameStr;
217        }
218    
219        public void unregisterSubscription(Subscription sub) {
220            ObjectName name = subscriptionMap.remove(sub);
221            if (name != null) {
222                try {
223                    unregisterSubscription(name);
224                } catch (Exception e) {
225                    LOG.error("Failed to unregister subscription " + sub, e);
226                }
227            }
228        }
229    
230        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
231            if (dest.isQueue()) {
232                if (dest.isTemporary()) {
233                    temporaryQueues.put(key, view);
234                } else {
235                    queues.put(key, view);
236                }
237            } else {
238                if (dest.isTemporary()) {
239                    temporaryTopics.put(key, view);
240                } else {
241                    topics.put(key, view);
242                }
243            }
244            try {
245                AnnotatedMBean.registerMBean(managementContext, view, key);
246                registeredMBeans.add(key);
247            } catch (Throwable e) {
248                LOG.warn("Failed to register MBean: " + key);
249                LOG.debug("Failure reason: " + e, e);
250            }
251        }
252    
253        protected void unregisterDestination(ObjectName key) throws Exception {
254            topics.remove(key);
255            queues.remove(key);
256            temporaryQueues.remove(key);
257            temporaryTopics.remove(key);
258            if (registeredMBeans.remove(key)) {
259                try {
260                    managementContext.unregisterMBean(key);
261                } catch (Throwable e) {
262                    LOG.warn("Failed to unregister MBean: " + key);
263                    LOG.debug("Failure reason: " + e, e);
264                }
265            }
266        }
267    
268        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
269            ActiveMQDestination dest = info.getDestination();
270            if (dest.isQueue()) {
271                if (dest.isTemporary()) {
272                    temporaryQueueSubscribers.put(key, view);
273                } else {
274                    queueSubscribers.put(key, view);
275                }
276            } else {
277                if (dest.isTemporary()) {
278                    temporaryTopicSubscribers.put(key, view);
279                } else {
280                    if (info.isDurable()) {
281                        durableTopicSubscribers.put(key, view);
282                        // unregister any inactive durable subs
283                        try {
284                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
285                            if (inactiveName != null) {
286                                inactiveDurableTopicSubscribers.remove(inactiveName);
287                                registeredMBeans.remove(inactiveName);
288                                managementContext.unregisterMBean(inactiveName);
289                            }
290                        } catch (Throwable e) {
291                            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
292                        }
293                    } else {
294                        topicSubscribers.put(key, view);
295                    }
296                }
297            }
298    
299            try {
300                AnnotatedMBean.registerMBean(managementContext, view, key);
301                registeredMBeans.add(key);
302            } catch (Throwable e) {
303                LOG.warn("Failed to register MBean: " + key);
304                LOG.debug("Failure reason: " + e, e);
305            }
306    
307        }
308    
309        protected void unregisterSubscription(ObjectName key) throws Exception {
310            queueSubscribers.remove(key);
311            topicSubscribers.remove(key);
312            inactiveDurableTopicSubscribers.remove(key);
313            temporaryQueueSubscribers.remove(key);
314            temporaryTopicSubscribers.remove(key);
315            if (registeredMBeans.remove(key)) {
316                try {
317                    managementContext.unregisterMBean(key);
318                } catch (Throwable e) {
319                    LOG.warn("Failed to unregister MBean: " + key);
320                    LOG.debug("Failure reason: " + e, e);
321                }
322            }
323            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
324            if (view != null) {
325                // need to put this back in the inactive list
326                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
327                SubscriptionInfo info = new SubscriptionInfo();
328                info.setClientId(subscriptionKey.getClientId());
329                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
330                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
331                addInactiveSubscription(subscriptionKey, info);
332            }
333        }
334    
335        protected void buildExistingSubscriptions() throws Exception {
336            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
337            Set destinations = destinationFactory.getDestinations();
338            if (destinations != null) {
339                for (Iterator iter = destinations.iterator(); iter.hasNext();) {
340                    ActiveMQDestination dest = (ActiveMQDestination)iter.next();
341                    if (dest.isTopic()) {
342                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
343                        if (infos != null) {
344                            for (int i = 0; i < infos.length; i++) {
345                                SubscriptionInfo info = infos[i];
346                                LOG.debug("Restoring durable subscription: " + info);
347                                SubscriptionKey key = new SubscriptionKey(info);
348                                subscriptions.put(key, info);
349                            }
350                        }
351                    }
352                }
353            }
354            for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) {
355                Map.Entry entry = (Entry)i.next();
356                SubscriptionKey key = (SubscriptionKey)entry.getKey();
357                SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
358                addInactiveSubscription(key, info);
359            }
360        }
361    
362        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
363            Hashtable map = brokerObjectName.getKeyPropertyList();
364            try {
365                ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
366                                                       + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
367                SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
368    
369                try {
370                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
371                    registeredMBeans.add(objectName);
372                } catch (Throwable e) {
373                    LOG.warn("Failed to register MBean: " + key);
374                    LOG.debug("Failure reason: " + e, e);
375                }
376    
377                inactiveDurableTopicSubscribers.put(objectName, view);
378                subscriptionKeys.put(key, objectName);
379            } catch (Exception e) {
380                LOG.error("Failed to register subscription " + info, e);
381            }
382        }
383    
384        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
385            List<Message> messages = getSubscriberMessages(view);
386            CompositeData c[] = new CompositeData[messages.size()];
387            for (int i = 0; i < c.length; i++) {
388                try {
389                    c[i] = OpenTypeSupport.convert(messages.get(i));
390                } catch (Throwable e) {
391                    LOG.error("failed to browse : " + view, e);
392                }
393            }
394            return c;
395        }
396    
397        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
398            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
399            List<Message> messages = getSubscriberMessages(view);
400            CompositeType ct = factory.getCompositeType();
401            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
402            TabularDataSupport rc = new TabularDataSupport(tt);
403            for (int i = 0; i < messages.size(); i++) {
404                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
405            }
406            return rc;
407        }
408    
409        protected List<Message> getSubscriberMessages(SubscriptionView view) {
410            // TODO It is very dangerous operation for big backlogs
411            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
412                throw new RuntimeException("unsupported by " + destinationFactory);
413            }
414            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
415            final List<Message> result = new ArrayList<Message>();
416            try {
417                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
418                TopicMessageStore store = adapter.createTopicMessageStore(topic);
419                store.recover(new MessageRecoveryListener() {
420                    public boolean recoverMessage(Message message) throws Exception {
421                        result.add(message);
422                        return true;
423                    }
424    
425                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
426                        throw new RuntimeException("Should not be called.");
427                    }
428    
429                    public boolean hasSpace() {
430                        return true;
431                    }
432                    
433                    public boolean isDuplicate(MessageId id) {
434                        return false;
435                    }
436                });
437            } catch (Throwable e) {
438                LOG.error("Failed to browse messages for Subscription " + view, e);
439            }
440            return result;
441    
442        }
443    
444        protected ObjectName[] getTopics() {
445            Set<ObjectName> set = topics.keySet();
446            return set.toArray(new ObjectName[set.size()]);
447        }
448    
449        protected ObjectName[] getQueues() {
450            Set<ObjectName> set = queues.keySet();
451            return set.toArray(new ObjectName[set.size()]);
452        }
453    
454        protected ObjectName[] getTemporaryTopics() {
455            Set<ObjectName> set = temporaryTopics.keySet();
456            return set.toArray(new ObjectName[set.size()]);
457        }
458    
459        protected ObjectName[] getTemporaryQueues() {
460            Set<ObjectName> set = temporaryQueues.keySet();
461            return set.toArray(new ObjectName[set.size()]);
462        }
463    
464        protected ObjectName[] getTopicSubscribers() {
465            Set<ObjectName> set = topicSubscribers.keySet();
466            return set.toArray(new ObjectName[set.size()]);
467        }
468    
469        protected ObjectName[] getDurableTopicSubscribers() {
470            Set<ObjectName> set = durableTopicSubscribers.keySet();
471            return set.toArray(new ObjectName[set.size()]);
472        }
473    
474        protected ObjectName[] getQueueSubscribers() {
475            Set<ObjectName> set = queueSubscribers.keySet();
476            return set.toArray(new ObjectName[set.size()]);
477        }
478    
479        protected ObjectName[] getTemporaryTopicSubscribers() {
480            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
481            return set.toArray(new ObjectName[set.size()]);
482        }
483    
484        protected ObjectName[] getTemporaryQueueSubscribers() {
485            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
486            return set.toArray(new ObjectName[set.size()]);
487        }
488    
489        protected ObjectName[] getInactiveDurableTopicSubscribers() {
490            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
491            return set.toArray(new ObjectName[set.size()]);
492        }
493    
494        public Broker getContextBroker() {
495            return contextBroker;
496        }
497    
498        public void setContextBroker(Broker contextBroker) {
499            this.contextBroker = contextBroker;
500        }
501    
502        protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
503            // Build the object name for the destination
504            Hashtable map = brokerObjectName.getKeyPropertyList();
505            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
506                                                   + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
507                                                   + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
508            return objectName;
509        }
510    }