Skip to content

Commit b9624ee

Browse files
committed
Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
Conflicts: core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
1 parent fec7b29 commit b9624ee

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ class KryoSerializer(conf: SparkConf)
125125
override def newInstance(): SerializerInstance = {
126126
new KryoSerializerInstance(this)
127127
}
128+
129+
override def supportsRelocationOfSerializedObjects: Boolean = {
130+
// TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case
131+
newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
132+
}
128133
}
129134

130135
private[spark]

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
2323
import scala.reflect.ClassTag
2424

2525
import org.apache.spark.{SparkConf, SparkEnv}
26-
import org.apache.spark.annotation.DeveloperApi
26+
import org.apache.spark.annotation.{Experimental, DeveloperApi}
2727
import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
2828

2929
/**
@@ -63,6 +63,30 @@ abstract class Serializer {
6363

6464
/** Creates a new [[SerializerInstance]]. */
6565
def newInstance(): SerializerInstance
66+
67+
/**
68+
* Returns true if this serializer supports relocation of its serialized objects and false
69+
* otherwise. This should return true if and only if reordering the bytes of serialized objects
70+
* in serialization stream output results in re-ordered input that can be read with the
71+
* deserializer. For instance, the following should work if the serializer supports relocation:
72+
*
73+
* serOut.open()
74+
* position = 0
75+
* serOut.write(obj1)
76+
* serOut.flush()
77+
 * position = # of bytes written to stream so far
78+
* obj1Bytes = [bytes 0 through position of stream]
79+
* serOut.write(obj2)
80+
 * serOut.flush()
81+
* position2 = # of bytes written to stream so far
82+
* obj2Bytes = bytes[position through position2 of stream]
83+
*
84+
 * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
85+
*
86+
* See SPARK-7311 for more discussion.
87+
*/
88+
@Experimental
89+
def supportsRelocationOfSerializedObjects: Boolean = false
6690
}
6791

6892

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
131131
private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
132132
private val useSerializedPairBuffer =
133133
!ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
134-
ser.isInstanceOf[KryoSerializer] &&
135-
serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
134+
ser.supportsRelocationOfSerializedObjects
136135

137136
// Data structures to store in-memory objects before we spill. Depending on whether we have an
138137
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we

0 commit comments

Comments
 (0)