001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.advisory; 018 019 import java.util.Iterator; 020 import java.util.Map; 021 import java.util.Set; 022 import java.util.concurrent.ConcurrentHashMap; 023 024 import org.apache.activemq.broker.Broker; 025 import org.apache.activemq.broker.BrokerFilter; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.ProducerBrokerExchange; 028 import org.apache.activemq.broker.region.Destination; 029 import org.apache.activemq.broker.region.MessageReference; 030 import org.apache.activemq.broker.region.Subscription; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ActiveMQMessage; 033 import org.apache.activemq.command.ActiveMQTopic; 034 import org.apache.activemq.command.Command; 035 import org.apache.activemq.command.ConnectionId; 036 import org.apache.activemq.command.ConnectionInfo; 037 import org.apache.activemq.command.ConsumerId; 038 import org.apache.activemq.command.ConsumerInfo; 039 import org.apache.activemq.command.DestinationInfo; 040 import org.apache.activemq.command.Message; 041 import org.apache.activemq.command.MessageId; 042 import org.apache.activemq.command.ProducerId; 043 import org.apache.activemq.command.ProducerInfo; 044 import org.apache.activemq.security.SecurityContext; 045 import org.apache.activemq.state.ProducerState; 046 import org.apache.activemq.usage.Usage; 047 import org.apache.activemq.util.IdGenerator; 048 import org.apache.activemq.util.LongSequenceGenerator; 049 import org.apache.commons.logging.Log; 050 import org.apache.commons.logging.LogFactory; 051 052 /** 053 * This broker filter handles tracking the state of the broker for purposes of 054 * publishing advisory messages to advisory consumers. 055 * 056 * @version $Revision$ 057 */ 058 public class AdvisoryBroker extends BrokerFilter { 059 060 private static final Log LOG = LogFactory.getLog(AdvisoryBroker.class); 061 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 062 063 protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); 064 protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>(); 065 protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); 066 protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); 067 protected final ProducerId advisoryProducerId = new ProducerId(); 068 069 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 070 071 public AdvisoryBroker(Broker next) { 072 super(next); 073 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 074 } 075 076 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 077 super.addConnection(context, info); 078 079 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 080 //do not distribute usernames or passwords in advisory 081 ConnectionInfo copy = info.copy(); 082 copy.setUserName(""); 083 copy.setPassword(""); 084 fireAdvisory(context, topic, copy); 085 connections.put(copy.getConnectionId(), copy); 086 } 087 088 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 089 Subscription answer = super.addConsumer(context, info); 090 091 // Don't advise advisory topics. 092 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 093 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); 094 consumers.put(info.getConsumerId(), info); 095 fireConsumerAdvisory(context, info.getDestination(), topic, info); 096 } else { 097 // We need to replay all the previously collected state objects 098 // for this newly added consumer. 099 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { 100 // Replay the connections. 101 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) { 102 ConnectionInfo value = iter.next(); 103 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 104 fireAdvisory(context, topic, value, info.getConsumerId()); 105 } 106 } 107 108 // We need to replay all the previously collected destination 109 // objects 110 // for this newly added consumer. 111 if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { 112 // Replay the destinations. 113 for (Iterator<DestinationInfo> iter = destinations.values().iterator(); iter.hasNext();) { 114 DestinationInfo value = iter.next(); 115 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination()); 116 fireAdvisory(context, topic, value, info.getConsumerId()); 117 } 118 } 119 120 // Replay the producers. 121 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { 122 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) { 123 ProducerInfo value = iter.next(); 124 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); 125 fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId()); 126 } 127 } 128 129 // Replay the consumers. 130 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 131 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) { 132 ConsumerInfo value = iter.next(); 133 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); 134 fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); 135 } 136 } 137 } 138 return answer; 139 } 140 141 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 142 super.addProducer(context, info); 143 144 // Don't advise advisory topics. 145 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 146 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); 147 fireProducerAdvisory(context, info.getDestination(), topic, info); 148 producers.put(info.getProducerId(), info); 149 } 150 } 151 152 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 153 Destination answer = super.addDestination(context, destination); 154 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 155 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); 156 DestinationInfo previous = destinations.putIfAbsent(destination, info); 157 if( previous==null ) { 158 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 159 fireAdvisory(context, topic, info); 160 } 161 } 162 return answer; 163 } 164 165 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 166 ActiveMQDestination destination = info.getDestination(); 167 next.addDestinationInfo(context, info); 168 169 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 170 DestinationInfo previous = destinations.putIfAbsent(destination, info); 171 if( previous==null ) { 172 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 173 fireAdvisory(context, topic, info); 174 } 175 } 176 } 177 178 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 179 super.removeDestination(context, destination, timeout); 180 DestinationInfo info = destinations.remove(destination); 181 if (info != null) { 182 info.setDestination(destination); 183 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 184 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 185 fireAdvisory(context, topic, info); 186 try { 187 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1); 188 } catch (Exception expectedIfDestinationDidNotExistYet) { 189 } 190 try { 191 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); 192 } catch (Exception expectedIfDestinationDidNotExistYet) { 193 } 194 } 195 196 } 197 198 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { 199 super.removeDestinationInfo(context, destInfo); 200 DestinationInfo info = destinations.remove(destInfo.getDestination()); 201 if (info != null) { 202 info.setDestination(destInfo.getDestination()); 203 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 204 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); 205 fireAdvisory(context, topic, info); 206 try { 207 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1); 208 } catch (Exception expectedIfDestinationDidNotExistYet) { 209 } 210 try { 211 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); 212 213 } catch (Exception expectedIfDestinationDidNotExistYet) { 214 } 215 } 216 217 } 218 219 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 220 super.removeConnection(context, info, error); 221 222 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 223 fireAdvisory(context, topic, info.createRemoveCommand()); 224 connections.remove(info.getConnectionId()); 225 } 226 227 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 228 super.removeConsumer(context, info); 229 230 // Don't advise advisory topics. 231 ActiveMQDestination dest = info.getDestination(); 232 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 233 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 234 consumers.remove(info.getConsumerId()); 235 if (!dest.isTemporary() || destinations.contains(dest)) { 236 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); 237 } 238 } 239 } 240 241 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 242 super.removeProducer(context, info); 243 244 // Don't advise advisory topics. 245 ActiveMQDestination dest = info.getDestination(); 246 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { 247 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); 248 producers.remove(info.getProducerId()); 249 if (!dest.isTemporary() || destinations.contains(dest)) { 250 fireProducerAdvisory(context, dest,topic, info.createRemoveCommand()); 251 } 252 } 253 } 254 255 public void messageExpired(ConnectionContext context, MessageReference messageReference) { 256 super.messageExpired(context, messageReference); 257 try { 258 if(!messageReference.isAdvisory()) { 259 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); 260 Message payload = messageReference.getMessage().copy(); 261 payload.clearBody(); 262 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 263 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 264 fireAdvisory(context, topic, payload, null, advisoryMessage); 265 } 266 } catch (Exception e) { 267 LOG.warn("Failed to fire message expired advisory"); 268 } 269 } 270 271 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 272 super.messageConsumed(context, messageReference); 273 try { 274 if(!messageReference.isAdvisory()) { 275 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); 276 Message payload = messageReference.getMessage().copy(); 277 payload.clearBody(); 278 fireAdvisory(context, topic,payload); 279 } 280 } catch (Exception e) { 281 LOG.warn("Failed to fire message consumed advisory"); 282 } 283 } 284 285 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 286 super.messageDelivered(context, messageReference); 287 try { 288 if (!messageReference.isAdvisory()) { 289 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); 290 Message payload = messageReference.getMessage().copy(); 291 payload.clearBody(); 292 fireAdvisory(context, topic,payload); 293 } 294 } catch (Exception e) { 295 LOG.warn("Failed to fire message delivered advisory"); 296 } 297 } 298 299 public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { 300 super.messageDiscarded(context, messageReference); 301 try { 302 if (!messageReference.isAdvisory()) { 303 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); 304 Message payload = messageReference.getMessage().copy(); 305 payload.clearBody(); 306 fireAdvisory(context, topic,payload); 307 } 308 } catch (Exception e) { 309 LOG.warn("Failed to fire message discarded advisory"); 310 } 311 } 312 313 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { 314 super.slowConsumer(context, destination,subs); 315 try { 316 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); 317 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 318 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString()); 319 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage); 320 } catch (Exception e) { 321 LOG.warn("Failed to fire message slow consumer advisory"); 322 } 323 } 324 325 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { 326 super.fastProducer(context, producerInfo); 327 try { 328 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination()); 329 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 330 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); 331 fireAdvisory(context, topic, producerInfo, null, advisoryMessage); 332 } catch (Exception e) { 333 LOG.warn("Failed to fire message fast producer advisory"); 334 } 335 } 336 337 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 338 super.isFull(context, destination, usage); 339 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { 340 try { 341 342 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination()); 343 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 344 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName()); 345 fireAdvisory(context, topic, null, null, advisoryMessage); 346 347 } catch (Exception e) { 348 LOG.warn("Failed to fire message is full advisory"); 349 } 350 } 351 } 352 353 public void nowMasterBroker() { 354 super.nowMasterBroker(); 355 try { 356 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); 357 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 358 ConnectionContext context = new ConnectionContext(); 359 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 360 context.setBroker(getBrokerService().getBroker()); 361 fireAdvisory(context, topic,null,null,advisoryMessage); 362 } catch (Exception e) { 363 LOG.warn("Failed to fire message master broker advisory"); 364 } 365 } 366 367 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { 368 fireAdvisory(context, topic, command, null); 369 } 370 371 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 372 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 373 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 374 } 375 376 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception { 377 fireConsumerAdvisory(context, consumerDestination,topic, command, null); 378 } 379 380 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 381 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 382 int count = 0; 383 Set<Destination>set = getDestinations(consumerDestination); 384 if (set != null) { 385 for (Destination dest:set) { 386 count += dest.getDestinationStatistics().getConsumers().getCount(); 387 } 388 } 389 advisoryMessage.setIntProperty("consumerCount", count); 390 391 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 392 } 393 394 protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { 395 fireProducerAdvisory(context,producerDestination, topic, command, null); 396 } 397 398 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 399 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 400 int count = 0; 401 if (producerDestination != null) { 402 Set<Destination> set = getDestinations(producerDestination); 403 if (set != null) { 404 for (Destination dest : set) { 405 count += dest.getDestinationStatistics().getProducers().getCount(); 406 } 407 } 408 } 409 advisoryMessage.setIntProperty("producerCount", count); 410 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 411 } 412 413 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { 414 if (getBrokerService().isStarted()) { 415 //set properties 416 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); 417 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; 418 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); 419 420 String[] uris = getBrokerService().getTransportConnectorURIs(); 421 String url = getBrokerService().getVmConnectorURI().toString(); 422 if (uris != null && uris.length > 0) { 423 url = uris[0]; 424 } 425 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); 426 427 //set the data structure 428 advisoryMessage.setDataStructure(command); 429 advisoryMessage.setPersistent(false); 430 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 431 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); 432 advisoryMessage.setTargetConsumerId(targetConsumerId); 433 advisoryMessage.setDestination(topic); 434 advisoryMessage.setResponseRequired(false); 435 advisoryMessage.setProducerId(advisoryProducerId); 436 boolean originalFlowControl = context.isProducerFlowControl(); 437 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 438 producerExchange.setConnectionContext(context); 439 producerExchange.setMutable(true); 440 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 441 try { 442 context.setProducerFlowControl(false); 443 next.send(producerExchange, advisoryMessage); 444 } finally { 445 context.setProducerFlowControl(originalFlowControl); 446 } 447 } 448 } 449 450 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() { 451 return connections; 452 } 453 454 public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() { 455 return consumers; 456 } 457 458 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() { 459 return producers; 460 } 461 462 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { 463 return destinations; 464 } 465 }