001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.List;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import javax.jms.InvalidSelectorException;
024    import javax.jms.JMSException;
025    import javax.management.ObjectName;
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ConsumerId;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.filter.BooleanExpression;
032    import org.apache.activemq.filter.DestinationFilter;
033    import org.apache.activemq.filter.LogicExpression;
034    import org.apache.activemq.filter.MessageEvaluationContext;
035    import org.apache.activemq.filter.NoLocalExpression;
036    import org.apache.activemq.selector.SelectorParser;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    public abstract class AbstractSubscription implements Subscription {
041    
042        private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
043        protected Broker broker;
044        protected ConnectionContext context;
045        protected ConsumerInfo info;
046        protected final DestinationFilter destinationFilter;
047        protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
048        private BooleanExpression selectorExpression;
049        private ObjectName objectName;
050        private int cursorMemoryHighWaterMark = 70;
051    
052    
053        public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
054            this.broker = broker;
055            this.context = context;
056            this.info = info;
057            this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
058            this.selectorExpression = parseSelector(info);
059        }
060    
061        private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
062            BooleanExpression rc = null;
063            if (info.getSelector() != null) {
064                rc = SelectorParser.parse(info.getSelector());
065            }
066            if (info.isNoLocal()) {
067                if (rc == null) {
068                    rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
069                } else {
070                    rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
071                }
072            }
073            if (info.getAdditionalPredicate() != null) {
074                if (rc == null) {
075                    rc = info.getAdditionalPredicate();
076                } else {
077                    rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
078                }
079            }
080            return rc;
081        }
082    
083        public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
084            ConsumerId targetConsumerId = node.getTargetConsumerId();
085            if (targetConsumerId != null) {
086                if (!targetConsumerId.equals(info.getConsumerId())) {
087                    return false;
088                }
089            }
090            try {
091                return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
092            } catch (JMSException e) {
093                LOG.info("Selector failed to evaluate: " + e.getMessage(), e);
094                return false;
095            }
096        }
097    
098        public boolean matches(ActiveMQDestination destination) {
099            return destinationFilter.matches(destination);
100        }
101    
102        public void add(ConnectionContext context, Destination destination) throws Exception {
103            destinations.add(destination);
104        }
105    
106        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
107            destinations.remove(destination);
108            return Collections.EMPTY_LIST;
109        }
110    
111        public ConsumerInfo getConsumerInfo() {
112            return info;
113        }
114    
115        public void gc() {
116        }
117    
118        public boolean isSlave() {
119            return broker.getBrokerService().isSlave();
120        }
121    
122        public ConnectionContext getContext() {
123            return context;
124        }
125    
126        public ConsumerInfo getInfo() {
127            return info;
128        }
129    
130        public BooleanExpression getSelectorExpression() {
131            return selectorExpression;
132        }
133    
134        public String getSelector() {
135            return info.getSelector();
136        }
137    
138        public void setSelector(String selector) throws InvalidSelectorException {
139            ConsumerInfo copy = info.copy();
140            copy.setSelector(selector);
141            BooleanExpression newSelector = parseSelector(copy);
142            // its valid so lets actually update it now
143            info.setSelector(selector);
144            this.selectorExpression = newSelector;
145        }
146    
147        public ObjectName getObjectName() {
148            return objectName;
149        }
150    
151        public void setObjectName(ObjectName objectName) {
152            this.objectName = objectName;
153        }
154    
155        public int getPrefetchSize() {
156            return info.getPrefetchSize();
157        }
158        public void setPrefetchSize(int newSize) {
159            info.setPrefetchSize(newSize);
160        }
161    
162        public boolean isRecoveryRequired() {
163            return true;
164        }
165    
166        public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
167            boolean result = false;
168            MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
169            try {
170                msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
171                msgContext.setMessageReference(message);
172                result = matches(message, msgContext);
173                if (result) {
174                    doAddRecoveredMessage(message);
175                }
176    
177            } finally {
178                msgContext.clear();
179            }
180            return result;
181        }
182    
183        public ActiveMQDestination getActiveMQDestination() {
184            return info != null ? info.getDestination() : null;
185        }
186        
187        public boolean isBrowser() {
188            return info != null && info.isBrowser();
189        }
190        
191        public int getInFlightUsage() {
192            if (info.getPrefetchSize() > 0) {
193            return (getInFlightSize() * 100)/info.getPrefetchSize();
194            }
195            return Integer.MAX_VALUE;
196        }
197        
198        /**
199         * Add a destination
200         * @param destination
201         */
202        public void addDestination(Destination destination) {
203            
204        }
205           
206        
207        /**
208         * Remove a destination
209         * @param destination
210         */
211        public void removeDestination(Destination destination) {
212            
213        }
214        
215        public int getCursorMemoryHighWaterMark(){
216            return this.cursorMemoryHighWaterMark;
217        }
218    
219            public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
220                    this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
221            }
222        
223        public int countBeforeFull() {
224            return getDispatchedQueueSize() - info.getPrefetchSize();
225        }
226    
227        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
228            add(message);
229        }
230    }