001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.IOException;
020    import java.util.Arrays;
021    
022    import javax.jms.JMSException;
023    
024    import org.apache.activemq.filter.BooleanExpression;
025    import org.apache.activemq.filter.MessageEvaluationContext;
026    import org.apache.activemq.util.JMSExceptionSupport;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * @openwire:marshaller code="91"
032     * @version $Revision: 1.12 $
033     */
034    public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
035    
036        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
037        static final Log LOG = LogFactory.getLog(NetworkBridgeFilter.class);
038    
039        private BrokerId networkBrokerId;
040        private int networkTTL;
041    
042        public NetworkBridgeFilter() {
043        }
044    
045        public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
046            this.networkBrokerId = remoteBrokerPath;
047            this.networkTTL = networkTTL;
048        }
049    
050        public byte getDataStructureType() {
051            return DATA_STRUCTURE_TYPE;
052        }
053    
054        public boolean isMarshallAware() {
055            return false;
056        }
057    
058        public boolean matches(MessageEvaluationContext mec) throws JMSException {
059            try {
060                // for Queues - the message can be acknowledged and dropped whilst
061                // still
062                // in the dispatch loop
063                // so need to get the reference to it
064                Message message = mec.getMessage();
065                return message != null && matchesForwardingFilter(message);
066            } catch (IOException e) {
067                throw JMSExceptionSupport.create(e);
068            }
069        }
070    
071        public Object evaluate(MessageEvaluationContext message) throws JMSException {
072            return matches(message) ? Boolean.TRUE : Boolean.FALSE;
073        }
074    
075        protected boolean matchesForwardingFilter(Message message) {
076    
077            if (contains(message.getBrokerPath(), networkBrokerId)) {
078                if (LOG.isTraceEnabled()) {
079                    LOG.trace("Message all ready routed once through this broker ("
080                            + networkBrokerId + "), path: "
081                            + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
082                }
083                return false;
084            }
085    
086            int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
087    
088            if (hops >= networkTTL) {
089                if (LOG.isTraceEnabled()) {
090                    LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
091                }
092                return false;
093            }
094    
095            // Don't propagate advisory messages about network subscriptions
096            if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
097                ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
098                hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
099                if (hops >= networkTTL) {
100                    if (LOG.isTraceEnabled()) {
101                        LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
102                    }
103                    return false;
104                }
105            }
106            return true;
107        }
108    
109        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
110            if (brokerPath != null && brokerId != null) {
111                for (int i = 0; i < brokerPath.length; i++) {
112                    if (brokerId.equals(brokerPath[i])) {
113                        return true;
114                    }
115                }
116            }
117            return false;
118        }
119    
120        /**
121         * @openwire:property version=1
122         */
123        public int getNetworkTTL() {
124            return networkTTL;
125        }
126    
127        public void setNetworkTTL(int networkTTL) {
128            this.networkTTL = networkTTL;
129        }
130    
131        /**
132         * @openwire:property version=1 cache=true
133         */
134        public BrokerId getNetworkBrokerId() {
135            return networkBrokerId;
136        }
137    
138        public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
139            this.networkBrokerId = remoteBrokerPath;
140        }
141    
142    }