001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.util;
018    
019    import java.util.LinkedList;
020    
021    /**
022     * Holder for many bitArrays - used for message audit
023     * 
024     * @version $Revision: 1.1.1.1 $
025     */
026    public class BitArrayBin {
027    
028        private LinkedList<BitArray> list;
029        private int maxNumberOfArrays;
030        private int firstIndex = -1;
031        private long lastInOrderBit=-1;
032    
033        /**
034         * Create a BitArrayBin to a certain window size (number of messages to
035         * keep)
036         * 
037         * @param windowSize
038         */
039        public BitArrayBin(int windowSize) {
040            maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
041            maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
042            list = new LinkedList<BitArray>();
043            for (int i = 0; i < maxNumberOfArrays; i++) {
044                list.add(null);
045            }
046        }
047    
048        /**
049         * Set a bit
050         * 
051         * @param index
052         * @param value
053         * @return true if set
054         */
055        public boolean setBit(long index, boolean value) {
056            boolean answer = false;
057            BitArray ba = getBitArray(index);
058            if (ba != null) {
059                int offset = getOffset(index);
060                if (offset >= 0) {
061                    answer = ba.set(offset, value);
062                }
063            }
064            return answer;
065        }
066        
067        /**
068         * Test if in order
069         * @param index
070         * @return true if next message is in order
071         */
072        public boolean isInOrder(long index) {
073            boolean result = false;
074            if (lastInOrderBit == -1) {
075                result = true;
076            } else {
077                result = lastInOrderBit + 1 == index;
078            }
079            lastInOrderBit = index;
080            return result;
081    
082        }
083    
084        /**
085         * Get the boolean value at the index
086         * 
087         * @param index
088         * @return true/false
089         */
090        public boolean getBit(long index) {
091            boolean answer = index >= firstIndex;
092            BitArray ba = getBitArray(index);
093            if (ba != null) {
094                int offset = getOffset(index);
095                if (offset >= 0) {
096                    answer = ba.get(offset);
097                    return answer;
098                }
099            } else {
100                // gone passed range for previous bins so assume set
101                answer = true;
102            }
103            return answer;
104        }
105    
106        /**
107         * Get the BitArray for the index
108         * 
109         * @param index
110         * @return BitArray
111         */
112        private BitArray getBitArray(long index) {
113            int bin = getBin(index);
114            BitArray answer = null;
115            if (bin >= 0) {
116                if (bin >= maxNumberOfArrays) {
117                    int overShoot = bin - maxNumberOfArrays + 1;
118                    while (overShoot > 0) {
119                        list.removeFirst();
120                        firstIndex += BitArray.LONG_SIZE;
121                        list.add(new BitArray());
122                        overShoot--;
123                    }
124                    
125                    bin = maxNumberOfArrays - 1;
126                }
127                answer = list.get(bin);
128                if (answer == null) {
129                    answer = new BitArray();
130                    list.set(bin, answer);
131                }
132            }
133            return answer;
134        }
135    
136        /**
137         * Get the index of the bin from the total index
138         * 
139         * @param index
140         * @return the index of the bin
141         */
142        private int getBin(long index) {
143            int answer = 0;
144            if (firstIndex < 0) {
145                firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
146            } else if (firstIndex >= 0) {
147                answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
148            }
149            return answer;
150        }
151    
152        /**
153         * Get the offset into a bin from the total index
154         * 
155         * @param index
156         * @return the relative offset into a bin
157         */
158        private int getOffset(long index) {
159            int answer = 0;
160            if (firstIndex >= 0) {
161                answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
162            }
163            return answer;
164        }
165    }