001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.ArrayList; 022 import java.util.Collections; 023 import java.util.HashMap; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.ConcurrentHashMap; 027 import java.util.concurrent.CopyOnWriteArrayList; 028 029 import javax.jms.InvalidClientIDException; 030 import javax.jms.JMSException; 031 import org.apache.activemq.broker.Broker; 032 import org.apache.activemq.broker.BrokerService; 033 import org.apache.activemq.broker.Connection; 034 import org.apache.activemq.broker.ConnectionContext; 035 import org.apache.activemq.broker.ConsumerBrokerExchange; 036 import org.apache.activemq.broker.EmptyBroker; 037 import org.apache.activemq.broker.ProducerBrokerExchange; 038 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 039 import org.apache.activemq.broker.region.policy.PolicyMap; 040 import org.apache.activemq.command.ActiveMQDestination; 041 import org.apache.activemq.command.BrokerId; 042 import org.apache.activemq.command.BrokerInfo; 043 import org.apache.activemq.command.ConnectionId; 044 import org.apache.activemq.command.ConnectionInfo; 045 import org.apache.activemq.command.ConsumerInfo; 046 import org.apache.activemq.command.DestinationInfo; 047 import org.apache.activemq.command.Message; 048 import org.apache.activemq.command.MessageAck; 049 import org.apache.activemq.command.MessageDispatch; 050 import org.apache.activemq.command.MessageDispatchNotification; 051 import org.apache.activemq.command.MessagePull; 052 import org.apache.activemq.command.ProducerInfo; 053 import org.apache.activemq.command.RemoveSubscriptionInfo; 054 import org.apache.activemq.command.Response; 055 import org.apache.activemq.command.TransactionId; 056 import org.apache.activemq.kaha.Store; 057 import org.apache.activemq.state.ConnectionState; 058 import org.apache.activemq.thread.TaskRunnerFactory; 059 import org.apache.activemq.usage.SystemUsage; 060 import org.apache.activemq.util.BrokerSupport; 061 import org.apache.activemq.util.IdGenerator; 062 import org.apache.activemq.util.LongSequenceGenerator; 063 import org.apache.activemq.util.ServiceStopper; 064 import org.apache.commons.logging.Log; 065 import org.apache.commons.logging.LogFactory; 066 067 /** 068 * Routes Broker operations to the correct messaging regions for processing. 069 * 070 * @version $Revision$ 071 */ 072 public class RegionBroker extends EmptyBroker { 073 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 074 private static final Log LOG = LogFactory.getLog(RegionBroker.class); 075 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 076 077 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 078 protected DestinationFactory destinationFactory; 079 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 080 081 private final Region queueRegion; 082 private final Region topicRegion; 083 private final Region tempQueueRegion; 084 private final Region tempTopicRegion; 085 protected final BrokerService brokerService; 086 private boolean started; 087 private boolean keepDurableSubsActive; 088 089 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 090 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 091 private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>(); 092 093 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 094 private BrokerId brokerId; 095 private String brokerName; 096 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 097 private final DestinationInterceptor destinationInterceptor; 098 private ConnectionContext adminConnectionContext; 099 100 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 101 DestinationInterceptor destinationInterceptor) throws IOException { 102 this.brokerService = brokerService; 103 if (destinationFactory == null) { 104 throw new IllegalArgumentException("null destinationFactory"); 105 } 106 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 107 this.destinationFactory = destinationFactory; 108 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 109 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 110 this.destinationInterceptor = destinationInterceptor; 111 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 112 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 113 } 114 115 public Map<ActiveMQDestination, Destination> getDestinationMap() { 116 Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap(); 117 answer.putAll(getTopicRegion().getDestinationMap()); 118 return answer; 119 } 120 121 public Set <Destination> getDestinations(ActiveMQDestination destination) { 122 switch (destination.getDestinationType()) { 123 case ActiveMQDestination.QUEUE_TYPE: 124 return queueRegion.getDestinations(destination); 125 case ActiveMQDestination.TOPIC_TYPE: 126 return topicRegion.getDestinations(destination); 127 case ActiveMQDestination.TEMP_QUEUE_TYPE: 128 return tempQueueRegion.getDestinations(destination); 129 case ActiveMQDestination.TEMP_TOPIC_TYPE: 130 return tempTopicRegion.getDestinations(destination); 131 default: 132 return Collections.emptySet(); 133 } 134 } 135 136 public Broker getAdaptor(Class type) { 137 if (type.isInstance(this)) { 138 return this; 139 } 140 return null; 141 } 142 143 public Region getQueueRegion() { 144 return queueRegion; 145 } 146 147 public Region getTempQueueRegion() { 148 return tempQueueRegion; 149 } 150 151 public Region getTempTopicRegion() { 152 return tempTopicRegion; 153 } 154 155 public Region getTopicRegion() { 156 return topicRegion; 157 } 158 159 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 160 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 161 } 162 163 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 164 return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 165 } 166 167 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 168 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 169 } 170 171 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 172 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 173 } 174 175 public void start() throws Exception { 176 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 177 started = true; 178 queueRegion.start(); 179 topicRegion.start(); 180 tempQueueRegion.start(); 181 tempTopicRegion.start(); 182 } 183 184 public void stop() throws Exception { 185 started = false; 186 ServiceStopper ss = new ServiceStopper(); 187 doStop(ss); 188 ss.throwFirstException(); 189 // clear the state 190 clientIdSet.clear(); 191 connections.clear(); 192 destinations.clear(); 193 brokerInfos.clear(); 194 } 195 196 public PolicyMap getDestinationPolicy() { 197 return brokerService != null ? brokerService.getDestinationPolicy() : null; 198 } 199 200 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 201 String clientId = info.getClientId(); 202 if (clientId == null) { 203 throw new InvalidClientIDException("No clientID specified for connection request"); 204 } 205 synchronized (clientIdSet) { 206 ConnectionContext oldContext = clientIdSet.get(clientId); 207 if (oldContext != null) { 208 if (context.isFaultTolerant() || context.isNetworkConnection()){ 209 //remove the old connection 210 try{ 211 removeConnection(oldContext, info, new Exception("remove stale client")); 212 }catch(Exception e){ 213 LOG.warn("Failed to remove stale connection ",e); 214 } 215 }else{ 216 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 217 + oldContext.getConnection().getRemoteAddress()); 218 } 219 } else { 220 clientIdSet.put(clientId, context); 221 } 222 } 223 224 connections.add(context.getConnection()); 225 } 226 227 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 228 String clientId = info.getClientId(); 229 if (clientId == null) { 230 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 231 } 232 synchronized (clientIdSet) { 233 ConnectionContext oldValue = clientIdSet.get(clientId); 234 // we may be removing the duplicate connection, not the first 235 // connection to be created 236 // so lets check that their connection IDs are the same 237 if (oldValue == context) { 238 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 239 clientIdSet.remove(clientId); 240 } 241 } 242 } 243 connections.remove(context.getConnection()); 244 } 245 246 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 247 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 248 } 249 250 public Connection[] getClients() throws Exception { 251 ArrayList<Connection> l = new ArrayList<Connection>(connections); 252 Connection rc[] = new Connection[l.size()]; 253 l.toArray(rc); 254 return rc; 255 } 256 257 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 258 259 Destination answer; 260 261 answer = destinations.get(destination); 262 if (answer != null) { 263 return answer; 264 } 265 266 switch (destination.getDestinationType()) { 267 case ActiveMQDestination.QUEUE_TYPE: 268 answer = queueRegion.addDestination(context, destination); 269 break; 270 case ActiveMQDestination.TOPIC_TYPE: 271 answer = topicRegion.addDestination(context, destination); 272 break; 273 case ActiveMQDestination.TEMP_QUEUE_TYPE: 274 answer = tempQueueRegion.addDestination(context, destination); 275 break; 276 case ActiveMQDestination.TEMP_TOPIC_TYPE: 277 answer = tempTopicRegion.addDestination(context, destination); 278 break; 279 default: 280 throw createUnknownDestinationTypeException(destination); 281 } 282 283 destinations.put(destination, answer); 284 return answer; 285 286 } 287 288 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 289 290 if (destinations.containsKey(destination)) { 291 switch (destination.getDestinationType()) { 292 case ActiveMQDestination.QUEUE_TYPE: 293 queueRegion.removeDestination(context, destination, timeout); 294 break; 295 case ActiveMQDestination.TOPIC_TYPE: 296 topicRegion.removeDestination(context, destination, timeout); 297 break; 298 case ActiveMQDestination.TEMP_QUEUE_TYPE: 299 tempQueueRegion.removeDestination(context, destination, timeout); 300 break; 301 case ActiveMQDestination.TEMP_TOPIC_TYPE: 302 tempTopicRegion.removeDestination(context, destination, timeout); 303 break; 304 default: 305 throw createUnknownDestinationTypeException(destination); 306 } 307 destinations.remove(destination); 308 } 309 310 } 311 312 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 313 addDestination(context, info.getDestination()); 314 315 } 316 317 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 318 removeDestination(context, info.getDestination(), info.getTimeout()); 319 320 } 321 322 public ActiveMQDestination[] getDestinations() throws Exception { 323 ArrayList<ActiveMQDestination> l; 324 325 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 326 327 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 328 l.toArray(rc); 329 return rc; 330 } 331 332 public void addProducer(ConnectionContext context, ProducerInfo info) 333 throws Exception { 334 ActiveMQDestination destination = info.getDestination(); 335 if (destination != null) { 336 337 // This seems to cause the destination to be added but without advisories firing... 338 context.getBroker().addDestination(context, destination); 339 switch (destination.getDestinationType()) { 340 case ActiveMQDestination.QUEUE_TYPE: 341 queueRegion.addProducer(context, info); 342 break; 343 case ActiveMQDestination.TOPIC_TYPE: 344 topicRegion.addProducer(context, info); 345 break; 346 case ActiveMQDestination.TEMP_QUEUE_TYPE: 347 tempQueueRegion.addProducer(context, info); 348 break; 349 case ActiveMQDestination.TEMP_TOPIC_TYPE: 350 tempTopicRegion.addProducer(context, info); 351 break; 352 } 353 } 354 } 355 356 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 357 ActiveMQDestination destination = info.getDestination(); 358 if (destination != null) { 359 switch (destination.getDestinationType()) { 360 case ActiveMQDestination.QUEUE_TYPE: 361 queueRegion.removeProducer(context, info); 362 break; 363 case ActiveMQDestination.TOPIC_TYPE: 364 topicRegion.removeProducer(context, info); 365 break; 366 case ActiveMQDestination.TEMP_QUEUE_TYPE: 367 tempQueueRegion.removeProducer(context, info); 368 break; 369 case ActiveMQDestination.TEMP_TOPIC_TYPE: 370 tempTopicRegion.removeProducer(context, info); 371 break; 372 } 373 } 374 } 375 376 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 377 ActiveMQDestination destination = info.getDestination(); 378 switch (destination.getDestinationType()) { 379 case ActiveMQDestination.QUEUE_TYPE: 380 return queueRegion.addConsumer(context, info); 381 382 case ActiveMQDestination.TOPIC_TYPE: 383 return topicRegion.addConsumer(context, info); 384 385 case ActiveMQDestination.TEMP_QUEUE_TYPE: 386 return tempQueueRegion.addConsumer(context, info); 387 388 case ActiveMQDestination.TEMP_TOPIC_TYPE: 389 return tempTopicRegion.addConsumer(context, info); 390 391 default: 392 throw createUnknownDestinationTypeException(destination); 393 } 394 } 395 396 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 397 ActiveMQDestination destination = info.getDestination(); 398 switch (destination.getDestinationType()) { 399 case ActiveMQDestination.QUEUE_TYPE: 400 queueRegion.removeConsumer(context, info); 401 break; 402 case ActiveMQDestination.TOPIC_TYPE: 403 topicRegion.removeConsumer(context, info); 404 break; 405 case ActiveMQDestination.TEMP_QUEUE_TYPE: 406 tempQueueRegion.removeConsumer(context, info); 407 break; 408 case ActiveMQDestination.TEMP_TOPIC_TYPE: 409 tempTopicRegion.removeConsumer(context, info); 410 break; 411 default: 412 throw createUnknownDestinationTypeException(destination); 413 } 414 } 415 416 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 417 topicRegion.removeSubscription(context, info); 418 } 419 420 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 421 message.setBrokerInTime(System.currentTimeMillis()); 422 if (producerExchange.isMutable() || producerExchange.getRegion() == null) { 423 ActiveMQDestination destination = message.getDestination(); 424 // ensure the destination is registered with the RegionBroker 425 producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination); 426 Region region; 427 switch (destination.getDestinationType()) { 428 case ActiveMQDestination.QUEUE_TYPE: 429 region = queueRegion; 430 break; 431 case ActiveMQDestination.TOPIC_TYPE: 432 region = topicRegion; 433 break; 434 case ActiveMQDestination.TEMP_QUEUE_TYPE: 435 region = tempQueueRegion; 436 break; 437 case ActiveMQDestination.TEMP_TOPIC_TYPE: 438 region = tempTopicRegion; 439 break; 440 default: 441 throw createUnknownDestinationTypeException(destination); 442 } 443 producerExchange.setRegion(region); 444 } 445 producerExchange.getRegion().send(producerExchange, message); 446 } 447 448 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 449 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 450 ActiveMQDestination destination = ack.getDestination(); 451 Region region; 452 switch (destination.getDestinationType()) { 453 case ActiveMQDestination.QUEUE_TYPE: 454 region = queueRegion; 455 break; 456 case ActiveMQDestination.TOPIC_TYPE: 457 region = topicRegion; 458 break; 459 case ActiveMQDestination.TEMP_QUEUE_TYPE: 460 region = tempQueueRegion; 461 break; 462 case ActiveMQDestination.TEMP_TOPIC_TYPE: 463 region = tempTopicRegion; 464 break; 465 default: 466 throw createUnknownDestinationTypeException(destination); 467 } 468 consumerExchange.setRegion(region); 469 } 470 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 471 } 472 473 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 474 ActiveMQDestination destination = pull.getDestination(); 475 switch (destination.getDestinationType()) { 476 case ActiveMQDestination.QUEUE_TYPE: 477 return queueRegion.messagePull(context, pull); 478 479 case ActiveMQDestination.TOPIC_TYPE: 480 return topicRegion.messagePull(context, pull); 481 482 case ActiveMQDestination.TEMP_QUEUE_TYPE: 483 return tempQueueRegion.messagePull(context, pull); 484 485 case ActiveMQDestination.TEMP_TOPIC_TYPE: 486 return tempTopicRegion.messagePull(context, pull); 487 default: 488 throw createUnknownDestinationTypeException(destination); 489 } 490 } 491 492 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 493 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 494 } 495 496 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 497 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 498 } 499 500 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 501 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 502 } 503 504 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 505 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 506 } 507 508 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 509 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 510 } 511 512 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 513 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 514 } 515 516 public void gc() { 517 queueRegion.gc(); 518 topicRegion.gc(); 519 } 520 521 public BrokerId getBrokerId() { 522 if (brokerId == null) { 523 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 524 } 525 return brokerId; 526 } 527 528 public void setBrokerId(BrokerId brokerId) { 529 this.brokerId = brokerId; 530 } 531 532 public String getBrokerName() { 533 if (brokerName == null) { 534 try { 535 brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase(); 536 } catch (Exception e) { 537 brokerName = "localhost"; 538 } 539 } 540 return brokerName; 541 } 542 543 public void setBrokerName(String brokerName) { 544 this.brokerName = brokerName; 545 } 546 547 public DestinationStatistics getDestinationStatistics() { 548 return destinationStatistics; 549 } 550 551 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 552 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 553 } 554 555 public synchronized void addBroker(Connection connection, BrokerInfo info) { 556 brokerInfos.add(info); 557 } 558 559 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 560 if (info != null) { 561 brokerInfos.remove(info); 562 } 563 } 564 565 public synchronized BrokerInfo[] getPeerBrokerInfos() { 566 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 567 result = brokerInfos.toArray(result); 568 return result; 569 } 570 571 public void preProcessDispatch(MessageDispatch messageDispatch) { 572 Message message = messageDispatch.getMessage(); 573 if (message != null) { 574 long endTime = System.currentTimeMillis(); 575 message.setBrokerOutTime(endTime); 576 if (getBrokerService().isEnableStatistics()) { 577 long totalTime = endTime - message.getBrokerInTime(); 578 message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime); 579 } 580 } 581 } 582 583 public void postProcessDispatch(MessageDispatch messageDispatch) { 584 } 585 586 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 587 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 588 switch (destination.getDestinationType()) { 589 case ActiveMQDestination.QUEUE_TYPE: 590 queueRegion.processDispatchNotification(messageDispatchNotification); 591 break; 592 case ActiveMQDestination.TOPIC_TYPE: 593 topicRegion.processDispatchNotification(messageDispatchNotification); 594 break; 595 case ActiveMQDestination.TEMP_QUEUE_TYPE: 596 tempQueueRegion.processDispatchNotification(messageDispatchNotification); 597 break; 598 case ActiveMQDestination.TEMP_TOPIC_TYPE: 599 tempTopicRegion.processDispatchNotification(messageDispatchNotification); 600 break; 601 default: 602 throw createUnknownDestinationTypeException(destination); 603 } 604 } 605 606 public boolean isSlaveBroker() { 607 return brokerService.isSlave(); 608 } 609 610 public boolean isStopped() { 611 return !started; 612 } 613 614 public Set<ActiveMQDestination> getDurableDestinations() { 615 return destinationFactory.getDestinations(); 616 } 617 618 protected void doStop(ServiceStopper ss) { 619 ss.stop(queueRegion); 620 ss.stop(topicRegion); 621 ss.stop(tempQueueRegion); 622 ss.stop(tempTopicRegion); 623 } 624 625 public boolean isKeepDurableSubsActive() { 626 return keepDurableSubsActive; 627 } 628 629 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 630 this.keepDurableSubsActive = keepDurableSubsActive; 631 } 632 633 public DestinationInterceptor getDestinationInterceptor() { 634 return destinationInterceptor; 635 } 636 637 public ConnectionContext getAdminConnectionContext() { 638 return adminConnectionContext; 639 } 640 641 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 642 this.adminConnectionContext = adminConnectionContext; 643 } 644 645 public Map<ConnectionId, ConnectionState> getConnectionStates() { 646 return connectionStates; 647 } 648 649 public Store getTempDataStore() { 650 return brokerService.getTempDataStore(); 651 } 652 653 public URI getVmConnectorURI() { 654 return brokerService.getVmConnectorURI(); 655 } 656 657 public void brokerServiceStarted() { 658 } 659 660 public BrokerService getBrokerService() { 661 return brokerService; 662 } 663 664 public boolean isExpired(MessageReference messageReference) { 665 boolean expired = false; 666 if (messageReference.isExpired()) { 667 try { 668 // prevent duplicate expiry processing 669 Message message = messageReference.getMessage(); 670 synchronized (message) { 671 expired = stampAsExpired(message); 672 } 673 } catch (IOException e) { 674 LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e); 675 } 676 } 677 return expired; 678 } 679 680 private boolean stampAsExpired(Message message) throws IOException { 681 boolean stamped=false; 682 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 683 long expiration=message.getExpiration(); 684 message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration)); 685 stamped = true; 686 } 687 return stamped; 688 } 689 690 691 public void messageExpired(ConnectionContext context, MessageReference node) { 692 if (LOG.isDebugEnabled()) { 693 LOG.debug("Message expired " + node); 694 } 695 getRoot().sendToDeadLetterQueue(context, node); 696 } 697 698 public void sendToDeadLetterQueue(ConnectionContext context, 699 MessageReference node){ 700 try{ 701 if(node!=null){ 702 Message message=node.getMessage(); 703 if(message!=null && node.getRegionDestination()!=null){ 704 DeadLetterStrategy deadLetterStrategy=node 705 .getRegionDestination().getDeadLetterStrategy(); 706 if(deadLetterStrategy!=null){ 707 if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ 708 // message may be inflight to other subscriptions so do not modify 709 message = message.copy(); 710 stampAsExpired(message); 711 message.setExpiration(0); 712 if(!message.isPersistent()){ 713 message.setPersistent(true); 714 message.setProperty("originalDeliveryMode", 715 "NON_PERSISTENT"); 716 } 717 // The original destination and transaction id do 718 // not get filled when the message is first sent, 719 // it is only populated if the message is routed to 720 // another destination like the DLQ 721 ActiveMQDestination deadLetterDestination=deadLetterStrategy 722 .getDeadLetterQueueFor(message 723 .getDestination()); 724 if (context.getBroker()==null) { 725 context.setBroker(getRoot()); 726 } 727 BrokerSupport.resendNoCopy(context,message, 728 deadLetterDestination); 729 } 730 } else { 731 if (LOG.isDebugEnabled()) { 732 LOG.debug("Expired message with no DLQ strategy in place"); 733 } 734 } 735 } 736 } 737 }catch(Exception e){ 738 LOG.warn("Caught an exception sending to DLQ: "+node,e); 739 } 740 } 741 742 public Broker getRoot() { 743 try { 744 return getBrokerService().getBroker(); 745 } catch (Exception e) { 746 LOG.fatal("Trying to get Root Broker " + e); 747 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 748 } 749 } 750 751 /** 752 * @return the broker sequence id 753 */ 754 public long getBrokerSequenceId() { 755 synchronized(sequenceGenerator) { 756 return sequenceGenerator.getNextSequenceId(); 757 } 758 } 759 }