
Commit 0f28ec7

Adding a second putValues to the BlockStore interface that accepts an ArrayBuffer (rather than an Iterator).

This allows BlockStores to behave slightly differently depending on whether they receive an Iterator or an ArrayBuffer. The MemoryStore has to duplicate and cache an Iterator into an ArrayBuffer, but if handed an ArrayBuffer it can skip that duplication.
1 parent 656c33e commit 0f28ec7
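
A minimal standalone sketch of the idea behind the change (the object and method names below are illustrative, not code from this commit): a store handed an Iterator must first copy the elements out into a buffer, while a store handed an ArrayBuffer already has the materialized collection.

import scala.collection.mutable.ArrayBuffer

// Illustrative only: why the two input shapes deserve different treatment.
object PutDispatchSketch {
  def materialize(values: Either[Iterator[Any], ArrayBuffer[Any]]): ArrayBuffer[Any] =
    values match {
      case Left(it) =>
        val buf = new ArrayBuffer[Any]()
        buf ++= it                       // must copy every element out of the iterator
        buf
      case Right(buf) =>
        buf                              // already materialized; reused as-is
    }

  def main(args: Array[String]): Unit = {
    println(materialize(Left(Iterator(1, 2, 3))))        // ArrayBuffer(1, 2, 3)
    println(materialize(Right(ArrayBuffer[Any](4, 5))))  // ArrayBuffer(4, 5)
  }
}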

File tree

4 files changed: +42 -5 lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 13 additions & 5 deletions
@@ -455,7 +455,7 @@ private[spark] class BlockManager(
 
   def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
     : Long = {
-    doPut(blockId, Left(values), level, tellMaster)
+    doPut(blockId, Left(Left(values)), level, tellMaster)
   }
 
   /**
@@ -477,7 +477,7 @@
   def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
     tellMaster: Boolean = true) : Long = {
     require(values != null, "Values is null")
-    doPut(blockId, Left(values.toIterator), level, tellMaster)
+    doPut(blockId, Left(Right(values)), level, tellMaster)
   }
 
   /**
@@ -489,7 +489,7 @@
     doPut(blockId, Right(bytes), level, tellMaster)
   }
 
-  private def doPut(blockId: BlockId, data: Either[Iterator[Any], ByteBuffer],
+  private def doPut(blockId: BlockId, data: Either[Either[Iterator[Any], ArrayBuffer[Any]], ByteBuffer],
     level: StorageLevel, tellMaster: Boolean = true): Long = {
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -552,7 +552,10 @@
         if (level.useMemory) {
           // Save it just to memory first, even if it also has useDisk set to true; we will
           // drop it to disk later if the memory store can't hold it.
-          val res = memoryStore.putValues(blockId, values, level, true)
+          val res = values match {
+            case Left(values_i) => memoryStore.putValues(blockId, values_i, level, true)
+            case Right(values_a) => memoryStore.putValues(blockId, values_a, level, true)
+          }
           size = res.size
           res.data match {
             case Right(newBytes) => bytesAfterPut = newBytes
@@ -562,7 +565,12 @@
           // Save directly to disk.
           // Don't get back the bytes unless we replicate them.
           val askForBytes = level.replication > 1
-          val res = diskStore.putValues(blockId, values, level, askForBytes)
+
+          val res = values match {
+            case Left(values_i) => diskStore.putValues(blockId, values_i, level, askForBytes)
+            case Right(values_a) => diskStore.putValues(blockId, values_a, level, askForBytes)
+          }
+
           size = res.size
           res.data match {
             case Right(newBytes) => bytesAfterPut = newBytes
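
The widened doPut signature threads both value shapes through a single code path. As a sketch, these are the three shapes the nested Either can now carry (PutData is an illustrative alias, not a name from this commit):

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object DoPutShapes {
  type PutData = Either[Either[Iterator[Any], ArrayBuffer[Any]], ByteBuffer]

  val fromIterator: PutData = Left(Left(Iterator(1, 2, 3)))      // put(blockId, values: Iterator, ...)
  val fromBuffer: PutData = Left(Right(ArrayBuffer[Any](1, 2)))  // put(blockId, values: ArrayBuffer, ...)
  val fromBytes: PutData = Right(ByteBuffer.allocate(16))        // the serialized-bytes path
}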

core/src/main/scala/org/apache/spark/storage/BlockStore.scala

Lines changed: 3 additions & 0 deletions
@@ -40,6 +40,9 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
   def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
     returnValues: Boolean) : PutResult
 
+  def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
+    returnValues: Boolean) : PutResult
+
   /**
    * Return the size of a block in bytes.
    */
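
With both overloads abstract, every BlockStore must now decide how to handle a materialized buffer. A simplified mirror of the pattern (illustrative trait, with the BlockId and StorageLevel parameters elided): a store with no cheaper path can forward the ArrayBuffer overload to the Iterator one, which is what DiskStore does below, while MemoryStore overrides it with a real specialization.

import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the widened interface (illustrative names).
trait SketchStore {
  def putValues(values: Iterator[Any]): Long

  // Stores that cannot exploit an already-materialized buffer can just forward.
  def putValues(values: ArrayBuffer[Any]): Long = putValues(values.toIterator)
}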

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 9 additions & 0 deletions
@@ -54,6 +54,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
       file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
   }
 
+  override def putValues(
+      blockId: BlockId,
+      values: ArrayBuffer[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
+    : PutResult = {
+    return putValues(blockId, values.toIterator, level, returnValues)
+  }
+
   override def putValues(
       blockId: BlockId,
       values: Iterator[Any],
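
The delegation above costs essentially nothing, since ArrayBuffer.toIterator yields a view over the buffer rather than a copy. A standalone illustration:

import scala.collection.mutable.ArrayBuffer

object ToIteratorSketch {
  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer[Any]("a", "b", "c")
    val it = buf.toIterator        // O(1) view; elements are not copied
    println(it.mkString(", "))     // a, b, c
  }
}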

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 17 additions & 0 deletions
@@ -64,6 +64,23 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  override def putValues(
+      blockId: BlockId,
+      values: ArrayBuffer[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
+    : PutResult = {
+    if (level.deserialized) {
+      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+      tryToPut(blockId, values, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(values.toIterator))
+    } else {
+      val bytes = blockManager.dataSerialize(blockId, values.toIterator)
+      tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
+    }
+  }
+
   override def putValues(
       blockId: BlockId,
       values: Iterator[Any],
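
To make the saving concrete, here is a standalone sketch of the copy the Iterator path pays and the ArrayBuffer path skips (names are illustrative, not the MemoryStore internals):

import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Iterator path: must unroll into a fresh buffer before the block can be
  // size-estimated and cached.
  def fromIterator(values: Iterator[Any]): ArrayBuffer[Any] = {
    val elements = new ArrayBuffer[Any]()  // extra allocation
    elements ++= values                    // extra per-element copy
    elements
  }

  // ArrayBuffer path: the caller's buffer is usable as-is.
  def fromBuffer(values: ArrayBuffer[Any]): ArrayBuffer[Any] = values
}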
