001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.util; 018 019 import java.io.IOException; 020 import java.util.Set; 021 import javax.annotation.PostConstruct; 022 import org.apache.activemq.broker.BrokerPluginSupport; 023 import org.apache.activemq.broker.Connection; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.ConsumerBrokerExchange; 026 import org.apache.activemq.broker.ProducerBrokerExchange; 027 import org.apache.activemq.broker.region.Destination; 028 import org.apache.activemq.broker.region.MessageReference; 029 import org.apache.activemq.broker.region.Subscription; 030 import org.apache.activemq.command.ActiveMQDestination; 031 import org.apache.activemq.command.BrokerInfo; 032 import org.apache.activemq.command.ConnectionInfo; 033 import org.apache.activemq.command.ConsumerInfo; 034 import org.apache.activemq.command.DestinationInfo; 035 import org.apache.activemq.command.Message; 036 import org.apache.activemq.command.MessageAck; 037 import org.apache.activemq.command.MessageDispatch; 038 import org.apache.activemq.command.MessageDispatchNotification; 039 import org.apache.activemq.command.MessagePull; 040 import org.apache.activemq.command.ProducerInfo; 041 import org.apache.activemq.command.RemoveSubscriptionInfo; 042 import org.apache.activemq.command.Response; 043 import org.apache.activemq.command.SessionInfo; 044 import org.apache.activemq.command.TransactionId; 045 import org.apache.activemq.usage.Usage; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 049 /** 050 * A simple Broker intercepter which allows you to enable/disable logging. 051 * 052 * @org.apache.xbean.XBean 053 */ 054 055 public class LoggingBrokerPlugin extends BrokerPluginSupport { 056 057 private static final Log LOG = LogFactory.getLog(LoggingBrokerPlugin.class); 058 059 private boolean logAll = false; 060 private boolean logMessageEvents = false; 061 private boolean logConnectionEvents = true; 062 private boolean logTransactionEvents = false; 063 private boolean logConsumerEvents = false; 064 private boolean logProducerEvents = false; 065 private boolean logInternalEvents = false; 066 067 /** 068 * 069 * @throws Exception 070 * @org.apache.xbean.InitMethod 071 */ 072 @PostConstruct 073 public void afterPropertiesSet() throws Exception { 074 LOG.info("Created LoggingBrokerPlugin: " + this.toString()); 075 } 076 077 public boolean isLogAll() { 078 return logAll; 079 } 080 081 /** 082 * Log all Events that go through the Plugin 083 */ 084 public void setLogAll(boolean logAll) { 085 this.logAll = logAll; 086 } 087 088 public boolean isLogMessageEvents() { 089 return logMessageEvents; 090 } 091 092 /** 093 * Log Events that are related to message processing 094 */ 095 public void setLogMessageEvents(boolean logMessageEvents) { 096 this.logMessageEvents = logMessageEvents; 097 } 098 099 public boolean isLogConnectionEvents() { 100 return logConnectionEvents; 101 } 102 103 /** 104 * Log Events that are related to connections and sessions 105 */ 106 public void setLogConnectionEvents(boolean logConnectionEvents) { 107 this.logConnectionEvents = logConnectionEvents; 108 } 109 110 public boolean isLogTransactionEvents() { 111 return logTransactionEvents; 112 } 113 114 /** 115 * Log Events that are related to transaction processing 116 */ 117 public void setLogTransactionEvents(boolean logTransactionEvents) { 118 this.logTransactionEvents = logTransactionEvents; 119 } 120 121 public boolean isLogConsumerEvents() { 122 return logConsumerEvents; 123 } 124 125 /** 126 * Log Events that are related to Consumers 127 */ 128 public void setLogConsumerEvents(boolean logConsumerEvents) { 129 this.logConsumerEvents = logConsumerEvents; 130 } 131 132 public boolean isLogProducerEvents() { 133 return logProducerEvents; 134 } 135 136 /** 137 * Log Events that are related to Producers 138 */ 139 public void setLogProducerEvents(boolean logProducerEvents) { 140 this.logProducerEvents = logProducerEvents; 141 } 142 143 public boolean isLogInternalEvents() { 144 return logInternalEvents; 145 } 146 147 /** 148 * Log Events that are normally internal to the broker 149 */ 150 public void setLogInternalEvents(boolean logInternalEvents) { 151 this.logInternalEvents = logInternalEvents; 152 } 153 154 public void acknowledge(ConsumerBrokerExchange consumerExchange, 155 MessageAck ack) throws Exception { 156 if (isLogAll() || isLogConsumerEvents()) { 157 LOG.info("Acknowledging message for client ID : " 158 + consumerExchange.getConnectionContext().getClientId() 159 + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); 160 if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) { 161 LOG.trace("Message count: " + ack.getMessageCount() 162 + ", First Message Id: " + ack.getFirstMessageId() 163 + ", Last Message Id: " + ack.getLastMessageId()); 164 } 165 } 166 super.acknowledge(consumerExchange, ack); 167 } 168 169 public Response messagePull(ConnectionContext context, MessagePull pull) 170 throws Exception { 171 if (isLogAll() || isLogConsumerEvents()) { 172 LOG.info("Message Pull from : " + context.getClientId() + " on " 173 + pull.getDestination().getPhysicalName()); 174 } 175 return super.messagePull(context, pull); 176 } 177 178 public void addConnection(ConnectionContext context, ConnectionInfo info) 179 throws Exception { 180 if (isLogAll() || isLogConnectionEvents()) { 181 LOG.info("Adding Connection : " + context); 182 } 183 super.addConnection(context, info); 184 } 185 186 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) 187 throws Exception { 188 if (isLogAll() || isLogConsumerEvents()) { 189 LOG.info("Adding Consumer : " + info); 190 } 191 return super.addConsumer(context, info); 192 } 193 194 public void addProducer(ConnectionContext context, ProducerInfo info) 195 throws Exception { 196 if (isLogAll() || isLogProducerEvents()) { 197 LOG.info("Adding Producer :" + info); 198 } 199 super.addProducer(context, info); 200 } 201 202 public void commitTransaction(ConnectionContext context, TransactionId xid, 203 boolean onePhase) throws Exception { 204 if (isLogAll() || isLogTransactionEvents()) { 205 LOG.info("Commiting transaction : " + xid.getTransactionKey()); 206 } 207 super.commitTransaction(context, xid, onePhase); 208 } 209 210 public void removeSubscription(ConnectionContext context, 211 RemoveSubscriptionInfo info) throws Exception { 212 if (isLogAll() || isLogConsumerEvents()) { 213 LOG.info("Removing subscription : " + info); 214 } 215 super.removeSubscription(context, info); 216 } 217 218 public TransactionId[] getPreparedTransactions(ConnectionContext context) 219 throws Exception { 220 221 TransactionId[] result = super.getPreparedTransactions(context); 222 if ((isLogAll() || isLogTransactionEvents()) && result != null) { 223 StringBuffer tids = new StringBuffer(); 224 for (TransactionId tid : result) { 225 if (tids.length() > 0) { 226 tids.append(", "); 227 } 228 tids.append(tid.getTransactionKey()); 229 } 230 LOG.info("Prepared transactions : " + tids); 231 } 232 return result; 233 } 234 235 public int prepareTransaction(ConnectionContext context, TransactionId xid) 236 throws Exception { 237 if (isLogAll() || isLogTransactionEvents()) { 238 LOG.info("Preparing transaction : " + xid.getTransactionKey()); 239 } 240 return super.prepareTransaction(context, xid); 241 } 242 243 public void removeConnection(ConnectionContext context, 244 ConnectionInfo info, Throwable error) throws Exception { 245 if (isLogAll() || isLogConnectionEvents()) { 246 LOG.info("Removing Connection : " + info); 247 } 248 super.removeConnection(context, info, error); 249 } 250 251 public void removeConsumer(ConnectionContext context, ConsumerInfo info) 252 throws Exception { 253 if (isLogAll() || isLogConsumerEvents()) { 254 LOG.info("Removing Consumer : " + info); 255 } 256 super.removeConsumer(context, info); 257 } 258 259 public void removeProducer(ConnectionContext context, ProducerInfo info) 260 throws Exception { 261 if (isLogAll() || isLogProducerEvents()) { 262 LOG.info("Removing Producer : " + info); 263 } 264 super.removeProducer(context, info); 265 } 266 267 public void rollbackTransaction(ConnectionContext context, TransactionId xid) 268 throws Exception { 269 if (isLogAll() || isLogTransactionEvents()) { 270 LOG.info("Rolling back Transaction : " + xid.getTransactionKey()); 271 } 272 super.rollbackTransaction(context, xid); 273 } 274 275 public void send(ProducerBrokerExchange producerExchange, 276 Message messageSend) throws Exception { 277 if (isLogAll() || isLogProducerEvents()) { 278 LOG.info("Sending message : " + messageSend); 279 } 280 super.send(producerExchange, messageSend); 281 } 282 283 public void beginTransaction(ConnectionContext context, TransactionId xid) 284 throws Exception { 285 if (isLogAll() || isLogTransactionEvents()) { 286 LOG.info("Beginning transaction : " + xid.getTransactionKey()); 287 } 288 super.beginTransaction(context, xid); 289 } 290 291 public void forgetTransaction(ConnectionContext context, 292 TransactionId transactionId) throws Exception { 293 if (isLogAll() || isLogTransactionEvents()) { 294 LOG.info("Forgetting transaction : " 295 + transactionId.getTransactionKey()); 296 } 297 super.forgetTransaction(context, transactionId); 298 } 299 300 public Connection[] getClients() throws Exception { 301 Connection[] result = super.getClients(); 302 303 if (isLogAll() || isLogInternalEvents()) { 304 if (result == null) { 305 LOG.info("Get Clients returned empty list."); 306 } else { 307 StringBuffer cids = new StringBuffer(); 308 for (Connection c : result) { 309 cids.append(cids.length() > 0 ? ", " : ""); 310 cids.append(c.getConnectionId()); 311 } 312 LOG.info("Connected clients : " + cids); 313 } 314 } 315 return super.getClients(); 316 } 317 318 public org.apache.activemq.broker.region.Destination addDestination( 319 ConnectionContext context, ActiveMQDestination destination) 320 throws Exception { 321 if (isLogAll() || isLogInternalEvents()) { 322 LOG.info("Adding destination : " 323 + destination.getDestinationTypeAsString() + ":" 324 + destination.getPhysicalName()); 325 } 326 return super.addDestination(context, destination); 327 } 328 329 public void removeDestination(ConnectionContext context, 330 ActiveMQDestination destination, long timeout) throws Exception { 331 if (isLogAll() || isLogInternalEvents()) { 332 LOG.info("Removing destination : " 333 + destination.getDestinationTypeAsString() + ":" 334 + destination.getPhysicalName()); 335 } 336 super.removeDestination(context, destination, timeout); 337 } 338 339 public ActiveMQDestination[] getDestinations() throws Exception { 340 ActiveMQDestination[] result = super.getDestinations(); 341 if (isLogAll() || isLogInternalEvents()) { 342 if (result == null) { 343 LOG.info("Get Destinations returned empty list."); 344 } else { 345 StringBuffer destinations = new StringBuffer(); 346 for (ActiveMQDestination dest : result) { 347 destinations.append(destinations.length() > 0 ? ", " : ""); 348 destinations.append(dest.getPhysicalName()); 349 } 350 LOG.info("Get Destinations : " + destinations); 351 } 352 } 353 return result; 354 } 355 356 public void start() throws Exception { 357 if (isLogAll() || isLogInternalEvents()) { 358 LOG.info("Starting " + getBrokerName()); 359 } 360 super.start(); 361 } 362 363 public void stop() throws Exception { 364 if (isLogAll() || isLogInternalEvents()) { 365 LOG.info("Stopping " + getBrokerName()); 366 } 367 super.stop(); 368 } 369 370 public void addSession(ConnectionContext context, SessionInfo info) 371 throws Exception { 372 if (isLogAll() || isLogConnectionEvents()) { 373 LOG.info("Adding Session : " + info); 374 } 375 super.addSession(context, info); 376 } 377 378 public void removeSession(ConnectionContext context, SessionInfo info) 379 throws Exception { 380 if (isLogAll() || isLogConnectionEvents()) { 381 LOG.info("Removing Session : " + info); 382 } 383 super.removeSession(context, info); 384 } 385 386 public void addBroker(Connection connection, BrokerInfo info) { 387 if (isLogAll() || isLogInternalEvents()) { 388 LOG.info("Adding Broker " + info.getBrokerName()); 389 } 390 super.addBroker(connection, info); 391 } 392 393 public void removeBroker(Connection connection, BrokerInfo info) { 394 if (isLogAll() || isLogInternalEvents()) { 395 LOG.info("Removing Broker " + info.getBrokerName()); 396 } 397 super.removeBroker(connection, info); 398 } 399 400 public BrokerInfo[] getPeerBrokerInfos() { 401 BrokerInfo[] result = super.getPeerBrokerInfos(); 402 if (isLogAll() || isLogInternalEvents()) { 403 if (result == null) { 404 LOG.info("Get Peer Broker Infos returned empty list."); 405 } else { 406 StringBuffer peers = new StringBuffer(); 407 for (BrokerInfo bi : result) { 408 peers.append(peers.length() > 0 ? ", " : ""); 409 peers.append(bi.getBrokerName()); 410 } 411 LOG.info("Get Peer Broker Infos : " + peers); 412 } 413 } 414 return result; 415 } 416 417 public void preProcessDispatch(MessageDispatch messageDispatch) { 418 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 419 LOG.info("preProcessDispatch :" + messageDispatch); 420 } 421 super.preProcessDispatch(messageDispatch); 422 } 423 424 public void postProcessDispatch(MessageDispatch messageDispatch) { 425 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 426 LOG.info("postProcessDispatch :" + messageDispatch); 427 } 428 super.postProcessDispatch(messageDispatch); 429 } 430 431 public void processDispatchNotification( 432 MessageDispatchNotification messageDispatchNotification) 433 throws Exception { 434 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 435 LOG.info("ProcessDispatchNotification :" 436 + messageDispatchNotification); 437 } 438 super.processDispatchNotification(messageDispatchNotification); 439 } 440 441 public Set<ActiveMQDestination> getDurableDestinations() { 442 Set<ActiveMQDestination> result = super.getDurableDestinations(); 443 if (isLogAll() || isLogInternalEvents()) { 444 if (result == null) { 445 LOG.info("Get Durable Destinations returned empty list."); 446 } else { 447 StringBuffer destinations = new StringBuffer(); 448 for (ActiveMQDestination dest : result) { 449 destinations.append(destinations.length() > 0 ? ", " : ""); 450 destinations.append(dest.getPhysicalName()); 451 } 452 LOG.info("Get Durable Destinations : " + destinations); 453 } 454 } 455 return result; 456 } 457 458 public void addDestinationInfo(ConnectionContext context, 459 DestinationInfo info) throws Exception { 460 if (isLogAll() || isLogInternalEvents()) { 461 LOG.info("Adding destination info : " + info); 462 } 463 super.addDestinationInfo(context, info); 464 } 465 466 public void removeDestinationInfo(ConnectionContext context, 467 DestinationInfo info) throws Exception { 468 if (isLogAll() || isLogInternalEvents()) { 469 LOG.info("Removing destination info : " + info); 470 } 471 super.removeDestinationInfo(context, info); 472 } 473 474 public void messageExpired(ConnectionContext context, 475 MessageReference message) { 476 if (isLogAll() || isLogInternalEvents()) { 477 String msg = "Unable to display message."; 478 try { 479 msg = message.getMessage().toString(); 480 } catch (IOException ioe) { 481 } 482 LOG.info("Message has expired : " + msg); 483 } 484 super.messageExpired(context, message); 485 } 486 487 public void sendToDeadLetterQueue(ConnectionContext context, 488 MessageReference messageReference) { 489 if (isLogAll() || isLogInternalEvents()) { 490 String msg = "Unable to display message."; 491 try { 492 msg = messageReference.getMessage().toString(); 493 } catch (IOException ioe) { 494 } 495 LOG.info("Sending to DLQ : " + msg); 496 } 497 } 498 499 public void fastProducer(ConnectionContext context, 500 ProducerInfo producerInfo) { 501 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 502 LOG.info("Fast Producer : " + producerInfo); 503 } 504 super.fastProducer(context, producerInfo); 505 } 506 507 public void isFull(ConnectionContext context, Destination destination, 508 Usage usage) { 509 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 510 LOG.info("Destination is full : " + destination.getName()); 511 } 512 super.isFull(context, destination, usage); 513 } 514 515 public void messageConsumed(ConnectionContext context, 516 MessageReference messageReference) { 517 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 518 String msg = "Unable to display message."; 519 try { 520 msg = messageReference.getMessage().toString(); 521 } catch (IOException ioe) { 522 } 523 LOG.info("Message consumed : " + msg); 524 } 525 super.messageConsumed(context, messageReference); 526 } 527 528 public void messageDelivered(ConnectionContext context, 529 MessageReference messageReference) { 530 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 531 String msg = "Unable to display message."; 532 try { 533 msg = messageReference.getMessage().toString(); 534 } catch (IOException ioe) { 535 } 536 LOG.info("Message delivered : " + msg); 537 } 538 super.messageDelivered(context, messageReference); 539 } 540 541 public void messageDiscarded(ConnectionContext context, 542 MessageReference messageReference) { 543 if (isLogAll() || isLogInternalEvents()) { 544 String msg = "Unable to display message."; 545 try { 546 msg = messageReference.getMessage().toString(); 547 } catch (IOException ioe) { 548 } 549 LOG.info("Message discarded : " + msg); 550 } 551 super.messageDiscarded(context, messageReference); 552 } 553 554 public void slowConsumer(ConnectionContext context, 555 Destination destination, Subscription subs) { 556 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 557 LOG.info("Detected slow consumer on " + destination.getName()); 558 StringBuffer buf = new StringBuffer("Connection("); 559 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); 560 buf.append(") Session("); 561 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); 562 buf.append(")"); 563 LOG.info(buf); 564 } 565 super.slowConsumer(context, destination, subs); 566 } 567 568 public void nowMasterBroker() { 569 if (isLogAll() || isLogInternalEvents()) { 570 LOG.info("Is now the master broker : " + getBrokerName()); 571 } 572 super.nowMasterBroker(); 573 } 574 575 public String toString() { 576 StringBuffer buf = new StringBuffer(); 577 buf.append("LoggingBrokerPlugin("); 578 buf.append("logAll="); 579 buf.append(isLogAll()); 580 buf.append(", logConnectionEvents="); 581 buf.append(isLogConnectionEvents()); 582 buf.append(", logConsumerEvents="); 583 buf.append(isLogConsumerEvents()); 584 buf.append(", logProducerEvents="); 585 buf.append(isLogProducerEvents()); 586 buf.append(", logMessageEvents="); 587 buf.append(isLogMessageEvents()); 588 buf.append(", logTransactionEvents="); 589 buf.append(isLogTransactionEvents()); 590 buf.append(", logInternalEvents="); 591 buf.append(isLogInternalEvents()); 592 buf.append(")"); 593 return buf.toString(); 594 } 595 }