001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CopyOnWriteArrayList;
028    
029    import javax.jms.InvalidClientIDException;
030    import javax.jms.JMSException;
031    import org.apache.activemq.broker.Broker;
032    import org.apache.activemq.broker.BrokerService;
033    import org.apache.activemq.broker.Connection;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.ConsumerBrokerExchange;
036    import org.apache.activemq.broker.EmptyBroker;
037    import org.apache.activemq.broker.ProducerBrokerExchange;
038    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
039    import org.apache.activemq.broker.region.policy.PolicyMap;
040    import org.apache.activemq.command.ActiveMQDestination;
041    import org.apache.activemq.command.BrokerId;
042    import org.apache.activemq.command.BrokerInfo;
043    import org.apache.activemq.command.ConnectionId;
044    import org.apache.activemq.command.ConnectionInfo;
045    import org.apache.activemq.command.ConsumerInfo;
046    import org.apache.activemq.command.DestinationInfo;
047    import org.apache.activemq.command.Message;
048    import org.apache.activemq.command.MessageAck;
049    import org.apache.activemq.command.MessageDispatch;
050    import org.apache.activemq.command.MessageDispatchNotification;
051    import org.apache.activemq.command.MessagePull;
052    import org.apache.activemq.command.ProducerInfo;
053    import org.apache.activemq.command.RemoveSubscriptionInfo;
054    import org.apache.activemq.command.Response;
055    import org.apache.activemq.command.TransactionId;
056    import org.apache.activemq.kaha.Store;
057    import org.apache.activemq.state.ConnectionState;
058    import org.apache.activemq.thread.TaskRunnerFactory;
059    import org.apache.activemq.usage.SystemUsage;
060    import org.apache.activemq.util.BrokerSupport;
061    import org.apache.activemq.util.IdGenerator;
062    import org.apache.activemq.util.LongSequenceGenerator;
063    import org.apache.activemq.util.ServiceStopper;
064    import org.apache.commons.logging.Log;
065    import org.apache.commons.logging.LogFactory;
066    
067    /**
068     * Routes Broker operations to the correct messaging regions for processing.
069     * 
070     * @version $Revision$
071     */
072    public class RegionBroker extends EmptyBroker {
073        public static final String ORIGINAL_EXPIRATION = "originalExpiration"; // message property under which stampAsExpired() records a message's expiration
074        private static final Log LOG = LogFactory.getLog(RegionBroker.class);
075        private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); // used by getBrokerId() when no broker id has been set
076    
077        protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); // handed to every region created by the create*Region() methods
078        protected DestinationFactory destinationFactory; // assigned in the constructor, which rejects null
079        protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); // exposed as-is by getConnectionStates()
080    
081        private final Region queueRegion; // the four regions are created in the constructor
082        private final Region topicRegion;
083        private final Region tempQueueRegion;
084        private final Region tempTopicRegion;
085        protected final BrokerService brokerService;
086        private boolean started; // set by start(), cleared by stop(); isStopped() returns its negation
087        private boolean keepDurableSubsActive; // pushed into the topic region by start()
088    
089        private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); // maintained by addConnection()/removeConnection()
090        private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); // destinations registered through addDestination()
091        private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>(); // maintained by addBroker()/removeBroker()
092    
093        private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); // seeded in the constructor from the destination factory
094        private BrokerId brokerId; // generated on first getBrokerId() call if never set
095        private String brokerName; // defaults to the local host name on first getBrokerName() call if never set
096        private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); // client id -> context; addConnection()/removeConnection() synchronize on this map
097        private final DestinationInterceptor destinationInterceptor;
098        private ConnectionContext adminConnectionContext;
099    
100        public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
101                            DestinationInterceptor destinationInterceptor) throws IOException {
102            this.brokerService = brokerService;
103            if (destinationFactory == null) {
104                throw new IllegalArgumentException("null destinationFactory");
105            }
106            this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
107            this.destinationFactory = destinationFactory;
108            queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
109            topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
110            this.destinationInterceptor = destinationInterceptor;
111            tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
112            tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
113        }
114    
115        public Map<ActiveMQDestination, Destination> getDestinationMap() {
116            Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
117            answer.putAll(getTopicRegion().getDestinationMap());
118            return answer;
119        }
120    
121        public Set <Destination> getDestinations(ActiveMQDestination destination) {
122            switch (destination.getDestinationType()) {
123            case ActiveMQDestination.QUEUE_TYPE:
124                return queueRegion.getDestinations(destination);
125            case ActiveMQDestination.TOPIC_TYPE:
126                return topicRegion.getDestinations(destination);
127            case ActiveMQDestination.TEMP_QUEUE_TYPE:
128                return tempQueueRegion.getDestinations(destination);
129            case ActiveMQDestination.TEMP_TOPIC_TYPE:
130                return tempTopicRegion.getDestinations(destination);
131            default:
132                return Collections.emptySet();
133            }
134        }
135    
136        public Broker getAdaptor(Class type) {
137            if (type.isInstance(this)) {
138                return this;
139            }
140            return null;
141        }
142    
143        public Region getQueueRegion() {
144            return queueRegion;
145        }
146    
147        public Region getTempQueueRegion() {
148            return tempQueueRegion;
149        }
150    
151        public Region getTempTopicRegion() {
152            return tempTopicRegion;
153        }
154    
155        public Region getTopicRegion() {
156            return topicRegion;
157        }
158    
159        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
160            return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
161        }
162    
163        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
164            return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
165        }
166    
167        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
168            return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
169        }
170    
171        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
172            return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
173        }
174    
175        public void start() throws Exception {
176            ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
177            started = true;
178            queueRegion.start();
179            topicRegion.start();
180            tempQueueRegion.start();
181            tempTopicRegion.start();
182        }
183    
184        public void stop() throws Exception {
185            started = false;
186            ServiceStopper ss = new ServiceStopper();
187            doStop(ss);
188            ss.throwFirstException();
189            // clear the state
190            clientIdSet.clear();
191            connections.clear();
192            destinations.clear();
193            brokerInfos.clear();
194        }
195    
196        public PolicyMap getDestinationPolicy() {
197            return brokerService != null ? brokerService.getDestinationPolicy() : null;
198        }
199    
200        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
201            String clientId = info.getClientId();
202            if (clientId == null) {
203                throw new InvalidClientIDException("No clientID specified for connection request");
204            }
205            synchronized (clientIdSet) {
206                ConnectionContext oldContext = clientIdSet.get(clientId);
207                if (oldContext != null) {
208                    if (context.isFaultTolerant() || context.isNetworkConnection()){
209                            //remove the old connection
210                            try{
211                                    removeConnection(oldContext, info, new Exception("remove stale client"));
212                            }catch(Exception e){
213                                    LOG.warn("Failed to remove stale connection ",e);
214                            }
215                    }else{
216                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
217                                                       + oldContext.getConnection().getRemoteAddress());
218                    }
219                } else {
220                    clientIdSet.put(clientId, context);
221                }
222            }
223    
224            connections.add(context.getConnection());
225        }
226    
227        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
228            String clientId = info.getClientId();
229            if (clientId == null) {
230                throw new InvalidClientIDException("No clientID specified for connection disconnect request");
231            }
232            synchronized (clientIdSet) {
233                ConnectionContext oldValue = clientIdSet.get(clientId);
234                // we may be removing the duplicate connection, not the first
235                // connection to be created
236                // so lets check that their connection IDs are the same
237                if (oldValue == context) {
238                    if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
239                        clientIdSet.remove(clientId);
240                    }
241                }
242            }
243            connections.remove(context.getConnection());
244        }
245    
246        protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
247            return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
248        }
249    
250        public Connection[] getClients() throws Exception {
251            ArrayList<Connection> l = new ArrayList<Connection>(connections);
252            Connection rc[] = new Connection[l.size()];
253            l.toArray(rc);
254            return rc;
255        }
256    
257        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
258    
259            Destination answer;
260    
261            answer = destinations.get(destination);
262            if (answer != null) {
263                return answer;
264            }
265    
266            switch (destination.getDestinationType()) {
267            case ActiveMQDestination.QUEUE_TYPE:
268                answer = queueRegion.addDestination(context, destination);
269                break;
270            case ActiveMQDestination.TOPIC_TYPE:
271                answer = topicRegion.addDestination(context, destination);
272                break;
273            case ActiveMQDestination.TEMP_QUEUE_TYPE:
274                answer = tempQueueRegion.addDestination(context, destination);
275                break;
276            case ActiveMQDestination.TEMP_TOPIC_TYPE:
277                answer = tempTopicRegion.addDestination(context, destination);
278                break;
279            default:
280                throw createUnknownDestinationTypeException(destination);
281            }
282    
283            destinations.put(destination, answer);
284            return answer;
285    
286        }
287    
288        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
289    
290            if (destinations.containsKey(destination)) {
291                switch (destination.getDestinationType()) {
292                case ActiveMQDestination.QUEUE_TYPE:
293                    queueRegion.removeDestination(context, destination, timeout);
294                    break;
295                case ActiveMQDestination.TOPIC_TYPE:
296                    topicRegion.removeDestination(context, destination, timeout);
297                    break;
298                case ActiveMQDestination.TEMP_QUEUE_TYPE:
299                    tempQueueRegion.removeDestination(context, destination, timeout);
300                    break;
301                case ActiveMQDestination.TEMP_TOPIC_TYPE:
302                    tempTopicRegion.removeDestination(context, destination, timeout);
303                    break;
304                default:
305                    throw createUnknownDestinationTypeException(destination);
306                }
307                destinations.remove(destination);
308            }
309    
310        }
311    
312        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
313            addDestination(context, info.getDestination());
314    
315        }
316    
317        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
318            removeDestination(context, info.getDestination(), info.getTimeout());
319    
320        }
321    
322        public ActiveMQDestination[] getDestinations() throws Exception {
323            ArrayList<ActiveMQDestination> l;
324    
325            l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
326    
327            ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
328            l.toArray(rc);
329            return rc;
330        }
331    
332        public void addProducer(ConnectionContext context, ProducerInfo info)
333                throws Exception {
334            ActiveMQDestination destination = info.getDestination();
335            if (destination != null) {
336    
337                // This seems to cause the destination to be added but without advisories firing...
338                context.getBroker().addDestination(context, destination);
339                switch (destination.getDestinationType()) {
340                case ActiveMQDestination.QUEUE_TYPE:
341                    queueRegion.addProducer(context, info);
342                    break;
343                case ActiveMQDestination.TOPIC_TYPE:
344                    topicRegion.addProducer(context, info);
345                    break;
346                case ActiveMQDestination.TEMP_QUEUE_TYPE:
347                    tempQueueRegion.addProducer(context, info);
348                    break;
349                case ActiveMQDestination.TEMP_TOPIC_TYPE:
350                    tempTopicRegion.addProducer(context, info);
351                    break;
352                }
353            }
354        }
355    
356        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
357            ActiveMQDestination destination = info.getDestination();
358            if (destination != null) {
359                switch (destination.getDestinationType()) {
360                case ActiveMQDestination.QUEUE_TYPE:
361                    queueRegion.removeProducer(context, info);
362                    break;
363                case ActiveMQDestination.TOPIC_TYPE:
364                    topicRegion.removeProducer(context, info);
365                    break;
366                case ActiveMQDestination.TEMP_QUEUE_TYPE:
367                    tempQueueRegion.removeProducer(context, info);
368                    break;
369                case ActiveMQDestination.TEMP_TOPIC_TYPE:
370                    tempTopicRegion.removeProducer(context, info);
371                    break;
372                }
373            }
374        }
375    
376        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
377            ActiveMQDestination destination = info.getDestination();
378            switch (destination.getDestinationType()) {
379            case ActiveMQDestination.QUEUE_TYPE:
380                return queueRegion.addConsumer(context, info);
381    
382            case ActiveMQDestination.TOPIC_TYPE:
383                return topicRegion.addConsumer(context, info);
384    
385            case ActiveMQDestination.TEMP_QUEUE_TYPE:
386                return tempQueueRegion.addConsumer(context, info);
387    
388            case ActiveMQDestination.TEMP_TOPIC_TYPE:
389                return tempTopicRegion.addConsumer(context, info);
390    
391            default:
392                throw createUnknownDestinationTypeException(destination);
393            }
394        }
395    
396        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
397            ActiveMQDestination destination = info.getDestination();
398            switch (destination.getDestinationType()) {
399            case ActiveMQDestination.QUEUE_TYPE:
400                queueRegion.removeConsumer(context, info);
401                break;
402            case ActiveMQDestination.TOPIC_TYPE:
403                topicRegion.removeConsumer(context, info);
404                break;
405            case ActiveMQDestination.TEMP_QUEUE_TYPE:
406                tempQueueRegion.removeConsumer(context, info);
407                break;
408            case ActiveMQDestination.TEMP_TOPIC_TYPE:
409                tempTopicRegion.removeConsumer(context, info);
410                break;
411            default:
412                throw createUnknownDestinationTypeException(destination);
413            }
414        }
415    
416        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
417            topicRegion.removeSubscription(context, info);
418        }
419    
420        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
421            message.setBrokerInTime(System.currentTimeMillis());
422            if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
423                ActiveMQDestination destination = message.getDestination();
424                // ensure the destination is registered with the RegionBroker
425                producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination);
426                Region region;
427                switch (destination.getDestinationType()) {
428                case ActiveMQDestination.QUEUE_TYPE:
429                    region = queueRegion;
430                    break;
431                case ActiveMQDestination.TOPIC_TYPE:
432                    region = topicRegion;
433                    break;
434                case ActiveMQDestination.TEMP_QUEUE_TYPE:
435                    region = tempQueueRegion;
436                    break;
437                case ActiveMQDestination.TEMP_TOPIC_TYPE:
438                    region = tempTopicRegion;
439                    break;
440                default:
441                    throw createUnknownDestinationTypeException(destination);
442                }
443                producerExchange.setRegion(region);
444            }
445            producerExchange.getRegion().send(producerExchange, message);
446        }
447    
448        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
449            if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
450                ActiveMQDestination destination = ack.getDestination();
451                Region region;
452                switch (destination.getDestinationType()) {
453                case ActiveMQDestination.QUEUE_TYPE:
454                    region = queueRegion;
455                    break;
456                case ActiveMQDestination.TOPIC_TYPE:
457                    region = topicRegion;
458                    break;
459                case ActiveMQDestination.TEMP_QUEUE_TYPE:
460                    region = tempQueueRegion;
461                    break;
462                case ActiveMQDestination.TEMP_TOPIC_TYPE:
463                    region = tempTopicRegion;
464                    break;
465                default:
466                    throw createUnknownDestinationTypeException(destination);
467                }
468                consumerExchange.setRegion(region);
469            }
470            consumerExchange.getRegion().acknowledge(consumerExchange, ack);
471        }
472    
473        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
474            ActiveMQDestination destination = pull.getDestination();
475            switch (destination.getDestinationType()) {
476            case ActiveMQDestination.QUEUE_TYPE:
477                return queueRegion.messagePull(context, pull);
478    
479            case ActiveMQDestination.TOPIC_TYPE:
480                return topicRegion.messagePull(context, pull);
481    
482            case ActiveMQDestination.TEMP_QUEUE_TYPE:
483                return tempQueueRegion.messagePull(context, pull);
484    
485            case ActiveMQDestination.TEMP_TOPIC_TYPE:
486                return tempTopicRegion.messagePull(context, pull);
487            default:
488                throw createUnknownDestinationTypeException(destination);
489            }
490        }
491    
492        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
493            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
494        }
495    
496        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
497            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
498        }
499    
500        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
501            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
502        }
503    
504        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
505            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
506        }
507    
508        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
509            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
510        }
511    
512        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
513            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
514        }
515    
516        public void gc() {
517            queueRegion.gc();
518            topicRegion.gc();
519        }
520    
521        public BrokerId getBrokerId() {
522            if (brokerId == null) {
523                brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
524            }
525            return brokerId;
526        }
527    
528        public void setBrokerId(BrokerId brokerId) {
529            this.brokerId = brokerId;
530        }
531    
532        public String getBrokerName() {
533            if (brokerName == null) {
534                try {
535                    brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase();
536                } catch (Exception e) {
537                    brokerName = "localhost";
538                }
539            }
540            return brokerName;
541        }
542    
543        public void setBrokerName(String brokerName) {
544            this.brokerName = brokerName;
545        }
546    
547        public DestinationStatistics getDestinationStatistics() {
548            return destinationStatistics;
549        }
550    
551        protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
552            return new JMSException("Unknown destination type: " + destination.getDestinationType());
553        }
554    
555        public synchronized void addBroker(Connection connection, BrokerInfo info) {
556            brokerInfos.add(info);
557        }
558    
559        public synchronized void removeBroker(Connection connection, BrokerInfo info) {
560            if (info != null) {
561                brokerInfos.remove(info);
562            }
563        }
564    
565        public synchronized BrokerInfo[] getPeerBrokerInfos() {
566            BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
567            result = brokerInfos.toArray(result);
568            return result;
569        }
570    
571        public void preProcessDispatch(MessageDispatch messageDispatch) {
572            Message message = messageDispatch.getMessage();
573            if (message != null) {
574                long endTime = System.currentTimeMillis();
575                message.setBrokerOutTime(endTime);
576                if (getBrokerService().isEnableStatistics()) {
577                    long totalTime = endTime - message.getBrokerInTime();
578                    message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
579                }
580            }
581        }
582    
583        public void postProcessDispatch(MessageDispatch messageDispatch) {
584        }
585    
586        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
587            ActiveMQDestination destination = messageDispatchNotification.getDestination();
588            switch (destination.getDestinationType()) {
589            case ActiveMQDestination.QUEUE_TYPE:
590                queueRegion.processDispatchNotification(messageDispatchNotification);
591                break;
592            case ActiveMQDestination.TOPIC_TYPE:
593                topicRegion.processDispatchNotification(messageDispatchNotification);
594                break;
595            case ActiveMQDestination.TEMP_QUEUE_TYPE:
596                tempQueueRegion.processDispatchNotification(messageDispatchNotification);
597                break;
598            case ActiveMQDestination.TEMP_TOPIC_TYPE:
599                tempTopicRegion.processDispatchNotification(messageDispatchNotification);
600                break;
601            default:
602                throw createUnknownDestinationTypeException(destination);
603            }
604        }
605    
606        public boolean isSlaveBroker() {
607            return brokerService.isSlave();
608        }
609    
610        public boolean isStopped() {
611            return !started;
612        }
613    
614        public Set<ActiveMQDestination> getDurableDestinations() {
615            return destinationFactory.getDestinations();
616        }
617    
618        protected void doStop(ServiceStopper ss) {
619            ss.stop(queueRegion);
620            ss.stop(topicRegion);
621            ss.stop(tempQueueRegion);
622            ss.stop(tempTopicRegion);
623        }
624    
625        public boolean isKeepDurableSubsActive() {
626            return keepDurableSubsActive;
627        }
628    
629        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
630            this.keepDurableSubsActive = keepDurableSubsActive;
631        }
632    
633        public DestinationInterceptor getDestinationInterceptor() {
634            return destinationInterceptor;
635        }
636    
637        public ConnectionContext getAdminConnectionContext() {
638            return adminConnectionContext;
639        }
640    
641        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
642            this.adminConnectionContext = adminConnectionContext;
643        }
644    
645        public Map<ConnectionId, ConnectionState> getConnectionStates() {
646            return connectionStates;
647        }
648    
649        public Store getTempDataStore() {
650            return brokerService.getTempDataStore();
651        }
652    
653        public URI getVmConnectorURI() {
654            return brokerService.getVmConnectorURI();
655        }
656    
657        public void brokerServiceStarted() {
658        }
659    
660        public BrokerService getBrokerService() {
661            return brokerService;
662        }
663    
664        public boolean isExpired(MessageReference messageReference) {
665            boolean expired = false;
666            if (messageReference.isExpired()) {
667                try {
668                    // prevent duplicate expiry processing
669                    Message message = messageReference.getMessage();
670                    synchronized (message) {
671                        expired = stampAsExpired(message);
672                    }
673                } catch (IOException e) {
674                    LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
675                }
676            }
677            return expired;
678        }
679       
680        private boolean stampAsExpired(Message message) throws IOException {
681            boolean stamped=false;
682            if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
683                long expiration=message.getExpiration();     
684                message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
685                stamped = true;
686            }
687            return stamped;
688        }
689    
690        
691        public void messageExpired(ConnectionContext context, MessageReference node) {
692            if (LOG.isDebugEnabled()) {
693                LOG.debug("Message expired " + node);
694            }
695            getRoot().sendToDeadLetterQueue(context, node);
696        }
697        
698        public void sendToDeadLetterQueue(ConnectionContext context,
699                    MessageReference node){
700                    try{
701                            if(node!=null){
702                                    Message message=node.getMessage();
703                                    if(message!=null && node.getRegionDestination()!=null){
704                                            DeadLetterStrategy deadLetterStrategy=node
705                                                    .getRegionDestination().getDeadLetterStrategy();
706                                            if(deadLetterStrategy!=null){
707                                                    if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
708                                                        // message may be inflight to other subscriptions so do not modify
709                                                        message = message.copy();
710                                                        stampAsExpired(message);
711                                                        message.setExpiration(0);
712                                                        if(!message.isPersistent()){
713                                                                message.setPersistent(true);
714                                                                message.setProperty("originalDeliveryMode",
715                                                                            "NON_PERSISTENT");
716                                                            }
717                                                            // The original destination and transaction id do
718                                                            // not get filled when the message is first sent,
719                                                            // it is only populated if the message is routed to
720                                                            // another destination like the DLQ
721                                                            ActiveMQDestination deadLetterDestination=deadLetterStrategy
722                                                                    .getDeadLetterQueueFor(message
723                                                                            .getDestination());
724                                                            if (context.getBroker()==null) {
725                                                                    context.setBroker(getRoot());
726                                                            }
727                                                            BrokerSupport.resendNoCopy(context,message,
728                                                                    deadLetterDestination);
729                                                    }
730                                            } else {
731                                                if (LOG.isDebugEnabled()) {
732                                                    LOG.debug("Expired message with no DLQ strategy in place");
733                                                }
734                                            }
735                                    }
736                            }
737                    }catch(Exception e){
738                            LOG.warn("Caught an exception sending to DLQ: "+node,e);
739                    }
740            }
741    
742        public Broker getRoot() {
743            try {
744                return getBrokerService().getBroker();
745            } catch (Exception e) {
746                LOG.fatal("Trying to get Root Broker " + e);
747                throw new RuntimeException("The broker from the BrokerService should not throw an exception");
748            }
749        }
750        
751        /**
752         * @return the broker sequence id
753         */
754        public long getBrokerSequenceId() {
755            synchronized(sequenceGenerator) {
756                return sequenceGenerator.getNextSequenceId();
757            }
758        }
759    }