2323import java .util .LinkedList ;
2424import java .util .List ;
2525
26+ import com .google .common .annotations .VisibleForTesting ;
27+
2628import org .apache .spark .unsafe .*;
2729import org .apache .spark .unsafe .array .ByteArrayMethods ;
2830import org .apache .spark .unsafe .array .LongArray ;
3638 * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
3739 * which is guaranteed to exhaust the space.
3840 * <p>
39- * The map can support up to 2^31 keys because we use 32 bit MurmurHash. If the key cardinality is
40- * higher than this, you should probably be using sorting instead of hashing for better cache
41- * locality.
41+ * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
42+ * probably be using sorting instead of hashing for better cache locality.
4243 * <p>
4344 * This class is not thread safe.
4445 */
@@ -48,6 +49,11 @@ public final class BytesToBytesMap {
4849
4950 private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy .DOUBLING ;
5051
52+ /**
53+ * Special record length that is placed after the last record in a data page.
54+ */
55+ private static final int END_OF_PAGE_MARKER = -1 ;
56+
5157 private final TaskMemoryManager memoryManager ;
5258
5359 /**
@@ -64,7 +70,7 @@ public final class BytesToBytesMap {
6470
6571 /**
6672 * Offset into `currentDataPage` that points to the location where new data can be inserted into
67- * the page.
73+ * the page. This does not incorporate the page's base offset.
6874 */
6975 private long pageCursor = 0 ;
7076
@@ -74,6 +80,15 @@ public final class BytesToBytesMap {
7480 */
7581 private static final long PAGE_SIZE_BYTES = 1L << 26 ; // 64 megabytes
7682
83+ /**
84+ * The maximum number of keys that BytesToBytesMap supports. The hash table has to be
85+ * power-of-2-sized and its backing Java array can contain at most (1 << 30) elements, since
86+ * that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
87+ * entries per key, giving us a maximum capacity of (1 << 29).
88+ */
89+ @ VisibleForTesting
90+ static final int MAX_CAPACITY = (1 << 29 );
91+
7792 // This choice of page table size and page size means that we can address up to 500 gigabytes
7893 // of memory.
7994
@@ -143,6 +158,13 @@ public BytesToBytesMap(
143158 this .loadFactor = loadFactor ;
144159 this .loc = new Location ();
145160 this .enablePerfMetrics = enablePerfMetrics ;
161+ if (initialCapacity <= 0 ) {
162+ throw new IllegalArgumentException ("Initial capacity must be greater than 0" );
163+ }
164+ if (initialCapacity > MAX_CAPACITY ) {
165+ throw new IllegalArgumentException (
166+ "Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY );
167+ }
146168 allocate (initialCapacity );
147169 }
148170
@@ -162,6 +184,55 @@ public BytesToBytesMap(
162184 */
163185 public int size () { return size ; }
164186
187+ private static final class BytesToBytesMapIterator implements Iterator <Location > {
188+
189+ private final int numRecords ;
190+ private final Iterator <MemoryBlock > dataPagesIterator ;
191+ private final Location loc ;
192+
193+ private int currentRecordNumber = 0 ;
194+ private Object pageBaseObject ;
195+ private long offsetInPage ;
196+
197+ BytesToBytesMapIterator (int numRecords , Iterator <MemoryBlock > dataPagesIterator , Location loc ) {
198+ this .numRecords = numRecords ;
199+ this .dataPagesIterator = dataPagesIterator ;
200+ this .loc = loc ;
201+ if (dataPagesIterator .hasNext ()) {
202+ advanceToNextPage ();
203+ }
204+ }
205+
206+ private void advanceToNextPage () {
207+ final MemoryBlock currentPage = dataPagesIterator .next ();
208+ pageBaseObject = currentPage .getBaseObject ();
209+ offsetInPage = currentPage .getBaseOffset ();
210+ }
211+
212+ @ Override
213+ public boolean hasNext () {
214+ return currentRecordNumber != numRecords ;
215+ }
216+
217+ @ Override
218+ public Location next () {
219+ int keyLength = (int ) PlatformDependent .UNSAFE .getLong (pageBaseObject , offsetInPage );
220+ if (keyLength == END_OF_PAGE_MARKER ) {
221+ advanceToNextPage ();
222+ keyLength = (int ) PlatformDependent .UNSAFE .getLong (pageBaseObject , offsetInPage );
223+ }
224+ loc .with (pageBaseObject , offsetInPage );
225+ offsetInPage += 8 + 8 + keyLength + loc .getValueLength ();
226+ currentRecordNumber ++;
227+ return loc ;
228+ }
229+
230+ @ Override
231+ public void remove () {
232+ throw new UnsupportedOperationException ();
233+ }
234+ }
235+
165236 /**
166237 * Returns an iterator for iterating over the entries of this map.
167238 *
@@ -171,27 +242,7 @@ public BytesToBytesMap(
171242 * `lookup()`, the behavior of the returned iterator is undefined.
172243 */
173244 public Iterator <Location > iterator () {
174- return new Iterator <Location >() {
175-
176- private int nextPos = bitset .nextSetBit (0 );
177-
178- @ Override
179- public boolean hasNext () {
180- return nextPos != -1 ;
181- }
182-
183- @ Override
184- public Location next () {
185- final int pos = nextPos ;
186- nextPos = bitset .nextSetBit (nextPos + 1 );
187- return loc .with (pos , 0 , true );
188- }
189-
190- @ Override
191- public void remove () {
192- throw new UnsupportedOperationException ();
193- }
194- };
245+ return new BytesToBytesMapIterator (size , dataPages .iterator (), loc );
195246 }
196247
197248 /**
@@ -268,8 +319,11 @@ public final class Location {
268319 private int valueLength ;
269320
270321 private void updateAddressesAndSizes (long fullKeyAddress ) {
271- final Object page = memoryManager .getPage (fullKeyAddress );
272- final long keyOffsetInPage = memoryManager .getOffsetInPage (fullKeyAddress );
322+ updateAddressesAndSizes (
323+ memoryManager .getPage (fullKeyAddress ), memoryManager .getOffsetInPage (fullKeyAddress ));
324+ }
325+
326+ private void updateAddressesAndSizes (Object page , long keyOffsetInPage ) {
273327 long position = keyOffsetInPage ;
274328 keyLength = (int ) PlatformDependent .UNSAFE .getLong (page , position );
275329 position += 8 ; // word used to store the key size
@@ -291,6 +345,12 @@ Location with(int pos, int keyHashcode, boolean isDefined) {
291345 return this ;
292346 }
293347
348+ Location with (Object page , long keyOffsetInPage ) {
349+ this .isDefined = true ;
350+ updateAddressesAndSizes (page , keyOffsetInPage );
351+ return this ;
352+ }
353+
294354 /**
295355 * Returns true if the key is defined at this position, and false otherwise.
296356 */
@@ -345,6 +405,8 @@ public int getValueLength() {
345405 * <p>
346406 * It is only valid to call this method immediately after calling `lookup()` using the same key.
347407 * <p>
408+ * The key and value must be word-aligned (that is, their sizes must multiples of 8).
409+ * <p>
348410 * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
349411 * will return information on the data stored by this `putNewKey` call.
350412 * <p>
@@ -370,17 +432,27 @@ public void putNewKey(
370432 isDefined = true ;
371433 assert (keyLengthBytes % 8 == 0 );
372434 assert (valueLengthBytes % 8 == 0 );
435+ if (size == MAX_CAPACITY ) {
436+ throw new IllegalStateException ("BytesToBytesMap has reached maximum capacity" );
437+ }
373438 // Here, we'll copy the data into our data pages. Because we only store a relative offset from
374439 // the key address instead of storing the absolute address of the value, the key and value
375440 // must be stored in the same memory page.
376441 // (8 byte key length) (key) (8 byte value length) (value)
377442 final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes ;
378- assert (requiredSize <= PAGE_SIZE_BYTES );
443+ assert (requiredSize <= PAGE_SIZE_BYTES - 8 ); // Reserve 8 bytes for the end-of-page marker.
379444 size ++;
380445 bitset .set (pos );
381446
382- // If there's not enough space in the current page, allocate a new page:
383- if (currentDataPage == null || PAGE_SIZE_BYTES - pageCursor < requiredSize ) {
447+ // If there's not enough space in the current page, allocate a new page (8 bytes are reserved
448+ // for the end-of-page marker).
449+ if (currentDataPage == null || PAGE_SIZE_BYTES - 8 - pageCursor < requiredSize ) {
450+ if (currentDataPage != null ) {
451+ // There wasn't enough space in the current page, so write an end-of-page marker:
452+ final Object pageBaseObject = currentDataPage .getBaseObject ();
453+ final long lengthOffsetInPage = currentDataPage .getBaseOffset () + pageCursor ;
454+ PlatformDependent .UNSAFE .putLong (pageBaseObject , lengthOffsetInPage , END_OF_PAGE_MARKER );
455+ }
384456 MemoryBlock newPage = memoryManager .allocatePage (PAGE_SIZE_BYTES );
385457 dataPages .add (newPage );
386458 pageCursor = 0 ;
@@ -414,7 +486,7 @@ public void putNewKey(
414486 longArray .set (pos * 2 + 1 , keyHashcode );
415487 updateAddressesAndSizes (storedKeyAddress );
416488 isDefined = true ;
417- if (size > growthThreshold ) {
489+ if (size > growthThreshold && longArray . size () < MAX_CAPACITY ) {
418490 growAndRehash ();
419491 }
420492 }
@@ -427,8 +499,11 @@ public void putNewKey(
427499 * @param capacity the new map capacity
428500 */
429501 private void allocate (int capacity ) {
430- capacity = Math .max ((int ) Math .min (Integer .MAX_VALUE , nextPowerOf2 (capacity )), 64 );
431- longArray = new LongArray (memoryManager .allocate (capacity * 8 * 2 ));
502+ assert (capacity >= 0 );
503+ // The capacity needs to be divisible by 64 so that our bit set can be sized properly
504+ capacity = Math .max ((int ) Math .min (MAX_CAPACITY , nextPowerOf2 (capacity )), 64 );
505+ assert (capacity <= MAX_CAPACITY );
506+ longArray = new LongArray (memoryManager .allocate (capacity * 8L * 2 ));
432507 bitset = new BitSet (MemoryBlock .fromLongArray (new long [capacity / 64 ]));
433508
434509 this .growthThreshold = (int ) (capacity * loadFactor );
@@ -494,10 +569,16 @@ public long getNumHashCollisions() {
494569 return numHashCollisions ;
495570 }
496571
572+ @ VisibleForTesting
573+ int getNumDataPages () {
574+ return dataPages .size ();
575+ }
576+
497577 /**
498578 * Grows the size of the hash table and re-hash everything.
499579 */
500- private void growAndRehash () {
580+ @ VisibleForTesting
581+ void growAndRehash () {
501582 long resizeStartTime = -1 ;
502583 if (enablePerfMetrics ) {
503584 resizeStartTime = System .nanoTime ();
@@ -508,7 +589,7 @@ private void growAndRehash() {
508589 final int oldCapacity = (int ) oldBitSet .capacity ();
509590
510591 // Allocate the new data structures
511- allocate (Math .min (Integer . MAX_VALUE , growthStrategy .nextCapacity (oldCapacity )));
592+ allocate (Math .min (growthStrategy .nextCapacity (oldCapacity ), MAX_CAPACITY ));
512593
513594 // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
514595 for (int pos = oldBitSet .nextSetBit (0 ); pos >= 0 ; pos = oldBitSet .nextSetBit (pos + 1 )) {
0 commit comments