001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.Serializable;
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    
023    /**
024     * Defines the prefetch message policies for different types of consumers
025     * 
026     * @org.apache.xbean.XBean element="prefetchPolicy"
027     * @version $Revision: 1.3 $
028     */
029    public class ActiveMQPrefetchPolicy extends Object implements Serializable {
030        public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
031        public static final int DEFAULT_QUEUE_PREFETCH = 1000;
032        public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
033        public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
034        public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
035        public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
036        public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
037        
038        private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
039        
040        private int queuePrefetch;
041        private int queueBrowserPrefetch;
042        private int topicPrefetch;
043        private int durableTopicPrefetch;
044        private int optimizeDurableTopicPrefetch;
045        private int inputStreamPrefetch;
046        private int maximumPendingMessageLimit;
047    
048        /**
049         * Initialize default prefetch policies
050         */
051        public ActiveMQPrefetchPolicy() {
052            this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
053            this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
054            this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
055            this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
056            this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
057            this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
058        }
059    
060        /**
061         * @return Returns the durableTopicPrefetch.
062         */
063        public int getDurableTopicPrefetch() {
064            return durableTopicPrefetch;
065        }
066    
067        /**
068         * @param durableTopicPrefetch The durableTopicPrefetch to set.
069         */
070        public void setDurableTopicPrefetch(int durableTopicPrefetch) {
071            this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
072        }
073    
074        /**
075         * @return Returns the queuePrefetch.
076         */
077        public int getQueuePrefetch() {
078            return queuePrefetch;
079        }
080    
081        /**
082         * @param queuePrefetch The queuePrefetch to set.
083         */
084        public void setQueuePrefetch(int queuePrefetch) {
085            this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
086        }
087    
088        /**
089         * @return Returns the queueBrowserPrefetch.
090         */
091        public int getQueueBrowserPrefetch() {
092            return queueBrowserPrefetch;
093        }
094    
095        /**
096         * @param queueBrowserPrefetch The queueBrowserPrefetch to set.
097         */
098        public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
099            this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
100        }
101    
102        /**
103         * @return Returns the topicPrefetch.
104         */
105        public int getTopicPrefetch() {
106            return topicPrefetch;
107        }
108    
109        /**
110         * @param topicPrefetch The topicPrefetch to set.
111         */
112        public void setTopicPrefetch(int topicPrefetch) {
113            this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
114        }
115    
116        /**
117         * @return Returns the optimizeDurableTopicPrefetch.
118         */
119        public int getOptimizeDurableTopicPrefetch() {
120            return optimizeDurableTopicPrefetch;
121        }
122    
123        /**
124         * @param optimizeAcknowledgePrefetch The optimizeDurableTopicPrefetch to
125         *                set.
126         */
127        public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) {
128            this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch;
129        }
130    
131        public int getMaximumPendingMessageLimit() {
132            return maximumPendingMessageLimit;
133        }
134    
135        /**
136         * Sets how many messages a broker will keep around, above the prefetch
137         * limit, for non-durable topics before starting to discard older messages.
138         */
139        public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
140            this.maximumPendingMessageLimit = maximumPendingMessageLimit;
141        }
142    
143        private int getMaxPrefetchLimit(int value) {
144            int result = Math.min(value, MAX_PREFETCH_SIZE);
145            if (result < value) {
146                LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
147            }
148            return result;
149        }
150    
151        public void setAll(int i) {
152            this.durableTopicPrefetch = i;
153            this.queueBrowserPrefetch = i;
154            this.queuePrefetch = i;
155            this.topicPrefetch = i;
156            this.inputStreamPrefetch = 1;
157            this.optimizeDurableTopicPrefetch = i;
158        }
159    
160        public int getInputStreamPrefetch() {
161            return inputStreamPrefetch;
162        }
163    
164        public void setInputStreamPrefetch(int inputStreamPrefetch) {
165            this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
166        }
167        
168        public boolean equals(Object object){
169            if (object instanceof ActiveMQPrefetchPolicy){
170                ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
171                return this.queuePrefetch == other.queuePrefetch &&
172                this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
173                this.topicPrefetch == other.topicPrefetch &&
174                this.durableTopicPrefetch == other.durableTopicPrefetch &&
175                this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
176                this.inputStreamPrefetch == other.inputStreamPrefetch;
177            }
178            return false;
179        }
180    
181    }