001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import org.apache.activemq.ActiveMQConnectionFactory; 020 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 021 import org.apache.activemq.broker.region.Destination; 022 import org.apache.activemq.broker.region.Subscription; 023 import org.apache.activemq.command.ActiveMQDestination; 024 import org.apache.activemq.command.ActiveMQMessage; 025 import org.apache.activemq.command.ActiveMQTextMessage; 026 import org.apache.activemq.command.Message; 027 import org.apache.activemq.filter.BooleanExpression; 028 import org.apache.activemq.filter.MessageEvaluationContext; 029 import org.apache.activemq.selector.SelectorParser; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 import java.io.IOException; 033 import java.util.ArrayList; 034 import java.util.Collections; 035 import java.util.Iterator; 036 import java.util.List; 037 import java.util.Map; 038 import javax.jms.Connection; 039 import javax.jms.InvalidSelectorException; 040 import javax.jms.MessageProducer; 041 import javax.jms.Session; 042 import javax.management.MalformedObjectNameException; 043 import javax.management.ObjectName; 044 import javax.management.openmbean.CompositeData; 045 import javax.management.openmbean.CompositeDataSupport; 046 import javax.management.openmbean.CompositeType; 047 import javax.management.openmbean.OpenDataException; 048 import javax.management.openmbean.TabularData; 049 import javax.management.openmbean.TabularDataSupport; 050 import javax.management.openmbean.TabularType; 051 052 public class DestinationView implements DestinationViewMBean { 053 private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class); 054 protected final Destination destination; 055 protected final ManagedRegionBroker broker; 056 057 public DestinationView(ManagedRegionBroker broker, Destination destination) { 058 this.broker = broker; 059 this.destination = destination; 060 } 061 062 public void gc() { 063 destination.gc(); 064 } 065 066 public String getName() { 067 return destination.getName(); 068 } 069 070 public void resetStatistics() { 071 destination.getDestinationStatistics().reset(); 072 } 073 074 public long getEnqueueCount() { 075 return destination.getDestinationStatistics().getEnqueues().getCount(); 076 } 077 078 public long getDequeueCount() { 079 return destination.getDestinationStatistics().getDequeues().getCount(); 080 } 081 082 public long getDispatchCount() { 083 return destination.getDestinationStatistics().getDispatched().getCount(); 084 } 085 086 public long getInFlightCount() { 087 return destination.getDestinationStatistics().getInflight().getCount(); 088 } 089 090 public long getExpiredCount() { 091 return destination.getDestinationStatistics().getExpired().getCount(); 092 } 093 094 public long getConsumerCount() { 095 return destination.getDestinationStatistics().getConsumers().getCount(); 096 } 097 098 public long getQueueSize() { 099 return destination.getDestinationStatistics().getMessages().getCount(); 100 } 101 102 public long getMessagesCached() { 103 return destination.getDestinationStatistics().getMessagesCached().getCount(); 104 } 105 106 public int getMemoryPercentUsage() { 107 return destination.getMemoryUsage().getPercentUsage(); 108 } 109 110 public long getMemoryLimit() { 111 return destination.getMemoryUsage().getLimit(); 112 } 113 114 public void setMemoryLimit(long limit) { 115 destination.getMemoryUsage().setLimit(limit); 116 } 117 118 public double getAverageEnqueueTime() { 119 return destination.getDestinationStatistics().getProcessTime().getAverageTime(); 120 } 121 122 public long getMaxEnqueueTime() { 123 return destination.getDestinationStatistics().getProcessTime().getMaxTime(); 124 } 125 126 public long getMinEnqueueTime() { 127 return destination.getDestinationStatistics().getProcessTime().getMinTime(); 128 } 129 130 public CompositeData[] browse() throws OpenDataException { 131 try { 132 return browse(null); 133 } catch (InvalidSelectorException e) { 134 // should not happen. 135 throw new RuntimeException(e); 136 } 137 } 138 139 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException { 140 Message[] messages = destination.browse(); 141 ArrayList<CompositeData> c = new ArrayList<CompositeData>(); 142 143 MessageEvaluationContext ctx = new MessageEvaluationContext(); 144 ctx.setDestination(destination.getActiveMQDestination()); 145 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 146 147 for (int i = 0; i < messages.length; i++) { 148 try { 149 150 if (selectorExpression == null) { 151 c.add(OpenTypeSupport.convert(messages[i])); 152 } else { 153 ctx.setMessageReference(messages[i]); 154 if (selectorExpression.matches(ctx)) { 155 c.add(OpenTypeSupport.convert(messages[i])); 156 } 157 } 158 159 } catch (Throwable e) { 160 // TODO DELETE ME 161 System.out.println(e); 162 e.printStackTrace(); 163 // TODO DELETE ME 164 LOG.warn("exception browsing destination", e); 165 } 166 } 167 168 CompositeData rc[] = new CompositeData[c.size()]; 169 c.toArray(rc); 170 return rc; 171 } 172 173 /** 174 * Browses the current destination returning a list of messages 175 */ 176 public List<Object> browseMessages() throws InvalidSelectorException { 177 return browseMessages(null); 178 } 179 180 /** 181 * Browses the current destination with the given selector returning a list 182 * of messages 183 */ 184 public List<Object> browseMessages(String selector) throws InvalidSelectorException { 185 Message[] messages = destination.browse(); 186 ArrayList<Object> answer = new ArrayList<Object>(); 187 188 MessageEvaluationContext ctx = new MessageEvaluationContext(); 189 ctx.setDestination(destination.getActiveMQDestination()); 190 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 191 192 for (int i = 0; i < messages.length; i++) { 193 try { 194 Message message = messages[i]; 195 if (selectorExpression == null) { 196 answer.add(OpenTypeSupport.convert(message)); 197 } else { 198 ctx.setMessageReference(message); 199 if (selectorExpression.matches(ctx)) { 200 answer.add(message); 201 } 202 } 203 204 } catch (Throwable e) { 205 LOG.warn("exception browsing destination", e); 206 } 207 } 208 return answer; 209 } 210 211 public TabularData browseAsTable() throws OpenDataException { 212 try { 213 return browseAsTable(null); 214 } catch (InvalidSelectorException e) { 215 throw new RuntimeException(e); 216 } 217 } 218 219 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException { 220 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 221 Message[] messages = destination.browse(); 222 CompositeType ct = factory.getCompositeType(); 223 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); 224 TabularDataSupport rc = new TabularDataSupport(tt); 225 226 MessageEvaluationContext ctx = new MessageEvaluationContext(); 227 ctx.setDestination(destination.getActiveMQDestination()); 228 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 229 230 for (int i = 0; i < messages.length; i++) { 231 try { 232 if (selectorExpression == null) { 233 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 234 } else { 235 ctx.setMessageReference(messages[i]); 236 if (selectorExpression.matches(ctx)) { 237 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 238 } 239 } 240 } catch (Throwable e) { 241 LOG.warn("exception browsing destination", e); 242 } 243 } 244 245 return rc; 246 } 247 248 public String sendTextMessage(String body) throws Exception { 249 return sendTextMessage(Collections.EMPTY_MAP, body); 250 } 251 252 public String sendTextMessage(Map headers, String body) throws Exception { 253 return sendTextMessage(headers, body, null, null); 254 } 255 256 public String sendTextMessage(String body, String user, String password) throws Exception { 257 return sendTextMessage(Collections.EMPTY_MAP, body, user, password); 258 } 259 260 public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception { 261 262 String brokerUrl = "vm://" + broker.getBrokerName(); 263 ActiveMQDestination dest = destination.getActiveMQDestination(); 264 265 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 266 Connection connection = null; 267 try { 268 269 connection = cf.createConnection(userName, password); 270 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 271 MessageProducer producer = session.createProducer(dest); 272 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 273 274 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { 275 Map.Entry entry = (Map.Entry) iter.next(); 276 msg.setObjectProperty((String) entry.getKey(), entry.getValue()); 277 } 278 279 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 280 producer.setPriority(msg.getPriority()); 281 long ttl = msg.getExpiration() - System.currentTimeMillis(); 282 producer.setTimeToLive(ttl > 0 ? ttl : 0); 283 producer.send(msg); 284 285 return msg.getJMSMessageID(); 286 287 } finally { 288 connection.close(); 289 } 290 291 } 292 293 public int getMaxAuditDepth() { 294 return destination.getMaxAuditDepth(); 295 } 296 297 public int getMaxProducersToAudit() { 298 return destination.getMaxProducersToAudit(); 299 } 300 301 public boolean isEnableAudit() { 302 return destination.isEnableAudit(); 303 } 304 305 public void setEnableAudit(boolean enableAudit) { 306 destination.setEnableAudit(enableAudit); 307 } 308 309 public void setMaxAuditDepth(int maxAuditDepth) { 310 destination.setMaxAuditDepth(maxAuditDepth); 311 } 312 313 public void setMaxProducersToAudit(int maxProducersToAudit) { 314 destination.setMaxProducersToAudit(maxProducersToAudit); 315 } 316 317 public float getMemoryUsagePortion() { 318 return destination.getMemoryUsage().getUsagePortion(); 319 } 320 321 public long getProducerCount() { 322 return destination.getDestinationStatistics().getProducers().getCount(); 323 } 324 325 public boolean isProducerFlowControl() { 326 return destination.isProducerFlowControl(); 327 } 328 329 public void setMemoryUsagePortion(float value) { 330 destination.getMemoryUsage().setUsagePortion(value); 331 } 332 333 public void setProducerFlowControl(boolean producerFlowControl) { 334 destination.setProducerFlowControl(producerFlowControl); 335 } 336 337 /** 338 * Set's the interval at which warnings about producers being blocked by 339 * resource usage will be triggered. Values of 0 or less will disable 340 * warnings 341 * 342 * @param blockedProducerWarningInterval the interval at which warning about 343 * blocked producers will be triggered. 344 */ 345 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 346 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 347 } 348 349 /** 350 * 351 * @return the interval at which warning about blocked producers will be 352 * triggered. 353 */ 354 public long getBlockedProducerWarningInterval() { 355 return destination.getBlockedProducerWarningInterval(); 356 } 357 358 public int getMaxPageSize() { 359 return destination.getMaxPageSize(); 360 } 361 362 public void setMaxPageSize(int pageSize) { 363 destination.setMaxPageSize(pageSize); 364 } 365 366 public boolean isUseCache() { 367 return destination.isUseCache(); 368 } 369 370 public void setUseCache(boolean value) { 371 destination.setUseCache(value); 372 } 373 374 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { 375 List<Subscription> subscriptions = destination.getConsumers(); 376 ObjectName[] answer = new ObjectName[subscriptions.size()]; 377 ObjectName objectName = broker.getBrokerService().getBrokerObjectName(); 378 int index = 0; 379 for (Subscription subscription : subscriptions) { 380 String connectionClientId = subscription.getContext().getClientId(); 381 String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName); 382 answer[index++] = new ObjectName(objectNameStr); 383 } 384 return answer; 385 } 386 387 }