001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.util;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.List;
024    import java.util.NoSuchElementException;
025    
026    /**
027     * Keeps track of a added long values. Collapses ranges of numbers using a
028     * Sequence representation. Use to keep track of received message ids to find
029     * out if a message is duplicate or if there are any missing messages.
030     * 
031     * @author chirino
032     */
033    public class SequenceSet extends LinkedNodeList<Sequence> {
034    
035        public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
036    
037            public static final Marshaller INSTANCE = new Marshaller();
038            
039            public SequenceSet readPayload(DataInput in) throws IOException {
040                SequenceSet value = new SequenceSet();
041                int count = in.readInt();
042                for (int i = 0; i < count; i++) {
043                    if( in.readBoolean() ) {
044                        Sequence sequence = new Sequence(in.readLong(), in.readLong());
045                        value.addLast(sequence);
046                    } else {
047                        Sequence sequence = new Sequence(in.readLong());
048                        value.addLast(sequence);
049                    }
050                }
051                return value;
052            }
053    
054            public void writePayload(SequenceSet value, DataOutput out) throws IOException {
055                out.writeInt(value.size());
056                Sequence sequence = value.getHead();
057                while (sequence != null ) {
058                    if( sequence.range() > 1 ) {
059                        out.writeBoolean(true);
060                        out.writeLong(sequence.first);
061                        out.writeLong(sequence.last);
062                    } else {
063                        out.writeBoolean(false);
064                        out.writeLong(sequence.first);
065                    }
066                    sequence = sequence.getNext();
067                }
068            }
069    
070            public int getFixedSize() {
071                return -1;
072            }
073    
074            public SequenceSet deepCopy(SequenceSet value) {
075                SequenceSet rc = new SequenceSet();
076                Sequence sequence = value.getHead();
077                while (sequence != null ) {
078                    rc.add(new Sequence(sequence.first, sequence.last));
079                    sequence = sequence.getNext();
080                }
081                return rc;
082            }
083    
084            public boolean isDeepCopySupported() {
085                return true;
086            }
087        }
088        
089        public void add(Sequence value) {
090            // TODO we can probably optimize this a bit
091            for(long i=value.first; i<value.last+1; i++) {
092                add(i);
093            }
094        }
095        
096        
097        /**
098         * 
099         * @param value
100         *            the value to add to the list
101         * @return false if the value was a duplicate.
102         */
103        public boolean add(long value) {
104    
105            if (isEmpty()) {
106                addFirst(new Sequence(value));
107                return true;
108            }
109    
110            Sequence sequence = getHead();
111            while (sequence != null) {
112    
113                if (sequence.isAdjacentToLast(value)) {
114                    // grow the sequence...
115                    sequence.last = value;
116                    // it might connect us to the next sequence..
117                    if (sequence.getNext() != null) {
118                        Sequence next = sequence.getNext();
119                        if (next.isAdjacentToFirst(value)) {
120                            // Yep the sequence connected.. so join them.
121                            sequence.last = next.last;
122                            next.unlink();
123                        }
124                    }
125                    return true;
126                }
127    
128                if (sequence.isAdjacentToFirst(value)) {
129                    // grow the sequence...
130                    sequence.first = value;
131    
132                    // it might connect us to the previous
133                    if (sequence.getPrevious() != null) {
134                        Sequence prev = sequence.getPrevious();
135                        if (prev.isAdjacentToLast(value)) {
136                            // Yep the sequence connected.. so join them.
137                            sequence.first = prev.first;
138                            prev.unlink();
139                        }
140                    }
141                    return true;
142                }
143    
144                // Did that value land before this sequence?
145                if (value < sequence.first) {
146                    // Then insert a new entry before this sequence item.
147                    sequence.linkBefore(new Sequence(value));
148                    return true;
149                }
150    
151                // Did that value land within the sequence? The it's a duplicate.
152                if (sequence.contains(value)) {
153                    return false;
154                }
155    
156                sequence = sequence.getNext();
157            }
158    
159            // Then the value is getting appended to the tail of the sequence.
160            addLast(new Sequence(value));
161            return true;
162        }
163        
164        /**
165         * Removes and returns the first element from this list.
166         *
167         * @return the first element from this list.
168         * @throws NoSuchElementException if this list is empty.
169         */
170        public long removeFirst() {
171            if (isEmpty()) {
172                throw new NoSuchElementException();
173            }
174            
175            Sequence rc = removeFirstSequence(1);
176            return rc.first;
177        }
178    
179    
180        public Sequence removeLastSequence() {
181            if (isEmpty()) {
182                return null;
183            }
184    
185            Sequence rc = getTail();
186            rc.unlink();
187            return rc;
188        }
189    
190        /**
191         * Removes and returns the first sequence that is count range large.
192         *
193         * @return a sequence that is count range large, or null if no sequence is that large in the list.
194         */
195        public Sequence removeFirstSequence(long count) {
196            if (isEmpty()) {
197                return null;
198            }
199            
200            Sequence sequence = getHead();
201            while (sequence != null ) {
202                if (sequence.range() == count ) {
203                    sequence.unlink();
204                    return sequence;
205                }
206                if (sequence.range() > count ) {
207                    Sequence rc = new Sequence(sequence.first, sequence.first+count);
208                    sequence.first+=count;
209                    return rc;
210                }
211                sequence = sequence.getNext();
212            }
213            return null;
214        }
215    
216    
217        /**
218         * @return all the id Sequences that are missing from this set that are not
219         *         in between the range provided.
220         */
221        public List<Sequence> getMissing(long first, long last) {
222            ArrayList<Sequence> rc = new ArrayList<Sequence>();
223            if (first > last) {
224                throw new IllegalArgumentException("First cannot be more than last");
225            }
226            if (isEmpty()) {
227                // We are missing all the messages.
228                rc.add(new Sequence(first, last));
229                return rc;
230            }
231    
232            Sequence sequence = getHead();
233            while (sequence != null && first <= last) {
234                if (sequence.contains(first)) {
235                    first = sequence.last + 1;
236                } else {
237                    if (first < sequence.first) {
238                        if (last < sequence.first) {
239                            rc.add(new Sequence(first, last));
240                            return rc;
241                        } else {
242                            rc.add(new Sequence(first, sequence.first - 1));
243                            first = sequence.last + 1;
244                        }
245                    }
246                }
247                sequence = sequence.getNext();
248            }
249    
250            if (first <= last) {
251                rc.add(new Sequence(first, last));
252            }
253            return rc;
254        }
255    
256        /**
257         * @return all the Sequence that are in this list
258         */
259        public List<Sequence> getReceived() {
260            ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
261            Sequence sequence = getHead();
262            while (sequence != null) {
263                rc.add(new Sequence(sequence.first, sequence.last));
264                sequence = sequence.getNext();
265            }
266            return rc;
267        }
268    
269        public boolean contains(int first, int last) {
270            if (isEmpty()) {
271                return false;
272            }
273            Sequence sequence = getHead();
274            while (sequence != null) {
275                if (sequence.first <= first ) {
276                    return last <= sequence.last ;
277                }
278                sequence = sequence.getNext();
279            }
280            return false;
281        }
282    
283    }