diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 4e5ebc402be35..cb23a39705b12 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -173,18 +173,35 @@ public BytesToBytesMap( public Iterator iterator() { return new Iterator() { - private int nextPos = bitset.nextSetBit(0); + private int cur = 0; + + private int pageCur = 0; + + private MemoryBlock currentPage; + + private long addr; @Override public boolean hasNext() { - return nextPos != -1; + return cur != size; } @Override public Location next() { - final int pos = nextPos; - nextPos = bitset.nextSetBit(nextPos + 1); - return loc.with(pos, 0, true); + if (currentPage == null) { + currentPage = dataPages.get(pageCur++); + addr = currentPage.getBaseOffset(); + } + long keySize = PlatformDependent.UNSAFE.getLong(memoryManager.getPage(addr), addr); + if (keySize == -1L) { + currentPage = dataPages.get(pageCur++); + addr = currentPage.getBaseOffset(); + } + loc.with(addr, true); + addr += keySize + 8; + addr += PlatformDependent.UNSAFE.getLong(memoryManager.getPage(addr), addr) + 8; + cur++; + return loc; } @Override @@ -291,6 +308,14 @@ Location with(int pos, int keyHashcode, boolean isDefined) { return this; } + Location with(long fullKeyAddress, boolean isDefined) { + this.isDefined = isDefined; + if (isDefined) { + updateAddressesAndSizes(fullKeyAddress); + } + return this; + } + /** * Returns true if the key is defined at this position, and false otherwise. */ @@ -380,7 +405,13 @@ public void putNewKey( bitset.set(pos); // If there's not enough space in the current page, allocate a new page: - if (currentDataPage == null || PAGE_SIZE_BYTES - pageCursor < requiredSize) { + if (currentDataPage == null || PAGE_SIZE_BYTES - pageCursor - 8 < requiredSize) { + if (currentDataPage != null) { + final Object pageBaseObject = currentDataPage.getBaseObject(); + final long pageBaseOffset = currentDataPage.getBaseOffset(); + final long lengthOffsetInPage = pageBaseOffset + pageCursor; + PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, -1L); + } MemoryBlock newPage = memoryManager.allocatePage(PAGE_SIZE_BYTES); dataPages.add(newPage); pageCursor = 0; diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 7a5c0622d1ffb..bfa5780ae9c0a 100644 --- a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -167,14 +167,25 @@ public void iteratorTest() throws Exception { final BytesToBytesMap.Location loc = map.lookup(value, PlatformDependent.LONG_ARRAY_OFFSET, 8); Assert.assertFalse(loc.isDefined()); - loc.putNewKey( - value, - PlatformDependent.LONG_ARRAY_OFFSET, - 8, - value, - PlatformDependent.LONG_ARRAY_OFFSET, - 8 - ); + if (i % 5 == 0) { + loc.putNewKey( + value, + PlatformDependent.LONG_ARRAY_OFFSET, + 0, + value, + PlatformDependent.LONG_ARRAY_OFFSET, + 8 + ); + } else { + loc.putNewKey( + value, + PlatformDependent.LONG_ARRAY_OFFSET, + 8, + value, + PlatformDependent.LONG_ARRAY_OFFSET, + 8 + ); + } } final java.util.BitSet valuesSeen = new java.util.BitSet(size); final Iterator iter = map.iterator(); @@ -183,11 +194,15 @@ public void iteratorTest() throws Exception { Assert.assertTrue(loc.isDefined()); final MemoryLocation keyAddress = loc.getKeyAddress(); final MemoryLocation valueAddress = loc.getValueAddress(); - final long key = PlatformDependent.UNSAFE.getLong( + final long keyLength = loc.getKeyLength(); + final long key = PlatformDependent.UNSAFE.getLong( keyAddress.getBaseObject(), keyAddress.getBaseOffset()); final long value = PlatformDependent.UNSAFE.getLong( valueAddress.getBaseObject(), valueAddress.getBaseOffset()); - Assert.assertEquals(key, value); + if (value % 5 == 0) + Assert.assertEquals(keyLength, 0); + else + Assert.assertEquals(key, value); valuesSeen.set((int) value); } Assert.assertEquals(size, valuesSeen.cardinality());