@@ -35,6 +35,12 @@ import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
 
+sealed trait Values
+
+case class ByteBufferValues(buffer: ByteBuffer) extends Values
+case class IteratorValues(iterator: Iterator[Any]) extends Values
+case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
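
The hunk above replaces the nested Either[Either[Iterator[Any], ArrayBuffer[Any]], ByteBuffer] encoding with a sealed trait and three named variants. A minimal standalone sketch of how the ADT is built and consumed (the ValuesDemo object and its describe helper are illustrative, not part of the patch); because the trait is sealed, scalac can check matches over it for exhaustiveness:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

sealed trait Values
case class ByteBufferValues(buffer: ByteBuffer) extends Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values

object ValuesDemo {
  // Sealed trait: the compiler warns if a match misses a variant.
  def describe(data: Values): String = data match {
    case ByteBufferValues(b)  => s"${b.remaining()} serialized bytes"
    case IteratorValues(_)    => "a lazy iterator of values"
    case ArrayBufferValues(a) => s"${a.size} materialized values"
  }

  def main(args: Array[String]): Unit = {
    println(describe(ByteBufferValues(ByteBuffer.allocate(16))))
    println(describe(IteratorValues(Iterator(1, 2, 3))))
    println(describe(ArrayBufferValues(ArrayBuffer[Any]("a", "b"))))
  }
}
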
@@ -455,7 +461,7 @@ private[spark] class BlockManager(
 
   def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
     : Long = {
-    doPut(blockId, Left(Left(values)), level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster)
   }
 
   /**
@@ -477,7 +483,7 @@ private[spark] class BlockManager(
   def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
       tellMaster: Boolean = true): Long = {
     require(values != null, "Values is null")
-    doPut(blockId, Left(Right(values)), level, tellMaster)
+    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
   }
 
   /**
@@ -486,11 +492,11 @@ private[spark] class BlockManager(
   def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
       tellMaster: Boolean = true) {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, Right(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
   }
 
   private def doPut(blockId: BlockId,
-    data: Either[Either[Iterator[Any], ArrayBuffer[Any]], ByteBuffer],
+    data: Values,
     level: StorageLevel, tellMaster: Boolean = true): Long = {
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -533,8 +539,9 @@ private[spark] class BlockManager(
 
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
-    val replicationFuture = if (data.isRight && level.replication > 1) {
-      val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
+      // Duplicate doesn't copy the bytes, just creates a wrapper
+      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
       Future {
         replicate(blockId, bufferView, level)
       }
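
The check above switches on the variant with isInstanceOf/asInstanceOf. A hedged sketch of the equivalent pattern-match formulation (replicationView is an illustrative name; the actual replicate call is omitted). ByteBuffer.duplicate() shares the underlying bytes while giving the new view its own position and limit, which is why the replication thread can read it without disturbing the local put:

import java.nio.ByteBuffer

object ReplicationCheck {
  sealed trait Values
  case class ByteBufferValues(buffer: ByteBuffer) extends Values
  case class IteratorValues(iterator: Iterator[Any]) extends Values

  // A view for eager replication exists only when the data is already
  // serialized bytes and more than one replica is requested.
  def replicationView(data: Values, replication: Int): Option[ByteBuffer] =
    data match {
      case ByteBufferValues(bytes) if replication > 1 =>
        // duplicate() shares the contents but not position/limit.
        Some(bytes.duplicate())
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3))
    assert(replicationView(ByteBufferValues(buf), replication = 2).isDefined)
    assert(replicationView(IteratorValues(Iterator(1)), replication = 2).isEmpty)
  }
}
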
@@ -548,42 +555,43 @@ private[spark] class BlockManager(
 
     var marked = false
     try {
-      data match {
-        case Left(values) => {
-          if (level.useMemory) {
-            // Save it just to memory first, even if it also has useDisk set to true; we will
-            // drop it to disk later if the memory store can't hold it.
-            val res = values match {
-              case Left(values_i) => memoryStore.putValues(blockId, values_i, level, true)
-              case Right(values_a) => memoryStore.putValues(blockId, values_a, level, true)
-            }
-            size = res.size
-            res.data match {
-              case Right(newBytes) => bytesAfterPut = newBytes
-              case Left(newIterator) => valuesAfterPut = newIterator
-            }
-          } else {
-            // Save directly to disk.
-            // Don't get back the bytes unless we replicate them.
-            val askForBytes = level.replication > 1
-
-            val res = values match {
-              case Left(values_i) => diskStore.putValues(blockId, values_i, level, askForBytes)
-              case Right(values_a) => diskStore.putValues(blockId, values_a, level, askForBytes)
-            }
-
-            size = res.size
-            res.data match {
-              case Right(newBytes) => bytesAfterPut = newBytes
-              case _ =>
-            }
+      if (level.useMemory) {
+        // Save it just to memory first, even if it also has useDisk set to true; we will
+        // drop it to disk later if the memory store can't hold it.
+        val res = data match {
+          case IteratorValues(values_i) =>
+            memoryStore.putValues(blockId, values_i, level, true)
+          case ArrayBufferValues(values_a) =>
+            memoryStore.putValues(blockId, values_a, level, true)
+          case ByteBufferValues(value_bytes) => {
+            value_bytes.rewind();
+            memoryStore.putBytes(blockId, value_bytes, level)
+          }
+        }
+        size = res.size
+        res.data match {
+          case Right(newBytes) => bytesAfterPut = newBytes
+          case Left(newIterator) => valuesAfterPut = newIterator
+        }
+      } else {
+        // Save directly to disk.
+        // Don't get back the bytes unless we replicate them.
+        val askForBytes = level.replication > 1
+
+        val res = data match {
+          case IteratorValues(values_i) =>
+            diskStore.putValues(blockId, values_i, level, askForBytes)
+          case ArrayBufferValues(values_a) =>
+            diskStore.putValues(blockId, values_a, level, askForBytes)
+          case ByteBufferValues(value_bytes) => {
+            value_bytes.rewind();
+            diskStore.putBytes(blockId, value_bytes, level)
           }
         }
-        case Right(bytes) => {
-          bytes.rewind()
-          // Store it only in memory at first, even if useDisk is also set to true
-          (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
-          size = bytes.limit
+        size = res.size
+        res.data match {
+          case Right(newBytes) => bytesAfterPut = newBytes
+          case _ =>
         }
       }
 
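Both new ByteBufferValues branches call rewind() before handing the buffer to putBytes: a ByteBuffer's position advances as it is consumed, and the caller may pass a buffer that has already been read, so resetting the position guarantees the store sees the full contents. A standalone illustration of the semantics (plain JDK, not Spark code):

import java.nio.ByteBuffer

object RewindDemo {
  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(4).putInt(42)
    buf.flip()                // make the written bytes readable
    val first = buf.getInt()  // consumes the buffer; position == limit
    buf.rewind()              // reset position to 0, data untouched
    val second = buf.getInt() // same bytes are readable again
    assert(first == 42 && second == 42)
  }
}
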
@@ -612,8 +620,8 @@ private[spark] class BlockManager(
     // values and need to serialize and replicate them now:
     if (level.replication > 1) {
       data match {
-        case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
-        case Left(values) => {
+        case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
+        case _ => {
           val remoteStartTime = System.currentTimeMillis
           // Serialize the block if not already done
           if (bytesAfterPut == null) {
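
This last hunk keeps the two replication paths distinct: for ByteBufferValues the future started before the local store is simply awaited, while every other variant has produced no bytes yet and must serialize before replicating. A standalone sketch of that control flow (serialize and replicate here are stand-ins, not the BlockManager methods):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object ReplicationFlow {
  def serialize(values: Seq[Any]): Array[Byte] = values.mkString(",").getBytes
  def replicate(bytes: Array[Byte]): Unit = println(s"shipped ${bytes.length} bytes")

  def main(args: Array[String]): Unit = {
    // Bytes case: replication was kicked off eagerly, so just wait for it.
    val eager = Future { replicate(Array[Byte](1, 2, 3)) }
    Await.ready(eager, Duration.Inf)

    // Iterator/array case: nothing was sent yet; serialize now, then replicate.
    replicate(serialize(Seq(1, 2, 3)))
  }
}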