001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.activemq.filter.BooleanExpression;
023    import org.apache.activemq.state.CommandVisitor;
024    
025    /**
026     * @openwire:marshaller code="5"
027     * @version $Revision: 1.20 $
028     */
029    public class ConsumerInfo extends BaseCommand {
030    
031        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032    
033        public static final byte HIGH_PRIORITY = 10;
034        public static final byte NORMAL_PRIORITY = 0;
035        public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036        public static final byte LOW_PRIORITY = -10;
037    
038        protected ConsumerId consumerId;
039        protected ActiveMQDestination destination;
040        protected int prefetchSize;
041        protected int maximumPendingMessageLimit;
042        protected boolean browser;
043        protected boolean dispatchAsync;
044        protected String selector;
045        protected String subscriptionName;
046        protected boolean noLocal;
047        protected boolean exclusive;
048        protected boolean retroactive;
049        protected byte priority;
050        protected BrokerId[] brokerPath;
051        protected boolean optimizedAcknowledge;
052        // used by the broker
053        protected transient int currentPrefetchSize;
054        // if true, the consumer will not send range
055        protected boolean noRangeAcks;
056        // acks.
057    
058        protected BooleanExpression additionalPredicate;
059        protected transient boolean networkSubscription; // this subscription
060        protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
061    
062        // not marshalled, populated from RemoveInfo, the last message delivered, used
063        // to suppress redelivery on prefetched messages after close
064        private transient long lastDeliveredSequenceId;
065    
066        // originated from a
067        // network connection
068    
069        public ConsumerInfo() {
070        }
071    
072        public ConsumerInfo(ConsumerId consumerId) {
073            this.consumerId = consumerId;
074        }
075    
076        public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
077            this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
078        }
079    
080        public ConsumerInfo copy() {
081            ConsumerInfo info = new ConsumerInfo();
082            copy(info);
083            return info;
084        }
085    
086        public void copy(ConsumerInfo info) {
087            super.copy(info);
088            info.consumerId = consumerId;
089            info.destination = destination;
090            info.prefetchSize = prefetchSize;
091            info.maximumPendingMessageLimit = maximumPendingMessageLimit;
092            info.browser = browser;
093            info.dispatchAsync = dispatchAsync;
094            info.selector = selector;
095            info.subscriptionName = subscriptionName;
096            info.noLocal = noLocal;
097            info.exclusive = exclusive;
098            info.retroactive = retroactive;
099            info.priority = priority;
100            info.brokerPath = brokerPath;
101            info.networkSubscription = networkSubscription;
102            if (networkConsumerIds != null) {
103                if (info.networkConsumerIds==null){
104                    info.networkConsumerIds=new ArrayList<ConsumerId>();
105                }
106                info.networkConsumerIds.addAll(networkConsumerIds);
107            }
108        }
109    
110        public boolean isDurable() {
111            return subscriptionName != null;
112        }
113    
114        public byte getDataStructureType() {
115            return DATA_STRUCTURE_TYPE;
116        }
117    
118        /**
119         * Is used to uniquely identify the consumer to the broker.
120         * 
121         * @openwire:property version=1 cache=true
122         */
123        public ConsumerId getConsumerId() {
124            return consumerId;
125        }
126    
127        public void setConsumerId(ConsumerId consumerId) {
128            this.consumerId = consumerId;
129        }
130    
131        /**
132         * Is this consumer a queue browser?
133         * 
134         * @openwire:property version=1
135         */
136        public boolean isBrowser() {
137            return browser;
138        }
139    
140        public void setBrowser(boolean browser) {
141            this.browser = browser;
142        }
143    
144        /**
145         * The destination that the consumer is interested in receiving messages
146         * from. This destination could be a composite destination.
147         * 
148         * @openwire:property version=1 cache=true
149         */
150        public ActiveMQDestination getDestination() {
151            return destination;
152        }
153    
154        public void setDestination(ActiveMQDestination destination) {
155            this.destination = destination;
156        }
157    
158        /**
159         * How many messages a broker will send to the client without receiving an
160         * ack before he stops dispatching messages to the client.
161         * 
162         * @openwire:property version=1
163         */
164        public int getPrefetchSize() {
165            return prefetchSize;
166        }
167    
168        public void setPrefetchSize(int prefetchSize) {
169            this.prefetchSize = prefetchSize;
170            this.currentPrefetchSize = prefetchSize;
171        }
172    
173        /**
174         * How many messages a broker will keep around, above the prefetch limit,
175         * for non-durable topics before starting to discard older messages.
176         * 
177         * @openwire:property version=1
178         */
179        public int getMaximumPendingMessageLimit() {
180            return maximumPendingMessageLimit;
181        }
182    
183        public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
184            this.maximumPendingMessageLimit = maximumPendingMessageLimit;
185        }
186    
187        /**
188         * Should the broker dispatch a message to the consumer async? If he does it
189         * async, then he uses a more SEDA style of processing while if it is not
190         * done async, then he broker use a STP style of processing. STP is more
191         * appropriate in high bandwidth situations or when being used by and in vm
192         * transport.
193         * 
194         * @openwire:property version=1
195         */
196        public boolean isDispatchAsync() {
197            return dispatchAsync;
198        }
199    
200        public void setDispatchAsync(boolean dispatchAsync) {
201            this.dispatchAsync = dispatchAsync;
202        }
203    
204        /**
205         * The JMS selector used to filter out messages that this consumer is
206         * interested in.
207         * 
208         * @openwire:property version=1
209         */
210        public String getSelector() {
211            return selector;
212        }
213    
214        public void setSelector(String selector) {
215            this.selector = selector;
216        }
217    
218        /**
219         * Used to identify the name of a durable subscription.
220         * 
221         * @openwire:property version=1
222         */
223        public String getSubscriptionName() {
224            return subscriptionName;
225        }
226    
227        public void setSubscriptionName(String durableSubscriptionId) {
228            this.subscriptionName = durableSubscriptionId;
229        }
230    
231        /**
232         * @deprecated
233         * @return
234         * @see getSubscriptionName
235         */
236        public String getSubcriptionName() {
237            return subscriptionName;
238        }
239    
240        /**
241         * @deprecated
242         * @see setSubscriptionName
243         * @param durableSubscriptionId
244         */
245        public void setSubcriptionName(String durableSubscriptionId) {
246            this.subscriptionName = durableSubscriptionId;
247        }
248    
249        /**
250         * Set noLocal to true to avoid receiving messages that were published
251         * locally on the same connection.
252         * 
253         * @openwire:property version=1
254         */
255        public boolean isNoLocal() {
256            return noLocal;
257        }
258    
259        public void setNoLocal(boolean noLocal) {
260            this.noLocal = noLocal;
261        }
262    
263        /**
264         * An exclusive consumer locks out other consumers from being able to
265         * receive messages from the destination. If there are multiple exclusive
266         * consumers for a destination, the first one created will be the exclusive
267         * consumer of the destination.
268         * 
269         * @openwire:property version=1
270         */
271        public boolean isExclusive() {
272            return exclusive;
273        }
274    
275        public void setExclusive(boolean exclusive) {
276            this.exclusive = exclusive;
277        }
278    
279        /**
280         * A retroactive consumer only has meaning for Topics. It allows a consumer
281         * to retroactively see messages sent prior to the consumer being created.
282         * If the consumer is not durable, it will be delivered the last message
283         * published to the topic. If the consumer is durable then it will receive
284         * all persistent messages that are still stored in persistent storage for
285         * that topic.
286         * 
287         * @openwire:property version=1
288         */
289        public boolean isRetroactive() {
290            return retroactive;
291        }
292    
293        public void setRetroactive(boolean retroactive) {
294            this.retroactive = retroactive;
295        }
296    
297        public RemoveInfo createRemoveCommand() {
298            RemoveInfo command = new RemoveInfo(getConsumerId());
299            command.setResponseRequired(isResponseRequired());
300            return command;
301        }
302    
303        /**
304         * The broker will avoid dispatching to a lower priority consumer if there
305         * are other higher priority consumers available to dispatch to. This allows
306         * letting the broker to have an affinity to higher priority consumers.
307         * Default priority is 0.
308         * 
309         * @openwire:property version=1
310         */
311        public byte getPriority() {
312            return priority;
313        }
314    
315        public void setPriority(byte priority) {
316            this.priority = priority;
317        }
318    
319        /**
320         * The route of brokers the command has moved through.
321         * 
322         * @openwire:property version=1 cache=true
323         */
324        public BrokerId[] getBrokerPath() {
325            return brokerPath;
326        }
327    
328        public void setBrokerPath(BrokerId[] brokerPath) {
329            this.brokerPath = brokerPath;
330        }
331    
332        /**
333         * A transient additional predicate that can be used it inject additional
334         * predicates into the selector on the fly. Handy if if say a Security
335         * Broker interceptor wants to filter out messages based on security level
336         * of the consumer.
337         * 
338         * @openwire:property version=1
339         */
340        public BooleanExpression getAdditionalPredicate() {
341            return additionalPredicate;
342        }
343    
344        public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
345            this.additionalPredicate = additionalPredicate;
346        }
347    
348        public Response visit(CommandVisitor visitor) throws Exception {
349            return visitor.processAddConsumer(this);
350        }
351    
352        /**
353         * @openwire:property version=1
354         * @return Returns the networkSubscription.
355         */
356        public boolean isNetworkSubscription() {
357            return networkSubscription;
358        }
359    
360        /**
361         * @param networkSubscription The networkSubscription to set.
362         */
363        public void setNetworkSubscription(boolean networkSubscription) {
364            this.networkSubscription = networkSubscription;
365        }
366    
367        /**
368         * @openwire:property version=1
369         * @return Returns the optimizedAcknowledge.
370         */
371        public boolean isOptimizedAcknowledge() {
372            return optimizedAcknowledge;
373        }
374    
375        /**
376         * @param optimizedAcknowledge The optimizedAcknowledge to set.
377         */
378        public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
379            this.optimizedAcknowledge = optimizedAcknowledge;
380        }
381    
382        /**
383         * @return Returns the currentPrefetchSize.
384         */
385        public int getCurrentPrefetchSize() {
386            return currentPrefetchSize;
387        }
388    
389        /**
390         * @param currentPrefetchSize The currentPrefetchSize to set.
391         */
392        public void setCurrentPrefetchSize(int currentPrefetchSize) {
393            this.currentPrefetchSize = currentPrefetchSize;
394        }
395    
396        /**
397         * The broker may be able to optimize it's processing or provides better QOS
398         * if it knows the consumer will not be sending ranged acks.
399         * 
400         * @return true if the consumer will not send range acks.
401         * @openwire:property version=1
402         */
403        public boolean isNoRangeAcks() {
404            return noRangeAcks;
405        }
406    
407        public void setNoRangeAcks(boolean noRangeAcks) {
408            this.noRangeAcks = noRangeAcks;
409        }
410    
411        public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
412            if (networkConsumerIds == null) {
413                networkConsumerIds = new ArrayList<ConsumerId>();
414            }
415            networkConsumerIds.add(networkConsumerId);
416        }
417    
418        public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
419            if (networkConsumerIds != null) {
420                networkConsumerIds.remove(networkConsumerId);
421                if (networkConsumerIds.isEmpty()) {
422                    networkConsumerIds=null;
423                }
424            }
425        }
426        
427        public synchronized boolean isNetworkConsumersEmpty() {
428            return networkConsumerIds == null || networkConsumerIds.isEmpty();
429        }
430        
431        public synchronized List<ConsumerId> getNetworkConsumerIds(){
432            List<ConsumerId> result = new ArrayList<ConsumerId>();
433            if (networkConsumerIds != null) {
434                result.addAll(networkConsumerIds);
435            }
436            return result;
437        }
438    
439        /**
440         * Tracks the original subscription id that causes a subscription to 
441         * percolate through a network when networkTTL > 1. Tracking the original
442         * subscription allows duplicate suppression.
443         * 
444         * @return array of the current subscription path
445         * @openwire:property version=4
446         */
447        public ConsumerId[] getNetworkConsumerPath() {
448            ConsumerId[] result = null;
449            if (networkConsumerIds != null) {
450                result = networkConsumerIds.toArray(new ConsumerId[0]);
451            }
452            return result;
453        }
454        
455        public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
456            if (consumerPath != null) {
457                for (int i=0; i<consumerPath.length; i++) {
458                    addNetworkConsumerId(consumerPath[i]);
459                }
460            }
461        }
462    
463        public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
464            this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
465        }
466        
467        public long getLastDeliveredSequenceId() {
468            return lastDeliveredSequenceId;
469        }
470    
471    }