001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import org.apache.activemq.broker.BrokerService;
020    import org.apache.activemq.broker.BrokerServiceAware;
021    import org.apache.activemq.broker.ProducerBrokerExchange;
022    import org.apache.activemq.broker.region.Destination;
023    import org.apache.activemq.broker.region.DestinationFilter;
024    import org.apache.activemq.broker.region.DestinationInterceptor;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.ActiveMQTopic;
027    import org.apache.activemq.command.Message;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * Creates a <a href="http://activemq.org/site/mirrored-queues.html">Mirrored
033     * Queue</a>, using a prefix and postfix to define the name of the topic to which the queue is mirrored.
034     *
035     * @version $Revision: 650766 $
036     * @org.apache.xbean.XBean
037     */
038    public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware {
039        private static final transient Log LOG = LogFactory.getLog(MirroredQueue.class);
040        private String prefix = "VirtualTopic.Mirror.";
041        private String postfix = "";
042        private boolean copyMessage = true;
043        private BrokerService brokerService;
044    
045        public Destination intercept(final Destination destination) {
046            if (destination.getActiveMQDestination().isQueue()) {
047                if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) {
048                    try {
049                        final Destination mirrorDestination = getMirrorDestination(destination);
050                        if (mirrorDestination != null) {
051                            return new DestinationFilter(destination) {
052                                public void send(ProducerBrokerExchange context, Message message) throws Exception {
053                                    message.setDestination(mirrorDestination.getActiveMQDestination());
054                                    mirrorDestination.send(context, message);
055        
056                                    if (isCopyMessage()) {
057                                        message = message.copy();
058                                    }
059                                    message.setDestination(destination.getActiveMQDestination());
060                                    super.send(context, message);
061                                }
062                            };
063                        }
064                    }
065                    catch (Exception e) {
066                        LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
067                    }
068                }
069            }
070            return destination;
071        }
072        
073    
074        public void remove(Destination destination) {
075            if (brokerService == null) {
076                throw new IllegalArgumentException("No brokerService injected!");
077            }
078            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
079            if (topic != null) {
080                try {
081                    brokerService.removeDestination(topic);
082                } catch (Exception e) {
083                    LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
084                }
085            }
086            
087        }
088    
089        // Properties
090        // -------------------------------------------------------------------------
091    
092        public String getPostfix() {
093            return postfix;
094        }
095    
096        /**
097         * Sets any postix used to identify the queue consumers
098         */
099        public void setPostfix(String postfix) {
100            this.postfix = postfix;
101        }
102    
103        public String getPrefix() {
104            return prefix;
105        }
106    
107        /**
108         * Sets the prefix wildcard used to identify the queue consumers for a given
109         * topic
110         */
111        public void setPrefix(String prefix) {
112            this.prefix = prefix;
113        }
114    
115        public boolean isCopyMessage() {
116            return copyMessage;
117        }
118    
119        /**
120         * Sets whether a copy of the message will be sent to each destination.
121         * Defaults to true so that the forward destination is set as the
122         * destination of the message
123         */
124        public void setCopyMessage(boolean copyMessage) {
125            this.copyMessage = copyMessage;
126        }
127    
128        public void setBrokerService(BrokerService brokerService) {
129            this.brokerService = brokerService;
130        }
131    
132        // Implementation methods
133        //-------------------------------------------------------------------------
134        protected Destination getMirrorDestination(Destination destination) throws Exception {
135            if (brokerService == null) {
136                throw new IllegalArgumentException("No brokerService injected!");
137            }
138            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
139            return brokerService.getDestination(topic);
140        }
141    
142        protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
143            return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
144        }
145    
146    }