001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import javax.management.openmbean.CompositeData;
020    import javax.management.openmbean.OpenDataException;
021    import javax.jms.JMSException;
022    
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.Queue;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    
028    /**
029     * Provides a JMX Management view of a Queue.
030     */
031    public class QueueView extends DestinationView implements QueueViewMBean {
032        public QueueView(ManagedRegionBroker broker, Queue destination) {
033            super(broker, destination);
034        }
035    
036        public CompositeData getMessage(String messageId) throws OpenDataException {
037            Message rc = ((Queue)destination).getMessage(messageId);
038            if (rc == null) {
039                return null;
040            }
041            return OpenTypeSupport.convert(rc);
042        }
043    
044        public void purge() throws Exception {
045            ((Queue)destination).purge();
046        }
047    
048        public boolean removeMessage(String messageId) throws Exception {
049            return ((Queue)destination).removeMessage(messageId);
050        }
051    
052        public int removeMatchingMessages(String selector) throws Exception {
053            return ((Queue)destination).removeMatchingMessages(selector);
054        }
055    
056        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
057            return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
058        }
059    
060        public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
061            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
062            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
063            return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
064        }
065    
066        public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
067            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
068            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
069            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
070        }
071    
072        public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
073            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
074            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
075            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
076        }
077    
078        public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
079            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
080            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
081            return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
082        }
083    
084        public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
085            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
086            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
087            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
088        }
089    
090        public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
091            ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
092            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
093            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
094        }
095    
096        /**
097         * Moves a message back to its original destination
098         */
099        public boolean retryMessage(String messageId) throws Exception {
100            Queue queue = (Queue) destination;
101            Message rc = queue.getMessage(messageId);
102            if (rc != null) {
103                rc = rc.copy();
104                rc.getMessage().setRedeliveryCounter(0);
105                ActiveMQDestination originalDestination = rc.getOriginalDestination();
106                if (originalDestination != null) {
107                    ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
108                    return queue.moveMessageTo(context, rc, originalDestination);
109                }
110                else {
111                    throw new JMSException("No original destination for message: "+ messageId);
112                }
113            }
114            else {
115                throw new JMSException("Could not find message: "+ messageId);
116            }
117        }
118        
119        public int cursorSize() {
120            Queue queue = (Queue) destination;
121            if (queue.getMessages() != null){
122                return queue.getMessages().size();
123            }
124            return 0;
125        }
126    
127       
128        public boolean doesCursorHaveMessagesBuffered() {
129           Queue queue = (Queue) destination;
130           if (queue.getMessages() != null){
131               return queue.getMessages().hasMessagesBufferedToDeliver();
132           }
133           return false;
134    
135        }
136    
137        
138        public boolean doesCursorHaveSpace() {
139            Queue queue = (Queue) destination;
140            if (queue.getMessages() != null){
141                return queue.getMessages().hasSpace();
142            }
143            return false;
144        }
145    
146        
147        public long getCursorMemoryUsage() {
148            Queue queue = (Queue) destination;
149            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
150                return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
151            }
152            return 0;
153        }
154    
155        public int getCursorPercentUsage() {
156            Queue queue = (Queue) destination;
157            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
158                return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
159            }
160            return 0;
161        }
162    
163        public boolean isCursorFull() {
164            Queue queue = (Queue) destination;
165            if (queue.getMessages() != null){
166                return queue.getMessages().isFull();
167            }
168            return false;
169        }
170    }