View Javadoc

1   /**
2    * JDBM LICENSE v1.00
3    *
4    * Redistribution and use of this software and associated documentation
5    * ("Software"), with or without modification, are permitted provided
6    * that the following conditions are met:
7    *
8    * 1. Redistributions of source code must retain copyright
9    *    statements and notices.  Redistributions must also contain a
10   *    copy of this document.
11   *
12   * 2. Redistributions in binary form must reproduce the
13   *    above copyright notice, this list of conditions and the
14   *    following disclaimer in the documentation and/or other
15   *    materials provided with the distribution.
16   *
17   * 3. The name "JDBM" must not be used to endorse or promote
18   *    products derived from this Software without prior written
19   *    permission of Cees de Groot.  For written permission,
20   *    please contact cg@cdegroot.com.
21   *
22   * 4. Products derived from this Software may not be called "JDBM"
23   *    nor may "JDBM" appear in their names without prior written
24   *    permission of Cees de Groot.
25   *
26   * 5. Due credit should be given to the JDBM Project
27   *    (http://jdbm.sourceforge.net/).
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
30   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
31   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
33   * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
34   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
35   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
36   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
38   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
39   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
40   * OF THE POSSIBILITY OF SUCH DAMAGE.
41   *
42   * Copyright 2001 (C) Alex Boisvert. All Rights Reserved.
43   * Contributions are Copyright (C) 2001 by their associated contributors.
44   *
45   */
46  
47  package jdbm.btree;
48  
49  import jdbm.helper.Serializer;
50  import jdbm.helper.Tuple;
51  import jdbm.helper.TupleBrowser;
52  
53  import java.io.IOException;
54  import java.io.ByteArrayOutputStream;
55  import java.io.ByteArrayInputStream;
56  import java.io.ObjectInput;
57  import java.io.ObjectOutput;
58  import java.io.ObjectInputStream;
59  import java.io.ObjectOutputStream;
60  
61  /**
62   * Page of a Btree.
63   * <p>
64   * The page contains a number of key-value pairs.  Keys are ordered to allow
65   * dichotomic search.
66   * <p>
67   * If the page is a leaf page, the keys and values are user-defined and
68   * represent entries inserted by the user.
69   * <p>
70   * If the page is non-leaf, each key represents the greatest key in the
71   * underlying BPages and the values are recids pointing to the children BPages.
72   * The only exception is the rightmost BPage, which is considered to have an
73   * "infinite" key value, meaning that any insert will be to the left of this
74   * pseudo-key
75   *
76   * @author <a href="mailto:boisvert@intalio.com">Alex Boisvert</a>
77   * @version $Id: BPage.java,v 1.6 2003/09/21 15:46:59 boisvert Exp $
78   */
79  public final class BPage
80      implements Serializer
81  {
82  
83      private static final boolean DEBUG = false;
84  
85  
86      /**
87       * Version id for serialization.
88       */
89      final static long serialVersionUID = 1L;
90  
91  
92      /**
93       * Parent B+Tree.
94       */
95      transient BTree _btree;
96  
97  
98      /**
99       * This BPage's record ID in the PageManager.
100      */
101     protected transient long _recid;
102 
103 
104     /**
105      * Flag indicating if this is a leaf BPage.
106      */
107     protected boolean _isLeaf;
108 
109 
110     /**
111      * Keys of children nodes
112      */
113     protected Object[] _keys;
114 
115 
116     /**
117      * Values associated with keys.  (Only valid if leaf BPage)
118      */
119     protected Object[] _values;
120 
121 
122     /**
123      * Children pages (recids) associated with keys.  (Only valid if non-leaf BPage)
124      */
125     protected long[] _children;
126 
127     
128     /**
129      * Index of first used item at the page
130      */
131     protected int _first;
132 
133 
134     /**
135      * Previous leaf BPage (only if this BPage is a leaf)
136      */
137     protected long _previous;
138 
139 
140     /**
141      * Next leaf BPage (only if this BPage is a leaf)
142      */
143     protected long _next;
144 
145 
146     /**
147      * No-argument constructor used by serialization.
148      */
149     public BPage()
150     {
151         // empty
152     }
153 
154 
155     /**
156      * Root page overflow constructor
157      */
158     BPage( BTree btree, BPage root, BPage overflow )
159         throws IOException
160     {
161         _btree = btree;
162 
163         _isLeaf = false;
164 
165         _first = _btree._pageSize-2;
166 
167         _keys = new Object[ _btree._pageSize ];
168         _keys[ _btree._pageSize-2 ] = overflow.getLargestKey();
169         _keys[ _btree._pageSize-1 ] = root.getLargestKey();
170 
171         _children = new long[ _btree._pageSize ];
172         _children[ _btree._pageSize-2 ] = overflow._recid;
173         _children[ _btree._pageSize-1 ] = root._recid;
174 
175         _recid = _btree._recman.insert( this, this );
176     }
177 
178 
179     /**
180      * Root page (first insert) constructor.
181      */
182     BPage( BTree btree, Object key, Object value )
183         throws IOException
184     {
185         _btree = btree;
186 
187         _isLeaf = true;
188 
189         _first = btree._pageSize-2;
190 
191         _keys = new Object[ _btree._pageSize ];
192         _keys[ _btree._pageSize-2 ] = key;
193         _keys[ _btree._pageSize-1 ] = null;  // I am the root BPage for now
194 
195         _values = new Object[ _btree._pageSize ];
196         _values[ _btree._pageSize-2 ] = value;
197         _values[ _btree._pageSize-1 ] = null;  // I am the root BPage for now
198 
199         _recid = _btree._recman.insert( this, this );
200     }
201 
202 
203     /**
204      * Overflow page constructor.  Creates an empty BPage.
205      */
206     BPage( BTree btree, boolean isLeaf )
207         throws IOException
208     {
209         _btree = btree;
210 
211         _isLeaf = isLeaf;
212 
213         // page will initially be half-full
214         _first = _btree._pageSize/2;
215 
216         _keys = new Object[ _btree._pageSize ];
217         if ( isLeaf ) {
218             _values = new Object[ _btree._pageSize ];
219         } else {
220             _children = new long[ _btree._pageSize ];
221         }
222 
223         _recid = _btree._recman.insert( this, this );
224     }
225 
226 
227     /**
228      * Get largest key under this BPage.  Null is considered to be the
229      * greatest possible key.
230      */
231     Object getLargestKey()
232     {
233         return _keys[ _btree._pageSize-1 ];
234     }
235 
236 
237     /**
238      * Return true if BPage is empty.
239      */
240     boolean isEmpty()
241     {
242         if ( _isLeaf ) {
243             return ( _first == _values.length-1 );
244         } else {
245             return ( _first == _children.length-1 );
246         }
247     }
248 
249 
250     /**
251      * Return true if BPage is full.
252      */
253     boolean isFull() {
254         return ( _first == 0 );
255     }
256 
257 
258     /**
259      * Find the object associated with the given key.
260      *
261      * @param height Height of the current BPage (one is leaf page)
262      * @param key The key
263      * @return TupleBrowser positioned just before the given key, or before
264      *                      next greater key if key isn't found.
265      */
266     TupleBrowser find( int height, Object key )
267         throws IOException
268     {
269         int index = findChildren( key );
270 
271         /*
272         if ( DEBUG ) {
273             System.out.println( "BPage.find() current: " + this
274                                 + " height: " + height);
275         }
276         */
277 
278         height -= 1;
279 
280         if ( height == 0 ) {
281             // leaf BPage
282             return new Browser( this, index );
283         } else {
284             // non-leaf BPage
285             BPage child = childBPage( index );
286             return child.find( height, key );
287         }
288     }
289 
290 
291     /**
292      * Find first entry and return a browser positioned before it.
293      *
294      * @return TupleBrowser positioned just before the first entry.
295      */
296     TupleBrowser findFirst()
297         throws IOException
298     {
299         if ( _isLeaf ) {
300             return new Browser( this, _first );
301         } else {
302             BPage child = childBPage( _first );
303             return child.findFirst();
304         }
305     }
306 
307 
308     /**
309      * Insert the given key and value.
310      * <p>
311      * Since the Btree does not support duplicate entries, the caller must
312      * specify whether to replace the existing value.
313      *
314      * @param height Height of the current BPage (one is leaf page)
315      * @param key Insert key
316      * @param value Insert value
317      * @param replace Set to true to replace the existing value, if one exists.
318      * @return Insertion result containing existing value OR a BPage if the key
319      *         was inserted and provoked a BPage overflow.
320      */
321     InsertResult insert( int height, Object key, Object value, boolean replace )
322         throws IOException
323     {
324         InsertResult  result;
325         long          overflow;
326 
327         int index = findChildren( key );
328 
329         height -= 1;
330         if ( height == 0 )  {
331 
332             result = new InsertResult();
333 
334             // inserting on a leaf BPage
335             overflow = -1;
336             if ( DEBUG ) {
337                 System.out.println( "Bpage.insert() Insert on leaf Bpage key=" + key
338                                     + " value=" + value + " index="+index);
339             }
340             if ( compare( key, _keys[ index ] ) == 0 ) {
341                 // key already exists
342                 if ( DEBUG ) {
343                     System.out.println( "Bpage.insert() Key already exists." ) ;
344                 }
345                 result._existing = _values[ index ];
346                 if ( replace ) {
347                     _values [ index ] = value;
348                     _btree._recman.update( _recid, this, this );
349                 }
350                 // return the existing key
351                 return result;
352             }
353         } else {
354             // non-leaf BPage
355             BPage child = childBPage( index );
356             result = child.insert( height, key, value, replace );
357 
358             if ( result._existing != null ) {
359                 // return existing key, if any.
360                 return result;
361             }
362 
363             if ( result._overflow == null ) {
364                 // no overflow means we're done with insertion
365                 return result;
366             }
367 
368             // there was an overflow, we need to insert the overflow page
369             // on this BPage
370             if ( DEBUG ) {
371                 System.out.println( "BPage.insert() Overflow page: " + result._overflow._recid );
372             }
373             key = result._overflow.getLargestKey();
374             overflow = result._overflow._recid;
375 
376             // update child's largest key
377             _keys[ index ] = child.getLargestKey();
378 
379             // clean result so we can reuse it
380             result._overflow = null;
381         }
382 
383         // if we get here, we need to insert a new entry on the BPage
384         // before _children[ index ]
385         if ( !isFull() ) {
386             if ( height == 0 ) {
387                 insertEntry( this, index-1, key, value );
388             } else {
389                 insertChild( this, index-1, key, overflow );
390             }
391             _btree._recman.update( _recid, this, this );
392             return result;
393         }
394 
395         // page is full, we must divide the page
396         int half = _btree._pageSize >> 1;
397         BPage newPage = new BPage( _btree, _isLeaf );
398         if ( index < half ) {
399             // move lower-half of entries to overflow BPage,
400             // including new entry
401             if ( DEBUG ) {
402                 System.out.println( "Bpage.insert() move lower-half of entries to overflow BPage, including new entry." ) ;
403             }
404             if ( height == 0 ) {
405                 copyEntries( this, 0, newPage, half, index );
406                 setEntry( newPage, half+index, key, value );
407                 copyEntries( this, index, newPage, half+index+1, half-index-1 );
408             } else {
409                 copyChildren( this, 0, newPage, half, index );
410                 setChild( newPage, half+index, key, overflow );
411                 copyChildren( this, index, newPage, half+index+1, half-index-1 );
412             }
413         } else {
414             // move lower-half of entries to overflow BPage,
415             // new entry stays on this BPage
416             if ( DEBUG ) {
417                 System.out.println( "Bpage.insert() move lower-half of entries to overflow BPage. New entry stays" ) ;
418             }
419             if ( height == 0 ) {
420                 copyEntries( this, 0, newPage, half, half );
421                 copyEntries( this, half, this, half-1, index-half );
422                 setEntry( this, index-1, key, value );
423             } else {
424                 copyChildren( this, 0, newPage, half, half );
425                 copyChildren( this, half, this, half-1, index-half );
426                 setChild( this, index-1, key, overflow );
427             }
428         }
429 
430         _first = half-1;
431 
432         // nullify lower half of entries
433         for ( int i=0; i<_first; i++ ) {
434             if ( height == 0 ) {
435                 setEntry( this, i, null, null );
436             } else {
437                 setChild( this, i, null, -1 );
438             }
439         }
440 
441         if ( _isLeaf ) {
442             // link newly created BPage
443             newPage._previous = _previous;
444             newPage._next = _recid;
445             if ( _previous != 0 ) {
446                 BPage previous = loadBPage( _previous );
447                 previous._next = newPage._recid;
448                 _btree._recman.update( _previous, previous, this );
449             }
450             _previous = newPage._recid;
451         }
452 
453         _btree._recman.update( _recid, this, this );
454         _btree._recman.update( newPage._recid, newPage, this );
455 
456         result._overflow = newPage;
457         return result;
458     }
459 
460 
461     /**
462      * Remove the entry associated with the given key.
463      *
464      * @param height Height of the current BPage (one is leaf page)
465      * @param key Removal key
466      * @return Remove result object
467      */
468     RemoveResult remove( int height, Object key )
469         throws IOException
470     {
471         RemoveResult result;
472 
473         int half = _btree._pageSize / 2;
474         int index = findChildren( key );
475 
476         height -= 1;
477         if ( height == 0 ) {
478             // remove leaf entry
479             if ( compare( _keys[ index ], key ) != 0 ) {
480                 throw new IllegalArgumentException( "Key not found: " + key );
481             }
482             result = new RemoveResult();
483             result._value = _values[ index ];
484             removeEntry( this, index );
485 
486             // update this BPage
487             _btree._recman.update( _recid, this, this );
488 
489         } else {
490             // recurse into Btree to remove entry on a children page
491             BPage child = childBPage( index );
492             result = child.remove( height, key );
493 
494             // update children
495             _keys[ index ] = child.getLargestKey();
496             _btree._recman.update( _recid, this, this );
497 
498             if ( result._underflow ) {
499                 // underflow occured
500                 if ( child._first != half+1 ) {
501                     throw new IllegalStateException( "Error during underflow [1]" );
502                 }
503                 if ( index < _children.length-1 ) {
504                     // exists greater brother page
505                     BPage brother = childBPage( index+1 );
506                     int bfirst = brother._first;
507                     if ( bfirst < half ) {
508                         // steal entries from "brother" page
509                         int steal = ( half - bfirst + 1 ) / 2;
510                         brother._first += steal;
511                         child._first -= steal;
512                         if ( child._isLeaf ) {
513                             copyEntries( child, half+1, child, half+1-steal, half-1 );
514                             copyEntries( brother, bfirst, child, 2*half-steal, steal );
515                         } else {
516                             copyChildren( child, half+1, child, half+1-steal, half-1 );
517                             copyChildren( brother, bfirst, child, 2*half-steal, steal );
518                         }                            
519 
520                         for ( int i=bfirst; i<bfirst+steal; i++ ) {
521                             if ( brother._isLeaf ) {
522                                 setEntry( brother, i, null, null );
523                             } else {
524                                 setChild( brother, i, null, -1 );
525                             }
526                         }
527 
528                         // update child's largest key
529                         _keys[ index ] = child.getLargestKey();
530 
531                         // no change in previous/next BPage
532 
533                         // update BPages
534                         _btree._recman.update( _recid, this, this );
535                         _btree._recman.update( brother._recid, brother, this );
536                         _btree._recman.update( child._recid, child, this );
537 
538                     } else {
539                         // move all entries from page "child" to "brother"
540                         if ( brother._first != half ) {
541                             throw new IllegalStateException( "Error during underflow [2]" );
542                         }
543 
544                         brother._first = 1;
545                         if ( child._isLeaf ) {
546                             copyEntries( child, half+1, brother, 1, half-1 );
547                         } else {
548                             copyChildren( child, half+1, brother, 1, half-1 );
549                         }
550                         _btree._recman.update( brother._recid, brother, this );
551 
552                         // remove "child" from current BPage
553                         if ( _isLeaf ) {
554                             copyEntries( this, _first, this, _first+1, index-_first );
555                             setEntry( this, _first, null, null );
556                         } else {
557                             copyChildren( this, _first, this, _first+1, index-_first );
558                             setChild( this, _first, null, -1 );
559                         }
560                         _first += 1;
561                         _btree._recman.update( _recid, this, this );
562 
563                         // re-link previous and next BPages
564                         if ( child._previous != 0 ) {
565                             BPage prev = loadBPage( child._previous );
566                             prev._next = child._next;
567                             _btree._recman.update( prev._recid, prev, this );
568                         }
569                         if ( child._next != 0 ) {
570                             BPage next = loadBPage( child._next );
571                             next._previous = child._previous;
572                             _btree._recman.update( next._recid, next, this );
573                         }
574 
575                         // delete "child" BPage
576                         _btree._recman.delete( child._recid );
577                     }
578                 } else {
579                     // page "brother" is before "child"
580                     BPage brother = childBPage( index-1 );
581                     int bfirst = brother._first;
582                     if ( bfirst < half ) {
583                         // steal entries from "brother" page
584                         int steal = ( half - bfirst + 1 ) / 2;
585                         brother._first += steal;
586                         child._first -= steal;
587                         if ( child._isLeaf ) {
588                             copyEntries( brother, 2*half-steal, child,
589                                          half+1-steal, steal );
590                             copyEntries( brother, bfirst, brother,
591                                          bfirst+steal, 2*half-bfirst-steal );
592                         } else {
593                             copyChildren( brother, 2*half-steal, child,
594                                           half+1-steal, steal );
595                             copyChildren( brother, bfirst, brother,
596                                           bfirst+steal, 2*half-bfirst-steal );
597                         }
598 
599                         for ( int i=bfirst; i<bfirst+steal; i++ ) {
600                             if ( brother._isLeaf ) {
601                                 setEntry( brother, i, null, null );
602                             } else {
603                                 setChild( brother, i, null, -1 );
604                             }
605                         }
606 
607                         // update brother's largest key
608                         _keys[ index-1 ] = brother.getLargestKey();
609 
610                         // no change in previous/next BPage
611 
612                         // update BPages
613                         _btree._recman.update( _recid, this, this );
614                         _btree._recman.update( brother._recid, brother, this );
615                         _btree._recman.update( child._recid, child, this );
616 
617                     } else {
618                         // move all entries from page "brother" to "child"
619                         if ( brother._first != half ) {
620                             throw new IllegalStateException( "Error during underflow [3]" );
621                         }
622 
623                         child._first = 1;
624                         if ( child._isLeaf ) {
625                             copyEntries( brother, half, child, 1, half );
626                         } else {
627                             copyChildren( brother, half, child, 1, half );
628                         }
629                         _btree._recman.update( child._recid, child, this );
630 
631                         // remove "brother" from current BPage
632                         if ( _isLeaf ) {
633                             copyEntries( this, _first, this, _first+1, index-1-_first );
634                             setEntry( this, _first, null, null );
635                         } else {
636                             copyChildren( this, _first, this, _first+1, index-1-_first );
637                             setChild( this, _first, null, -1 );
638                         }
639                         _first += 1;
640                         _btree._recman.update( _recid, this, this );
641 
642                         // re-link previous and next BPages
643                         if ( brother._previous != 0 ) {
644                             BPage prev = loadBPage( brother._previous );
645                             prev._next = brother._next;
646                             _btree._recman.update( prev._recid, prev, this );
647                         }
648                         if ( brother._next != 0 ) {
649                             BPage next = loadBPage( brother._next );
650                             next._previous = brother._previous;
651                             _btree._recman.update( next._recid, next, this );
652                         }
653 
654                         // delete "brother" BPage
655                         _btree._recman.delete( brother._recid );
656                     }
657                 }
658             }
659         }
660 
661         // underflow if page is more than half-empty
662         result._underflow = _first > half;
663 
664         return result;
665     }
666 
667 
668     /**
669      * Find the first children node with a key equal or greater than the given
670      * key.
671      *
672      * @return index of first children with equal or greater key.
673      */
674     private int findChildren( Object key )
675     {
676         int left = _first;
677         int right = _btree._pageSize-1;
678 
679         // binary search
680         while ( left < right )  {
681             int middle = ( left + right ) / 2;
682             if ( compare( _keys[ middle ], key ) < 0 ) {
683                 left = middle+1;
684             } else {
685                 right = middle;
686             }
687         }
688         return right;
689     }
690 
691 
692     /**
693      * Insert entry at given position.
694      */
695     private static void insertEntry( BPage page, int index,
696                                      Object key, Object value )
697     {
698         Object[] keys = page._keys;
699         Object[] values = page._values;
700         int start = page._first;
701         int count = index-page._first+1;
702 
703         // shift entries to the left
704         System.arraycopy( keys, start, keys, start-1, count );
705         System.arraycopy( values, start, values, start-1, count );
706         page._first -= 1;
707         keys[ index ] = key;
708         values[ index ] = value;
709     }
710 
711 
712     /**
713      * Insert child at given position.
714      */
715     private static void insertChild( BPage page, int index,
716                                      Object key, long child )
717     {
718         Object[] keys = page._keys;
719         long[] children = page._children;
720         int start = page._first;
721         int count = index-page._first+1;
722 
723         // shift entries to the left
724         System.arraycopy( keys, start, keys, start-1, count );
725         System.arraycopy( children, start, children, start-1, count );
726         page._first -= 1;
727         keys[ index ] = key;
728         children[ index ] = child;
729     }
730     
731     /**
732      * Remove entry at given position.
733      */
734     private static void removeEntry( BPage page, int index )
735     {
736         Object[] keys = page._keys;
737         Object[] values = page._values;
738         int start = page._first;
739         int count = index-page._first;
740 
741         System.arraycopy( keys, start, keys, start+1, count );
742         keys[ start ] = null;
743         System.arraycopy( values, start, values, start+1, count );
744         values[ start ] = null;
745         page._first++;
746     }
747 
748 
749     /**
750      * Remove child at given position.
751      */
752 /*    
753     private static void removeChild( BPage page, int index )
754     {
755         Object[] keys = page._keys;
756         long[] children = page._children;
757         int start = page._first;
758         int count = index-page._first;
759 
760         System.arraycopy( keys, start, keys, start+1, count );
761         keys[ start ] = null;
762         System.arraycopy( children, start, children, start+1, count );
763         children[ start ] = (long) -1;
764         page._first++;
765     }
766 */
767     
768     /**
769      * Set the entry at the given index.
770      */
771     private static void setEntry( BPage page, int index, Object key, Object value )
772     {
773         page._keys[ index ] = key;
774         page._values[ index ] = value;
775     }
776 
777 
778     /**
779      * Set the child BPage recid at the given index.
780      */
781     private static void setChild( BPage page, int index, Object key, long recid )
782     {
783         page._keys[ index ] = key;
784         page._children[ index ] = recid;
785     }
786     
787     
788     /**
789      * Copy entries between two BPages
790      */
791     private static void copyEntries( BPage source, int indexSource,
792                                      BPage dest, int indexDest, int count )
793     {
794         System.arraycopy( source._keys, indexSource, dest._keys, indexDest, count);
795         System.arraycopy( source._values, indexSource, dest._values, indexDest, count);
796     }
797 
798 
799     /**
800      * Copy child BPage recids between two BPages
801      */
802     private static void copyChildren( BPage source, int indexSource,
803                                       BPage dest, int indexDest, int count )
804     {
805         System.arraycopy( source._keys, indexSource, dest._keys, indexDest, count);
806         System.arraycopy( source._children, indexSource, dest._children, indexDest, count);
807     }
808 
809     
810     /**
811      * Return the child BPage at given index.
812      */
813     BPage childBPage( int index )
814         throws IOException
815     {
816         return loadBPage( _children[ index ] );
817     }
818 
819 
820     /**
821      * Load the BPage at the given recid.
822      */
823     private BPage loadBPage( long recid )
824         throws IOException
825     {
826         BPage child = (BPage) _btree._recman.fetch( recid, this );
827         child._recid = recid;
828         child._btree = _btree;
829         return child;
830     }
831 
832     
833     private final int compare( Object value1, Object value2 )
834     {
835         if ( value1 == null ) {
836             return 1;
837         }
838         if ( value2 == null ) {
839             return -1;
840         }
841         return _btree._comparator.compare( value1, value2 );
842     }
843 
844     static byte[] readByteArray( ObjectInput in )
845         throws IOException
846     {
847         int len = in.readInt();
848         if ( len < 0 ) {
849             return null;
850         }
851         byte[] buf = new byte[ len ];
852         in.readFully( buf );
853         return buf;
854     }
855 
856 
857     static void writeByteArray( ObjectOutput out, byte[] buf )
858         throws IOException
859     {
860         if ( buf == null ) {
861             out.writeInt( -1 );
862         } else {
863             out.writeInt( buf.length );
864             out.write( buf );
865         }
866     }
867 
868     /**
869      * Dump the structure of the tree on the screen.  This is used for debugging
870      * purposes only.
871      */
872     private void dump( int height )
873     {
874         String prefix = "";
875         for ( int i=0; i<height; i++ ) {
876            prefix += "    ";
877         }
878         System.out.println( prefix + "-------------------------------------- BPage recid=" + _recid);
879         System.out.println( prefix + "first=" + _first );
880         for ( int i=0; i< _btree._pageSize; i++ ) {
881             if ( _isLeaf ) {
882                 System.out.println( prefix + "BPage [" + i + "] " + _keys[ i ] + " " + _values[ i ] );
883             } else {
884                 System.out.println( prefix + "BPage [" + i + "] " + _keys[ i ] + " " + _children[ i ] );
885             }
886         }
887         System.out.println( prefix + "--------------------------------------" );
888     }
889 
890 
891     /**
892      * Recursively dump the state of the BTree on screen.  This is used for
893      * debugging purposes only.
894      */
895     void dumpRecursive( int height, int level )
896         throws IOException
897     {
898         height -= 1;
899         level += 1;
900         if ( height > 0 ) {
901             for ( int i=_first; i<_btree._pageSize; i++ ) {
902                 if ( _keys[ i ] == null ) break;
903                 BPage child = childBPage( i );
904                 child.dump( level );
905                 child.dumpRecursive( height, level );
906             }
907         }
908     }
909 
910 
911     /**
912      * Assert the ordering of the keys on the BPage.  This is used for testing
913      * purposes only.
914      */
915     private void assertConsistency()
916     {
917         for ( int i=_first; i<_btree._pageSize-1; i++ ) {
918             if ( compare( (byte[]) _keys[ i ], (byte[]) _keys[ i+1 ] ) >= 0 ) {
919                 dump( 0 );
920                 throw new Error( "BPage not ordered" );
921             }
922         }
923     }
924 
925 
926     /**
927      * Recursively assert the ordering of the BPage entries on this page
928      * and sub-pages.  This is used for testing purposes only.
929      */
930     void assertConsistencyRecursive( int height ) 
931         throws IOException 
932     {
933         assertConsistency();
934         if ( --height > 0 ) {
935             for ( int i=_first; i<_btree._pageSize; i++ ) {
936                 if ( _keys[ i ] == null ) break;
937                 BPage child = childBPage( i );
938                 if ( compare( (byte[]) _keys[ i ], child.getLargestKey() ) != 0 ) {
939                     dump( 0 );
940                     child.dump( 0 );
941                     throw new Error( "Invalid child subordinate key" );
942                 }
943                 child.assertConsistencyRecursive( height );
944             }
945         }
946     }
947 
948 
949     /**
950      * Deserialize the content of an object from a byte array.
951      *
952      * @param serialized Byte array representation of the object
953      * @return deserialized object
954      *
955      */
956     public Object deserialize( byte[] serialized ) 
957         throws IOException
958     {
959         ByteArrayInputStream  bais;
960         ObjectInputStream     ois;
961         BPage                 bpage;
962 
963         bpage = new BPage();
964         bais = new ByteArrayInputStream( serialized );
965         ois = new ObjectInputStream( bais );
966         
967         bpage._isLeaf = ois.readBoolean();
968         if ( bpage._isLeaf ) {
969             bpage._previous = ois.readLong();
970             bpage._next = ois.readLong();
971         }
972 
973         bpage._first = ois.readInt();
974 
975         bpage._keys = new Object[ _btree._pageSize ];
976         try {
977             for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
978                 if ( _btree._keySerializer == null ) {
979                     bpage._keys[ i ] = ois.readObject();
980                 } else {
981                     serialized = readByteArray( ois );
982                     if ( serialized != null ) {
983                         bpage._keys[ i ] = _btree._keySerializer.deserialize( serialized );
984                     }
985                 }
986             }
987         } catch ( ClassNotFoundException except ) {
988             throw new IOException( except.getMessage() );
989         }
990         
991         if ( bpage._isLeaf ) {
992             bpage._values = new Object[ _btree._pageSize ];
993             try {
994                 for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
995                     if ( _btree._valueSerializer == null ) {
996                         bpage._values[ i ] = ois.readObject();
997                     } else {
998                         serialized = readByteArray( ois );
999                         if ( serialized != null ) {
1000                             bpage._values[ i ] = _btree._valueSerializer.deserialize( serialized );
1001                         }
1002                     }
1003                 }
1004             } catch ( ClassNotFoundException except ) {
1005                 throw new IOException( except.getMessage() );
1006             }
1007         } else {
1008             bpage._children = new long[ _btree._pageSize ];
1009             for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
1010                 bpage._children[ i ] = ois.readLong();
1011             }
1012         }
1013         ois.close();
1014         bais.close();
1015         
1016         return bpage;
1017     }
1018 
1019     
1020     /** 
1021      * Serialize the content of an object into a byte array.
1022      *
1023      * @param obj Object to serialize
1024      * @return a byte array representing the object's state
1025      *
1026      */
1027     public byte[] serialize( Object obj ) 
1028         throws IOException
1029     {
1030         byte[]                 serialized;
1031         ByteArrayOutputStream  baos;
1032         ObjectOutputStream     oos;
1033         BPage                  bpage;
1034         byte[]                 data;
1035         
1036         // note:  It is assumed that BPage instance doing the serialization is the parent
1037         // of the BPage object being serialized.
1038         
1039         bpage = (BPage) obj;
1040         baos = new ByteArrayOutputStream();
1041         oos = new ObjectOutputStream( baos );        
1042         
1043         oos.writeBoolean( bpage._isLeaf );
1044         if ( bpage._isLeaf ) {
1045             oos.writeLong( bpage._previous );
1046             oos.writeLong( bpage._next );
1047         }
1048 
1049         oos.writeInt( bpage._first );
1050         
1051         for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
1052             if ( _btree._keySerializer == null ) {
1053                 oos.writeObject( bpage._keys[ i ] );
1054             } else {
1055                 if ( bpage._keys[ i ] != null ) {
1056                     serialized = _btree._keySerializer.serialize( bpage._keys[ i ] );
1057                     writeByteArray( oos, serialized );
1058                 } else {
1059                     writeByteArray( oos, null );
1060                 }
1061             }
1062         }
1063 
1064         if ( bpage._isLeaf ) {
1065             for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
1066                 if ( _btree._valueSerializer == null ) {
1067                     oos.writeObject( bpage._values[ i ] );
1068                 } else {
1069                     if ( bpage._values[ i ] != null ) {
1070                         serialized = _btree._valueSerializer.serialize( bpage._values[ i ] );
1071                         writeByteArray( oos, serialized );
1072                     } else {
1073                         writeByteArray( oos, null );
1074                     }
1075                 }
1076             }
1077         } else {
1078             for ( int i=bpage._first; i<_btree._pageSize; i++ ) {
1079                 oos.writeLong( bpage._children[ i ] );
1080             }
1081         }
1082         
1083         oos.flush();
1084         data = baos.toByteArray();
1085         oos.close();
1086         baos.close();
1087         return data;
1088     }
1089     
1090     
1091     /** STATIC INNER CLASS
1092      *  Result from insert() method call
1093      */
1094     static class InsertResult {
1095 
1096         /**
1097          * Overflow page.
1098          */
1099         BPage _overflow;
1100 
1101         /**
1102          * Existing value for the insertion key.
1103          */
1104         Object _existing;
1105 
1106     }
1107 
1108     /** STATIC INNER CLASS
1109      *  Result from remove() method call
1110      */
1111     static class RemoveResult {
1112 
1113         /**
1114          * Set to true if underlying pages underflowed
1115          */
1116         boolean _underflow;
1117 
1118         /**
1119          * Removed entry value
1120          */
1121         Object _value;
1122     }
1123 
1124 
1125     /** PRIVATE INNER CLASS
1126      * Browser to traverse leaf BPages.
1127      */
1128     static class Browser
1129         extends TupleBrowser
1130     {
1131 
1132         /**
1133          * Current page.
1134          */
1135         private BPage _page;
1136 
1137 
1138         /**
1139          * Current index in the page.  The index positionned on the next
1140          * tuple to return.
1141          */
1142         private int _index;
1143 
1144 
1145         /**
1146          * Create a browser.
1147          *
1148          * @param page Current page
1149          * @param index Position of the next tuple to return.
1150          */
1151         Browser( BPage page, int index )
1152         {
1153             _page = page;
1154             _index = index;
1155         }
1156 
1157         public boolean getNext( Tuple tuple )
1158             throws IOException
1159         {
1160             if ( _index < _page._btree._pageSize ) {
1161                 if ( _page._keys[ _index ] == null ) {
1162                     // reached end of the tree.
1163                     return false;
1164                 }
1165             } else if ( _page._next != 0 ) {
1166                 // move to next page
1167                 _page = _page.loadBPage( _page._next );
1168                 _index = _page._first;
1169             }
1170             tuple.setKey( _page._keys[ _index ] );
1171             tuple.setValue( _page._values[ _index ] );
1172             _index++;
1173             return true;
1174         }
1175 
1176         public boolean getPrevious( Tuple tuple )
1177             throws IOException
1178         {
1179             if ( _index == _page._first ) {
1180 
1181                 if ( _page._previous != 0 ) {
1182                     _page = _page.loadBPage( _page._previous );
1183                     _index = _page._btree._pageSize;
1184                 } else {
1185                     // reached beginning of the tree
1186                     return false;
1187                 }
1188             }
1189             _index--;
1190             tuple.setKey( _page._keys[ _index ] );
1191             tuple.setValue( _page._values[ _index ] );
1192             return true;
1193 
1194         }
1195     }
1196 
1197 }