001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.kahadb.index; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.IOException; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.Map.Entry; 025 import java.util.concurrent.atomic.AtomicBoolean; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.apache.kahadb.page.Page; 030 import org.apache.kahadb.page.PageFile; 031 import org.apache.kahadb.page.Transaction; 032 import org.apache.kahadb.util.Marshaller; 033 import org.apache.kahadb.util.VariableMarshaller; 034 035 /** 036 * BTree implementation 037 * 038 * @version $Revision: 777209 $ 039 */ 040 public class HashIndex<Key,Value> implements Index<Key,Value> { 041 042 public static final int CLOSED_STATE = 1; 043 public static final int OPEN_STATE = 2; 044 045 046 private static final Log LOG = LogFactory.getLog(HashIndex.class); 047 048 public static final int DEFAULT_BIN_CAPACITY; 049 public static final int DEFAULT_MAXIMUM_BIN_CAPACITY; 050 public static final int DEFAULT_MINIMUM_BIN_CAPACITY; 051 public static final int DEFAULT_LOAD_FACTOR; 052 053 static { 054 DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024")); 055 DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384")); 056 DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16")); 057 DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75")); 058 } 059 060 private AtomicBoolean loaded = new AtomicBoolean(); 061 062 063 private int increaseThreshold; 064 private int decreaseThreshold; 065 066 // Where the bin page array starts at. 067 private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY; 068 private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY; 069 070 071 072 // Once binsActive/binCapacity reaches the loadFactor, then we need to 073 // increase the capacity 074 private int loadFactor = DEFAULT_LOAD_FACTOR; 075 076 private PageFile pageFile; 077 // This page holds the index metadata. 078 private long pageId; 079 080 static class Metadata { 081 082 private Page<Metadata> page; 083 084 // When the index is initializing or resizing.. state changes so that 085 // on failure it can be properly recovered. 086 private int state; 087 private long binPageId; 088 private int binCapacity = DEFAULT_BIN_CAPACITY; 089 private int binsActive; 090 private int size; 091 092 093 public void read(DataInput is) throws IOException { 094 state = is.readInt(); 095 binPageId = is.readLong(); 096 binCapacity = is.readInt(); 097 size = is.readInt(); 098 binsActive = is.readInt(); 099 } 100 public void write(DataOutput os) throws IOException { 101 os.writeInt(state); 102 os.writeLong(binPageId); 103 os.writeInt(binCapacity); 104 os.writeInt(size); 105 os.writeInt(binsActive); 106 } 107 108 static class Marshaller extends VariableMarshaller<Metadata> { 109 public Metadata readPayload(DataInput dataIn) throws IOException { 110 Metadata rc = new Metadata(); 111 rc.read(dataIn); 112 return rc; 113 } 114 115 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 116 object.write(dataOut); 117 } 118 } 119 } 120 121 private Metadata metadata = new Metadata(); 122 123 private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller(); 124 private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this); 125 private Marshaller<Key> keyMarshaller; 126 private Marshaller<Value> valueMarshaller; 127 128 129 /** 130 * Constructor 131 * 132 * @param directory 133 * @param name 134 * @param indexManager 135 * @param numberOfBins 136 * @throws IOException 137 */ 138 public HashIndex(PageFile pageFile, long pageId) throws IOException { 139 this.pageFile = pageFile; 140 this.pageId = pageId; 141 } 142 143 public synchronized void load(Transaction tx) throws IOException { 144 if (loaded.compareAndSet(false, true)) { 145 final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller); 146 // Is this a brand new index? 147 if (metadataPage.getType() == Page.PAGE_FREE_TYPE) { 148 // We need to create the pages for the bins 149 Page binPage = tx.allocate(metadata.binCapacity); 150 metadata.binPageId = binPage.getPageId(); 151 metadata.page = metadataPage; 152 metadataPage.set(metadata); 153 clear(tx); 154 155 // If failure happens now we can continue initializing the 156 // the hash bins... 157 } else { 158 159 metadata = metadataPage.get(); 160 metadata.page = metadataPage; 161 162 // If we did not have a clean shutdown... 163 if (metadata.state == OPEN_STATE ) { 164 // Figure out the size and the # of bins that are 165 // active. Yeah This loads the first page of every bin. :( 166 // We might want to put this in the metadata page, but 167 // then that page would be getting updated on every write. 168 metadata.size = 0; 169 for (int i = 0; i < metadata.binCapacity; i++) { 170 int t = sizeOfBin(tx, i); 171 if (t > 0) { 172 metadata.binsActive++; 173 } 174 metadata.size += t; 175 } 176 } 177 } 178 179 calcThresholds(); 180 181 metadata.state = OPEN_STATE; 182 tx.store(metadataPage, metadataMarshaller, true); 183 184 LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId); 185 } 186 } 187 188 public synchronized void unload(Transaction tx) throws IOException { 189 if (loaded.compareAndSet(true, false)) { 190 metadata.state = CLOSED_STATE; 191 tx.store(metadata.page, metadataMarshaller, true); 192 } 193 } 194 195 private int sizeOfBin(Transaction tx, int index) throws IOException { 196 return getBin(tx, index).size(); 197 } 198 199 public synchronized Value get(Transaction tx, Key key) throws IOException { 200 assertLoaded(); 201 return getBin(tx, key).get(key); 202 } 203 204 public synchronized boolean containsKey(Transaction tx, Key key) throws IOException { 205 assertLoaded(); 206 return getBin(tx, key).containsKey(key); 207 } 208 209 synchronized public Value put(Transaction tx, Key key, Value value) throws IOException { 210 assertLoaded(); 211 HashBin<Key,Value> bin = getBin(tx, key); 212 213 int originalSize = bin.size(); 214 Value result = bin.put(key,value); 215 store(tx, bin); 216 217 int newSize = bin.size(); 218 219 if (newSize != originalSize) { 220 metadata.size++; 221 if (newSize == 1) { 222 metadata.binsActive++; 223 } 224 } 225 226 if (metadata.binsActive >= this.increaseThreshold) { 227 newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2); 228 if(metadata.binCapacity!=newSize) { 229 resize(tx, newSize); 230 } 231 } 232 return result; 233 } 234 235 synchronized public Value remove(Transaction tx, Key key) throws IOException { 236 assertLoaded(); 237 238 HashBin<Key,Value> bin = getBin(tx, key); 239 int originalSize = bin.size(); 240 Value result = bin.remove(key); 241 int newSize = bin.size(); 242 243 if (newSize != originalSize) { 244 store(tx, bin); 245 246 metadata.size--; 247 if (newSize == 0) { 248 metadata.binsActive--; 249 } 250 } 251 252 if (metadata.binsActive <= this.decreaseThreshold) { 253 newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2); 254 if(metadata.binCapacity!=newSize) { 255 resize(tx, newSize); 256 } 257 } 258 return result; 259 } 260 261 262 public synchronized void clear(Transaction tx) throws IOException { 263 assertLoaded(); 264 for (int i = 0; i < metadata.binCapacity; i++) { 265 long pageId = metadata.binPageId + i; 266 clearBinAtPage(tx, pageId); 267 } 268 metadata.size = 0; 269 metadata.binsActive = 0; 270 } 271 272 public Iterator<Entry<Key, Value>> iterator(Transaction tx) throws IOException, UnsupportedOperationException { 273 throw new UnsupportedOperationException(); 274 } 275 276 277 /** 278 * @param tx 279 * @param pageId 280 * @throws IOException 281 */ 282 private void clearBinAtPage(Transaction tx, long pageId) throws IOException { 283 Page<HashBin<Key,Value>> page = tx.load(pageId, null); 284 HashBin<Key, Value> bin = new HashBin<Key,Value>(); 285 bin.setPage(page); 286 page.set(bin); 287 store(tx, bin); 288 } 289 290 public String toString() { 291 String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile; 292 return str; 293 } 294 295 // ///////////////////////////////////////////////////////////////// 296 // Implementation Methods 297 // ///////////////////////////////////////////////////////////////// 298 299 private void assertLoaded() throws IllegalStateException { 300 if( !loaded.get() ) { 301 throw new IllegalStateException("The HashIndex is not loaded"); 302 } 303 } 304 305 public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException { 306 tx.store(bin.getPage(), hashBinMarshaller, true); 307 } 308 309 // While resizing, the following contains the new resize data. 310 311 private void resize(Transaction tx, final int newSize) throws IOException { 312 LOG.debug("Resizing to: "+newSize); 313 314 int resizeCapacity = newSize; 315 long resizePageId = tx.allocate(resizeCapacity).getPageId(); 316 317 // In Phase 1 we copy the data to the new bins.. 318 // Initialize the bins.. 319 for (int i = 0; i < resizeCapacity; i++) { 320 long pageId = resizePageId + i; 321 clearBinAtPage(tx, pageId); 322 } 323 324 metadata.binsActive = 0; 325 // Copy the data from the old bins to the new bins. 326 for (int i = 0; i < metadata.binCapacity; i++) { 327 328 HashBin<Key,Value> bin = getBin(tx, i); 329 for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) { 330 HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity); 331 resizeBin.put(entry.getKey(), entry.getValue()); 332 store(tx, resizeBin); 333 if( resizeBin.size() == 1) { 334 metadata.binsActive++; 335 } 336 } 337 } 338 339 // In phase 2 we free the old bins and switch the the new bins. 340 tx.free(metadata.binPageId, metadata.binCapacity); 341 342 metadata.binCapacity = resizeCapacity; 343 metadata.binPageId = resizePageId; 344 metadata.state = OPEN_STATE; 345 tx.store(metadata.page, metadataMarshaller, true); 346 calcThresholds(); 347 348 LOG.debug("Resizing done. New bins start at: "+metadata.binPageId); 349 resizeCapacity=0; 350 resizePageId=0; 351 } 352 353 private void calcThresholds() { 354 increaseThreshold = (metadata.binCapacity * loadFactor)/100; 355 decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000; 356 } 357 358 private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException { 359 return getBin(tx, key, metadata.binPageId, metadata.binCapacity); 360 } 361 362 private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException { 363 return getBin(tx, i, metadata.binPageId); 364 } 365 366 private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException { 367 int i = indexFor(key, capacity); 368 return getBin(tx, i, basePage); 369 } 370 371 private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException { 372 Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller); 373 HashBin<Key, Value> rc = page.get(); 374 rc.setPage(page); 375 return rc; 376 } 377 378 int indexFor(Key x, int length) { 379 return Math.abs(x.hashCode()%length); 380 } 381 382 // ///////////////////////////////////////////////////////////////// 383 // Property Accessors 384 // ///////////////////////////////////////////////////////////////// 385 386 public Marshaller<Key> getKeyMarshaller() { 387 return keyMarshaller; 388 } 389 390 /** 391 * Set the marshaller for key objects 392 * 393 * @param marshaller 394 */ 395 public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) { 396 this.keyMarshaller = marshaller; 397 } 398 399 public Marshaller<Value> getValueMarshaller() { 400 return valueMarshaller; 401 } 402 /** 403 * Set the marshaller for value objects 404 * 405 * @param marshaller 406 */ 407 public void setValueMarshaller(Marshaller<Value> valueMarshaller) { 408 this.valueMarshaller = valueMarshaller; 409 } 410 411 /** 412 * @return number of bins in the index 413 */ 414 public int getBinCapacity() { 415 return metadata.binCapacity; 416 } 417 418 /** 419 * @param binCapacity 420 */ 421 public void setBinCapacity(int binCapacity) { 422 if (loaded.get() && binCapacity != metadata.binCapacity) { 423 throw new RuntimeException("Pages already loaded - can't reset bin capacity"); 424 } 425 metadata.binCapacity = binCapacity; 426 } 427 428 public boolean isTransient() { 429 return false; 430 } 431 432 /** 433 * @return the loadFactor 434 */ 435 public int getLoadFactor() { 436 return loadFactor; 437 } 438 439 /** 440 * @param loadFactor the loadFactor to set 441 */ 442 public void setLoadFactor(int loadFactor) { 443 this.loadFactor = loadFactor; 444 } 445 446 /** 447 * @return the maximumCapacity 448 */ 449 public int setMaximumBinCapacity() { 450 return maximumBinCapacity; 451 } 452 453 /** 454 * @param maximumCapacity the maximumCapacity to set 455 */ 456 public void setMaximumBinCapacity(int maximumCapacity) { 457 this.maximumBinCapacity = maximumCapacity; 458 } 459 460 public synchronized int size(Transaction tx) { 461 return metadata.size; 462 } 463 464 public synchronized int getActiveBins() { 465 return metadata.binsActive; 466 } 467 468 public long getBinPageId() { 469 return metadata.binPageId; 470 } 471 472 public PageFile getPageFile() { 473 return pageFile; 474 } 475 476 public int getBinsActive() { 477 return metadata.binsActive; 478 } 479 480 }