001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.fusesource.hawtdb.util.list;
018    
019    import org.fusesource.hawtbuf.codec.Codec;
020    
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.NoSuchElementException;
027    
028    
029    /**
030     * Keeps track of a added long values. Collapses ranges of numbers using a
031     * Sequence representation. Use to keep track of received message ids to find
032     * out if a message is duplicate or if there are any missing messages.
033     * 
034     * @author chirino
035     */
036    public class SequenceSet extends LinkedNodeList<Sequence> {
037        
038        public static Codec CODEC = new Codec<SequenceSet>() {
039    
040            public SequenceSet decode(DataInput in) throws IOException {
041                SequenceSet value = new SequenceSet();
042                int count = in.readInt();
043                for (int i = 0; i < count; i++) {
044                    if( in.readBoolean() ) {
045                        Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                        value.addLast(sequence);
047                    } else {
048                        Sequence sequence = new Sequence(in.readLong());
049                        value.addLast(sequence);
050                    }
051                }
052                return value;
053            }
054    
055            public void encode(SequenceSet value, DataOutput out) throws IOException {
056                out.writeInt(value.size());
057                Sequence sequence = value.getHead();
058                while (sequence != null ) {
059                    if( sequence.range() > 1 ) {
060                        out.writeBoolean(true);
061                        out.writeLong(sequence.first);
062                        out.writeLong(sequence.last);
063                    } else {
064                        out.writeBoolean(false);
065                        out.writeLong(sequence.first);
066                    }
067                    sequence = sequence.getNext();
068                }
069            }
070    
071            public int getFixedSize() {
072                return -1;
073            }
074    
075            public SequenceSet deepCopy(SequenceSet value) {
076                SequenceSet rc = new SequenceSet();
077                Sequence sequence = value.getHead();
078                while (sequence != null ) {
079                    rc.add(new Sequence(sequence.first, sequence.last));
080                    sequence = sequence.getNext();
081                }
082                return rc;
083            }
084    
085            public boolean isDeepCopySupported() {
086                return true;
087            }
088    
089            public boolean isEstimatedSizeSupported() {
090                return true;
091            }
092    
093            public int estimatedSize(SequenceSet object) {
094                return object.size()*16;
095            }
096        };
097        
098        public void add(Sequence value) {
099            // TODO we can probably optimize this a bit
100            for(long i=value.first; i<value.last+1; i++) {
101                add(i);
102            }
103        }
104        
105        
106        /**
107         * 
108         * @param value
109         *            the value to add to the list
110         * @return false if the value was a duplicate.
111         */
112        public boolean add(long value) {
113    
114            if (isEmpty()) {
115                addFirst(new Sequence(value));
116                return true;
117            }
118    
119            Sequence sequence = getHead();
120            while (sequence != null) {
121    
122                if (sequence.isAdjacentToLast(value)) {
123                    // grow the sequence...
124                    sequence.last = value;
125                    // it might connect us to the next sequence..
126                    if (sequence.getNext() != null) {
127                        Sequence next = sequence.getNext();
128                        if (next.isAdjacentToFirst(value)) {
129                            // Yep the sequence connected.. so join them.
130                            sequence.last = next.last;
131                            next.unlink();
132                        }
133                    }
134                    return true;
135                }
136    
137                if (sequence.isAdjacentToFirst(value)) {
138                    // grow the sequence...
139                    sequence.first = value;
140    
141                    // it might connect us to the previous
142                    if (sequence.getPrevious() != null) {
143                        Sequence prev = sequence.getPrevious();
144                        if (prev.isAdjacentToLast(value)) {
145                            // Yep the sequence connected.. so join them.
146                            sequence.first = prev.first;
147                            prev.unlink();
148                        }
149                    }
150                    return true;
151                }
152    
153                // Did that value land before this sequence?
154                if (value < sequence.first) {
155                    // Then insert a new entry before this sequence item.
156                    sequence.linkBefore(new Sequence(value));
157                    return true;
158                }
159    
160                // Did that value land within the sequence? The it's a duplicate.
161                if (sequence.contains(value)) {
162                    return false;
163                }
164    
165                sequence = sequence.getNext();
166            }
167    
168            // Then the value is getting appended to the tail of the sequence.
169            addLast(new Sequence(value));
170            return true;
171        }
172        
173        /**
174         * Removes and returns the first element from this list.
175         *
176         * @return the first element from this list.
177         * @throws NoSuchElementException if this list is empty.
178         */
179        public long removeFirst() {
180            if (isEmpty()) {
181                throw new NoSuchElementException();
182            }
183            
184            Sequence rc = removeFirstSequence(1);
185            return rc.first;
186        }
187    
188    
189        public Sequence removeLastSequence() {
190            if (isEmpty()) {
191                return null;
192            }
193    
194            Sequence rc = getTail();
195            rc.unlink();
196            return rc;
197        }
198    
199        /**
200         * Removes and returns the first sequence that is count range large.
201         *
202         * @return a sequence that is count range large, or null if no sequence is that large in the list.
203         */
204        public Sequence removeFirstSequence(long count) {
205            if (isEmpty()) {
206                return null;
207            }
208            
209            Sequence sequence = getHead();
210            while (sequence != null ) {
211                if (sequence.range() == count ) {
212                    sequence.unlink();
213                    return sequence;
214                }
215                if (sequence.range() > count ) {
216                    Sequence rc = new Sequence(sequence.first, sequence.first+count);
217                    sequence.first+=count;
218                    return rc;
219                }
220                sequence = sequence.getNext();
221            }
222            return null;
223        }
224    
225    
226        /**
227         * @return all the id Sequences that are missing from this set that are not
228         *         in between the range provided.
229         */
230        public List<Sequence> getMissing(long first, long last) {
231            ArrayList<Sequence> rc = new ArrayList<Sequence>();
232            if (first > last) {
233                throw new IllegalArgumentException("First cannot be more than last");
234            }
235            if (isEmpty()) {
236                // We are missing all the messages.
237                rc.add(new Sequence(first, last));
238                return rc;
239            }
240    
241            Sequence sequence = getHead();
242            while (sequence != null && first <= last) {
243                if (sequence.contains(first)) {
244                    first = sequence.last + 1;
245                } else {
246                    if (first < sequence.first) {
247                        if (last < sequence.first) {
248                            rc.add(new Sequence(first, last));
249                            return rc;
250                        } else {
251                            rc.add(new Sequence(first, sequence.first - 1));
252                            first = sequence.last + 1;
253                        }
254                    }
255                }
256                sequence = sequence.getNext();
257            }
258    
259            if (first <= last) {
260                rc.add(new Sequence(first, last));
261            }
262            return rc;
263        }
264    
265        /**
266         * @return all the Sequence that are in this list
267         */
268        public List<Sequence> getReceived() {
269            ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
270            Sequence sequence = getHead();
271            while (sequence != null) {
272                rc.add(new Sequence(sequence.first, sequence.last));
273                sequence = sequence.getNext();
274            }
275            return rc;
276        }
277    
278        public boolean contains(int first, int last) {
279            if (isEmpty()) {
280                return false;
281            }
282            Sequence sequence = getHead();
283            while (sequence != null) {
284                if (sequence.first <= first && first <= sequence.last ) {
285                    return last <= sequence.last ;
286                }
287                sequence = sequence.getNext();
288            }
289            return false;
290        }
291    
292    }