001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.kahadb.util; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.List; 024 import java.util.NoSuchElementException; 025 026 /** 027 * Keeps track of a added long values. Collapses ranges of numbers using a 028 * Sequence representation. Use to keep track of received message ids to find 029 * out if a message is duplicate or if there are any missing messages. 030 * 031 * @author chirino 032 */ 033 public class SequenceSet extends LinkedNodeList<Sequence> { 034 035 public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> { 036 037 public static final Marshaller INSTANCE = new Marshaller(); 038 039 public SequenceSet readPayload(DataInput in) throws IOException { 040 SequenceSet value = new SequenceSet(); 041 int count = in.readInt(); 042 for (int i = 0; i < count; i++) { 043 if( in.readBoolean() ) { 044 Sequence sequence = new Sequence(in.readLong(), in.readLong()); 045 value.addLast(sequence); 046 } else { 047 Sequence sequence = new Sequence(in.readLong()); 048 value.addLast(sequence); 049 } 050 } 051 return value; 052 } 053 054 public void writePayload(SequenceSet value, DataOutput out) throws IOException { 055 out.writeInt(value.size()); 056 Sequence sequence = value.getHead(); 057 while (sequence != null ) { 058 if( sequence.range() > 1 ) { 059 out.writeBoolean(true); 060 out.writeLong(sequence.first); 061 out.writeLong(sequence.last); 062 } else { 063 out.writeBoolean(false); 064 out.writeLong(sequence.first); 065 } 066 sequence = sequence.getNext(); 067 } 068 } 069 070 public int getFixedSize() { 071 return -1; 072 } 073 074 public SequenceSet deepCopy(SequenceSet value) { 075 SequenceSet rc = new SequenceSet(); 076 Sequence sequence = value.getHead(); 077 while (sequence != null ) { 078 rc.add(new Sequence(sequence.first, sequence.last)); 079 sequence = sequence.getNext(); 080 } 081 return rc; 082 } 083 084 public boolean isDeepCopySupported() { 085 return true; 086 } 087 } 088 089 public void add(Sequence value) { 090 // TODO we can probably optimize this a bit 091 for(long i=value.first; i<value.last+1; i++) { 092 add(i); 093 } 094 } 095 096 097 /** 098 * 099 * @param value 100 * the value to add to the list 101 * @return false if the value was a duplicate. 102 */ 103 public boolean add(long value) { 104 105 if (isEmpty()) { 106 addFirst(new Sequence(value)); 107 return true; 108 } 109 110 Sequence sequence = getHead(); 111 while (sequence != null) { 112 113 if (sequence.isAdjacentToLast(value)) { 114 // grow the sequence... 115 sequence.last = value; 116 // it might connect us to the next sequence.. 117 if (sequence.getNext() != null) { 118 Sequence next = sequence.getNext(); 119 if (next.isAdjacentToFirst(value)) { 120 // Yep the sequence connected.. so join them. 121 sequence.last = next.last; 122 next.unlink(); 123 } 124 } 125 return true; 126 } 127 128 if (sequence.isAdjacentToFirst(value)) { 129 // grow the sequence... 130 sequence.first = value; 131 132 // it might connect us to the previous 133 if (sequence.getPrevious() != null) { 134 Sequence prev = sequence.getPrevious(); 135 if (prev.isAdjacentToLast(value)) { 136 // Yep the sequence connected.. so join them. 137 sequence.first = prev.first; 138 prev.unlink(); 139 } 140 } 141 return true; 142 } 143 144 // Did that value land before this sequence? 145 if (value < sequence.first) { 146 // Then insert a new entry before this sequence item. 147 sequence.linkBefore(new Sequence(value)); 148 return true; 149 } 150 151 // Did that value land within the sequence? The it's a duplicate. 152 if (sequence.contains(value)) { 153 return false; 154 } 155 156 sequence = sequence.getNext(); 157 } 158 159 // Then the value is getting appended to the tail of the sequence. 160 addLast(new Sequence(value)); 161 return true; 162 } 163 164 /** 165 * Removes and returns the first element from this list. 166 * 167 * @return the first element from this list. 168 * @throws NoSuchElementException if this list is empty. 169 */ 170 public long removeFirst() { 171 if (isEmpty()) { 172 throw new NoSuchElementException(); 173 } 174 175 Sequence rc = removeFirstSequence(1); 176 return rc.first; 177 } 178 179 180 public Sequence removeLastSequence() { 181 if (isEmpty()) { 182 return null; 183 } 184 185 Sequence rc = getTail(); 186 rc.unlink(); 187 return rc; 188 } 189 190 /** 191 * Removes and returns the first sequence that is count range large. 192 * 193 * @return a sequence that is count range large, or null if no sequence is that large in the list. 194 */ 195 public Sequence removeFirstSequence(long count) { 196 if (isEmpty()) { 197 return null; 198 } 199 200 Sequence sequence = getHead(); 201 while (sequence != null ) { 202 if (sequence.range() == count ) { 203 sequence.unlink(); 204 return sequence; 205 } 206 if (sequence.range() > count ) { 207 Sequence rc = new Sequence(sequence.first, sequence.first+count); 208 sequence.first+=count; 209 return rc; 210 } 211 sequence = sequence.getNext(); 212 } 213 return null; 214 } 215 216 217 /** 218 * @return all the id Sequences that are missing from this set that are not 219 * in between the range provided. 220 */ 221 public List<Sequence> getMissing(long first, long last) { 222 ArrayList<Sequence> rc = new ArrayList<Sequence>(); 223 if (first > last) { 224 throw new IllegalArgumentException("First cannot be more than last"); 225 } 226 if (isEmpty()) { 227 // We are missing all the messages. 228 rc.add(new Sequence(first, last)); 229 return rc; 230 } 231 232 Sequence sequence = getHead(); 233 while (sequence != null && first <= last) { 234 if (sequence.contains(first)) { 235 first = sequence.last + 1; 236 } else { 237 if (first < sequence.first) { 238 if (last < sequence.first) { 239 rc.add(new Sequence(first, last)); 240 return rc; 241 } else { 242 rc.add(new Sequence(first, sequence.first - 1)); 243 first = sequence.last + 1; 244 } 245 } 246 } 247 sequence = sequence.getNext(); 248 } 249 250 if (first <= last) { 251 rc.add(new Sequence(first, last)); 252 } 253 return rc; 254 } 255 256 /** 257 * @return all the Sequence that are in this list 258 */ 259 public List<Sequence> getReceived() { 260 ArrayList<Sequence> rc = new ArrayList<Sequence>(size()); 261 Sequence sequence = getHead(); 262 while (sequence != null) { 263 rc.add(new Sequence(sequence.first, sequence.last)); 264 sequence = sequence.getNext(); 265 } 266 return rc; 267 } 268 269 public boolean contains(int first, int last) { 270 if (isEmpty()) { 271 return false; 272 } 273 Sequence sequence = getHead(); 274 while (sequence != null) { 275 if (sequence.first <= first ) { 276 return last <= sequence.last ; 277 } 278 sequence = sequence.getNext(); 279 } 280 return false; 281 } 282 283 }