Skip to content

Commit 123b992

Browse files
committed
Cleanup for submitting as standalone patch.
1 parent 4aa61b2 commit 123b992

File tree

3 files changed

+36
-21
lines changed

3 files changed

+36
-21
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class KryoSerializer(conf: SparkConf)
126126
new KryoSerializerInstance(this)
127127
}
128128

129-
override def supportsRelocationOfSerializedObjects: Boolean = {
129+
private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
130130
// If auto-flush is disabled, then Kryo may store references to duplicate occurrences of objects
131131
// in the stream rather than writing those objects' serialized bytes, breaking relocation. See
132132
// https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
2323
import scala.reflect.ClassTag
2424

2525
import org.apache.spark.{SparkConf, SparkEnv}
26-
import org.apache.spark.annotation.{Experimental, DeveloperApi}
26+
import org.apache.spark.annotation.{Private, Experimental, DeveloperApi}
2727
import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
2828

2929
/**
@@ -65,28 +65,36 @@ abstract class Serializer {
6565
def newInstance(): SerializerInstance
6666

6767
/**
68+
* :: Private ::
6869
* Returns true if this serializer supports relocation of its serialized objects and false
69-
* otherwise. This should return true if and only if reordering the bytes of serialized objects
70-
* in serialization stream output results in re-ordered input that can be read with the
71-
* deserializer. For instance, the following should work if the serializer supports relocation:
70+
* otherwise. This should return true if and only if reordering the bytes of serialized objects
71+
* in serialization stream output is equivalent to having re-ordered those elements prior to
72+
* serializing them. More specifically, the following should hold if a serializer supports
73+
* relocation:
7274
*
73-
* serOut.open()
74-
* position = 0
75-
* serOut.write(obj1)
76-
* serOut.flush()
77-
* position = # of bytes writen to stream so far
78-
* obj1Bytes = [bytes 0 through position of stream]
79-
* serOut.write(obj2)
80-
* serOut.flush
81-
* position2 = # of bytes written to stream so far
82-
* obj2Bytes = bytes[position through position2 of stream]
75+
* {{{
76+
* serOut.open()
77+
* position = 0
78+
* serOut.write(obj1)
79+
* serOut.flush()
80+
* position = # of bytes writen to stream so far
81+
* obj1Bytes = output[0:position-1]
82+
* serOut.write(obj2)
83+
* serOut.flush()
84+
* position2 = # of bytes written to stream so far
85+
* obj2Bytes = output[position:position2-1]
86+
* serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
87+
* }}}
8388
*
84-
* serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
89+
* In general, this property should hold for serializers that are stateless.
8590
*
86-
* See SPARK-7311 for more discussion.
91+
* This API is private to Spark; this method should not be overridden in third-party subclasses
92+
* or called in user code and is subject to removal in future Spark releases.
93+
*
94+
* See SPARK-7311 for more details.
8795
*/
88-
@Experimental
89-
def supportsRelocationOfSerializedObjects: Boolean = false
96+
@Private
97+
private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
9098
}
9199

92100

core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,19 @@ import org.scalatest.{Assertions, FunSuite}
2626
import org.apache.spark.SparkConf
2727
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
2828

29-
29+
/**
30+
* Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
31+
* describe properties of the serialized stream, such as
32+
* [[Serializer.supportsRelocationOfSerializedObjects]].
33+
*/
3034
class SerializerPropertiesSuite extends FunSuite {
3135

3236
import SerializerPropertiesSuite._
3337

3438
test("JavaSerializer does not support relocation") {
39+
// Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
40+
// full class name the first time an object is written to an output stream, but subsequent
41+
// references to the class write a more compact identifier; this prevents relocation.
3542
val ser = new JavaSerializer(new SparkConf())
3643
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
3744
}
@@ -76,7 +83,7 @@ object SerializerPropertiesSuite extends Assertions {
7683
if (!serializer.supportsRelocationOfSerializedObjects) {
7784
return
7885
}
79-
val NUM_TRIALS = 10
86+
val NUM_TRIALS = 5
8087
val rand = new Random(42)
8188
for (_ <- 1 to NUM_TRIALS) {
8289
val items = {

0 commit comments

Comments
 (0)