001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import javax.jms.JMSException;
020    import javax.jms.Message;
021    
022    import org.apache.activemq.broker.region.MessageReference;
023    import org.apache.activemq.command.MessageId;
024    import org.apache.activemq.command.ProducerId;
025    import org.apache.activemq.util.BitArrayBin;
026    import org.apache.activemq.util.IdGenerator;
027    import org.apache.activemq.util.LRUCache;
028    
029    /**
030     * Provides basic audit functions for Messages
031     * 
032     * @version $Revision: 1.1.1.1 $
033     */
034    public class ActiveMQMessageAudit {
035    
036        public static final int DEFAULT_WINDOW_SIZE = 2048;
037        public static final int MAXIMUM_PRODUCER_COUNT = 64;
038        private int auditDepth;
039        private int maximumNumberOfProducersToTrack;
040        private LRUCache<Object, BitArrayBin> map;
041    
042        /**
043         * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
044         * 64
045         */
046        public ActiveMQMessageAudit() {
047            this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
048        }
049    
050        /**
051         * Construct a MessageAudit
052         * 
053         * @param auditDepth range of ids to track
054         * @param maximumNumberOfProducersToTrack number of producers expected in
055         *                the system
056         */
057        public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
058            this.auditDepth = auditDepth;
059            this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
060            this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
061        }
062        
063        /**
064         * @return the auditDepth
065         */
066        public int getAuditDepth() {
067            return auditDepth;
068        }
069    
070        /**
071         * @param auditDepth the auditDepth to set
072         */
073        public void setAuditDepth(int auditDepth) {
074            this.auditDepth = auditDepth;
075        }
076    
077        /**
078         * @return the maximumNumberOfProducersToTrack
079         */
080        public int getMaximumNumberOfProducersToTrack() {
081            return maximumNumberOfProducersToTrack;
082        }
083    
084        /**
085         * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
086         */
087        public void setMaximumNumberOfProducersToTrack(
088                int maximumNumberOfProducersToTrack) {
089            this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
090            this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
091        }
092    
093        /**
094         * Checks if this message has been seen before
095         * 
096         * @param message
097         * @return true if the message is a duplicate
098         * @throws JMSException
099         */
100        public boolean isDuplicate(Message message) throws JMSException {
101            return isDuplicate(message.getJMSMessageID());
102        }
103    
104        /**
105         * checks whether this messageId has been seen before and adds this
106         * messageId to the list
107         * 
108         * @param id
109         * @return true if the message is a duplicate
110         */
111        public synchronized boolean isDuplicate(String id) {
112            boolean answer = false;
113            String seed = IdGenerator.getSeedFromId(id);
114            if (seed != null) {
115                BitArrayBin bab = map.get(seed);
116                if (bab == null) {
117                    bab = new BitArrayBin(auditDepth);
118                    map.put(seed, bab);
119                }
120                long index = IdGenerator.getSequenceFromId(id);
121                if (index >= 0) {
122                    answer = bab.setBit(index, true);
123                }
124            }
125            return answer;
126        }
127    
128        /**
129         * Checks if this message has been seen before
130         * 
131         * @param message
132         * @return true if the message is a duplicate
133         */
134        public boolean isDuplicate(final MessageReference message) {
135            MessageId id = message.getMessageId();
136            return isDuplicate(id);
137        }
138        
139        /**
140         * Checks if this messageId has been seen before
141         * 
142         * @param id
143         * @return true if the message is a duplicate
144         */
145        public synchronized boolean isDuplicate(final MessageId id) {
146            boolean answer = false;
147            
148            if (id != null) {
149                ProducerId pid = id.getProducerId();
150                if (pid != null) {
151                    BitArrayBin bab = map.get(pid);
152                    if (bab == null) {
153                        bab = new BitArrayBin(auditDepth);
154                        map.put(pid, bab);
155                    }
156                    answer = bab.setBit(id.getProducerSequenceId(), true);
157                }
158            }
159            return answer;
160        }
161    
162        /**
163         * mark this message as being received
164         * 
165         * @param message
166         */
167        public void rollback(final MessageReference message) {
168            MessageId id = message.getMessageId();
169            rollback(id);
170        }
171        
172        /**
173         * mark this message as being received
174         * 
175         * @param id
176         */
177        public synchronized void rollback(final  MessageId id) {
178            if (id != null) {
179                ProducerId pid = id.getProducerId();
180                if (pid != null) {
181                    BitArrayBin bab = map.get(pid);
182                    if (bab != null) {
183                        bab.setBit(id.getProducerSequenceId(), false);
184                    }
185                }
186            }
187        }
188        
189        /**
190         * Check the message is in order
191         * @param msg
192         * @return
193         * @throws JMSException
194         */
195        public boolean isInOrder(Message msg) throws JMSException {
196            return isInOrder(msg.getJMSMessageID());
197        }
198        
199        /**
200         * Check the message id is in order
201         * @param id
202         * @return
203         */
204        public synchronized boolean isInOrder(final String id) {
205            boolean answer = true;
206            
207            if (id != null) {
208                String seed = IdGenerator.getSeedFromId(id);
209                if (seed != null) {
210                    BitArrayBin bab = map.get(seed);
211                    if (bab != null) {
212                        long index = IdGenerator.getSequenceFromId(id);
213                        answer = bab.isInOrder(index);
214                    }
215                   
216                }
217            }
218            return answer;
219        }
220        
221        /**
222         * Check the MessageId is in order
223         * @param message 
224         * @return
225         */
226        public synchronized boolean isInOrder(final MessageReference message) {
227            return isInOrder(message.getMessageId());
228        }
229        
230        /**
231         * Check the MessageId is in order
232         * @param id
233         * @return
234         */
235        public synchronized boolean isInOrder(final MessageId id) {
236            boolean answer = false;
237    
238            if (id != null) {
239                ProducerId pid = id.getProducerId();
240                if (pid != null) {
241                    BitArrayBin bab = map.get(pid);
242                    if (bab == null) {
243                        bab = new BitArrayBin(auditDepth);
244                        map.put(pid, bab);
245                    }
246                    answer = bab.isInOrder(id.getProducerSequenceId());
247    
248                }
249            }
250            return answer;
251        }
252    }