From b9624eefa111c012b013417a4c754358066e29ed Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 1 May 2015 14:43:56 -0700 Subject: [PATCH 1/9] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. Conflicts: core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala --- .../spark/serializer/KryoSerializer.scala | 5 ++++ .../apache/spark/serializer/Serializer.scala | 26 ++++++++++++++++++- .../util/collection/ExternalSorter.scala | 3 +-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index b7bc087855b9f..f6c17e362f9b3 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -125,6 +125,11 @@ class KryoSerializer(conf: SparkConf) override def newInstance(): SerializerInstance = { new KryoSerializerInstance(this) } + + override def supportsRelocationOfSerializedObjects: Boolean = { + // TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case + newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset() + } } private[spark] diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index c381672a4f588..144a1c51ac858 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -23,7 +23,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator} /** @@ -63,6 +63,30 @@ abstract class Serializer { /** Creates a new [[SerializerInstance]]. */ def newInstance(): SerializerInstance + + /** + * Returns true if this serializer supports relocation of its serialized objects and false + * otherwise. This should return true if and only if reordering the bytes of serialized objects + * in serialization stream output results in re-ordered input that can be read with the + * deserializer. For instance, the following should work if the serializer supports relocation: + * + * serOut.open() + * position = 0 + * serOut.write(obj1) + * serOut.flush() + * position = # of bytes writen to stream so far + * obj1Bytes = [bytes 0 through position of stream] + * serOut.write(obj2) + * serOut.flush + * position2 = # of bytes written to stream so far + * obj2Bytes = bytes[position through position2 of stream] + * + * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1) + * + * See SPARK-7311 for more discussion. 
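To make the contract above concrete, the following is a minimal sketch of the round trip it describes, written against the same stream APIs that the test suite added later in this series uses (the object name RelocationSketch and the string payloads are purely illustrative, and it assumes Kryo's default configuration, in which auto-reset is enabled):

{{{
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object RelocationSketch {
  def main(args: Array[String]): Unit = {
    val instance = new KryoSerializer(new SparkConf()).newInstance()
    val baos = new ByteArrayOutputStream()
    val serOut = instance.serializeStream(baos)
    serOut.writeObject("obj1")
    serOut.flush()
    val position = baos.size()                  // bytes written so far; end of obj1's bytes
    serOut.writeObject("obj2")
    serOut.flush()
    serOut.close()
    val bytes = baos.toByteArray
    val obj1Bytes = bytes.slice(0, position)
    val obj2Bytes = bytes.slice(position, bytes.length)
    // Concatenate the per-object byte ranges in swapped order and read them back:
    val serIn = instance.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
    assert(serIn.asIterator.toSeq == Seq("obj2", "obj1"))
    serIn.close()
  }
}
}}}

This is essentially the check that SerializerPropertiesSuite, added later in this series, automates over many randomly generated inputs.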
+ */ + @Experimental + def supportsRelocationOfSerializedObjects: Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index b7306cd551918..7d5cf7b61e56a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C]( private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB private val useSerializedPairBuffer = !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.isInstanceOf[KryoSerializer] && - serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset + ser.supportsRelocationOfSerializedObjects // Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we From 86d4dcdf0cb09b75d14ef201262ea771995ac7cb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 1 May 2015 14:47:33 -0700 Subject: [PATCH 2/9] Flag that SparkSqlSerializer2 supports relocation --- core/src/test/resources/log4j.properties | 2 +- .../org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index eb3b1999eb996..9512ac1ac79c3 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file +log4j.rootCategory=DEBUG, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 9552f41115866..c841362a246ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -154,6 +154,8 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema: with Serializable{ def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema) + + override def supportsRelocationOfSerializedObjects: Boolean = true } private[sql] object SparkSqlSerializer2 { From 450fa2182c5345b08f6fb5eea74f9f5d96822a48 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 3 May 2015 12:57:07 -0700 Subject: [PATCH 3/9] Back out accidental log4j.properties change --- core/src/test/resources/log4j.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 9512ac1ac79c3..eb3b1999eb996 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=DEBUG, file +log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log From 0ba75e61ed32140f17e559cd5378ed3d3ba6ed38 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 
3 May 2015 12:52:15 -0700 Subject: [PATCH 4/9] Add tests for serializer relocation property. I verified that the Kryo tests will fail if we remove the auto-reset check in KryoSerializer. I also checked that this test fails if we mistakenly enable this flag for JavaSerializer. This demonstrates that the test case is actually capable of detecting the types of bugs that it's trying to prevent. Of course, it's possible that certain bugs will only surface when serializing specific data types, so we'll still have to be cautious when overriding `supportsRelocationOfSerializedObjects` for new serializers. --- .../spark/serializer/KryoSerializer.scala | 4 +- .../SerializerPropertiesSuite.scala | 103 ++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index f6c17e362f9b3..14b3890fac01a 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -127,7 +127,9 @@ class KryoSerializer(conf: SparkConf) } override def supportsRelocationOfSerializedObjects: Boolean = { - // TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case + // If auto-flush is disabled, then Kryo may store references to duplicate occurrences of objects + // in the stream rather than writing those objects' serialized bytes, breaking relocation. See + // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details. newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset() } } diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala new file mode 100644 index 0000000000000..a117619b04a43 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.SparkConf +import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset + +private case class MyCaseClass(foo: Int, bar: String) + +class SerializerPropertiesSuite extends FunSuite { + + test("JavaSerializer does not support relocation") { + testSupportsRelocationOfSerializedObjects(new JavaSerializer(new SparkConf())) + } + + test("KryoSerializer supports relocation when auto-reset is enabled") { + val ser = new KryoSerializer(new SparkConf) + assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) + testSupportsRelocationOfSerializedObjects(ser) + } + + test("KryoSerializer does not support relocation when auto-reset is disabled") { + val conf = new SparkConf().set("spark.kryo.registrator", + classOf[RegistratorWithoutAutoReset].getName) + val ser = new KryoSerializer(conf) + assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) + testSupportsRelocationOfSerializedObjects(ser) + } + + def testSupportsRelocationOfSerializedObjects(serializer: Serializer): Unit = { + val NUM_TRIALS = 100 + if (!serializer.supportsRelocationOfSerializedObjects) { + return + } + val rand = new Random(42) + val randomFunctions: Seq[() => Any] = Seq( + () => rand.nextInt(), + () => rand.nextString(rand.nextInt(10)), + () => rand.nextDouble(), + () => rand.nextBoolean(), + () => (rand.nextInt(), rand.nextString(rand.nextInt(10))), + () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))), + () => { + val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))) + (x, x) + } + ) + def generateRandomItem(): Any = { + randomFunctions(rand.nextInt(randomFunctions.size)).apply() + } + + for (_ <- 1 to NUM_TRIALS) { + val items = { + // Make sure that we have duplicate occurrences of the same object in the stream: + val randomItems = Seq.fill(10)(generateRandomItem()) + randomItems ++ randomItems.take(5) + } + val baos = new ByteArrayOutputStream() + val serStream = serializer.newInstance().serializeStream(baos) + def serializeItem(item: Any): Array[Byte] = { + val itemStartOffset = baos.toByteArray.length + serStream.writeObject(item) + serStream.flush() + val itemEndOffset = baos.toByteArray.length + baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone() + } + val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = { + val serItems = items.map { + item => (item, serializeItem(item)) + } + serStream.close() + rand.shuffle(serItems) + } + val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray + val deserializedItemsStream = serializer.newInstance().deserializeStream( + new ByteArrayInputStream(reorderedSerializedData)) + assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1)) + deserializedItemsStream.close() + } + } + +} From 2c1233a4b2754fe94e076bb0aac806474555730a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 3 May 2015 13:08:35 -0700 Subject: [PATCH 5/9] Small refactoring of SerializerPropertiesSuite to enable test re-use: This lays some groundwork for re-using this test logic for serializers defined in other subprojects (those projects can just declare a test-jar dependency on Spark core). 
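As a rough sketch of that reuse, a suite in another subproject might look like the following (DownstreamSerializerSuite is a placeholder name, KryoSerializer merely stands in for whatever serializer that subproject actually wants to test, and the item generator must be tailored to the types that serializer can handle):

{{{
import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoSerializer, SerializerPropertiesSuite}

class DownstreamSerializerSuite extends FunSuite {

  test("serializer supports relocation of serialized objects") {
    // Stand-in for the subproject's own serializer:
    val ser = new KryoSerializer(new SparkConf())
    // Generator for values that the serializer under test knows how to handle:
    def generateItem(rand: Random): Any =
      (rand.nextInt(), rand.nextString(rand.nextInt(10)))
    SerializerPropertiesSuite.testSupportsRelocationOfSerializedObjects(ser, generateItem)
  }
}
}}}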
--- .../SerializerPropertiesSuite.scala | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala index a117619b04a43..dbd831c3f6056 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -21,23 +21,25 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import scala.util.Random -import org.scalatest.FunSuite +import org.scalatest.{Assertions, FunSuite} import org.apache.spark.SparkConf import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset -private case class MyCaseClass(foo: Int, bar: String) class SerializerPropertiesSuite extends FunSuite { + import SerializerPropertiesSuite._ + test("JavaSerializer does not support relocation") { - testSupportsRelocationOfSerializedObjects(new JavaSerializer(new SparkConf())) + val ser = new JavaSerializer(new SparkConf()) + testSupportsRelocationOfSerializedObjects(ser, generateRandomItem) } test("KryoSerializer supports relocation when auto-reset is enabled") { val ser = new KryoSerializer(new SparkConf) assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) - testSupportsRelocationOfSerializedObjects(ser) + testSupportsRelocationOfSerializedObjects(ser, generateRandomItem) } test("KryoSerializer does not support relocation when auto-reset is disabled") { @@ -45,15 +47,14 @@ class SerializerPropertiesSuite extends FunSuite { classOf[RegistratorWithoutAutoReset].getName) val ser = new KryoSerializer(conf) assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) - testSupportsRelocationOfSerializedObjects(ser) + testSupportsRelocationOfSerializedObjects(ser, generateRandomItem) } - def testSupportsRelocationOfSerializedObjects(serializer: Serializer): Unit = { - val NUM_TRIALS = 100 - if (!serializer.supportsRelocationOfSerializedObjects) { - return - } - val rand = new Random(42) +} + +object SerializerPropertiesSuite extends Assertions { + + def generateRandomItem(rand: Random): Any = { val randomFunctions: Seq[() => Any] = Seq( () => rand.nextInt(), () => rand.nextString(rand.nextInt(10)), @@ -66,14 +67,21 @@ class SerializerPropertiesSuite extends FunSuite { (x, x) } ) - def generateRandomItem(): Any = { - randomFunctions(rand.nextInt(randomFunctions.size)).apply() - } + randomFunctions(rand.nextInt(randomFunctions.size)).apply() + } + def testSupportsRelocationOfSerializedObjects( + serializer: Serializer, + generateRandomItem: Random => Any): Unit = { + if (!serializer.supportsRelocationOfSerializedObjects) { + return + } + val NUM_TRIALS = 10 + val rand = new Random(42) for (_ <- 1 to NUM_TRIALS) { val items = { // Make sure that we have duplicate occurrences of the same object in the stream: - val randomItems = Seq.fill(10)(generateRandomItem()) + val randomItems = Seq.fill(10)(generateRandomItem(rand)) randomItems ++ randomItems.take(5) } val baos = new ByteArrayOutputStream() @@ -99,5 +107,6 @@ class SerializerPropertiesSuite extends FunSuite { deserializedItemsStream.close() } } - } + +private case class MyCaseClass(foo: Int, bar: String) \ No newline at end of file From 4aa61b2ff8a494881937104299103bb5fdce6f8b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 3 May 2015 13:18:39 -0700 Subject: [PATCH 6/9] Add missing newline --- 
.../org/apache/spark/serializer/SerializerPropertiesSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala index dbd831c3f6056..b6848c3b19f51 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -109,4 +109,4 @@ object SerializerPropertiesSuite extends Assertions { } } -private case class MyCaseClass(foo: Int, bar: String) \ No newline at end of file +private case class MyCaseClass(foo: Int, bar: String) From 123b99286e3a5e6586818d302ec79abc4f0e958e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 5 May 2015 14:30:28 -0700 Subject: [PATCH 7/9] Cleanup for submitting as standalone patch. --- .../spark/serializer/KryoSerializer.scala | 2 +- .../apache/spark/serializer/Serializer.scala | 44 +++++++++++-------- .../SerializerPropertiesSuite.scala | 11 ++++- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 14b3890fac01a..d27bea48561f5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -126,7 +126,7 @@ class KryoSerializer(conf: SparkConf) new KryoSerializerInstance(this) } - override def supportsRelocationOfSerializedObjects: Boolean = { + private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = { // If auto-flush is disabled, then Kryo may store references to duplicate occurrences of objects // in the stream rather than writing those objects' serialized bytes, breaking relocation. See // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details. diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 144a1c51ac858..dd4a3c090d647 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -23,7 +23,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.annotation.{Private, Experimental, DeveloperApi} import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator} /** @@ -65,28 +65,36 @@ abstract class Serializer { def newInstance(): SerializerInstance /** + * :: Private :: * Returns true if this serializer supports relocation of its serialized objects and false - * otherwise. This should return true if and only if reordering the bytes of serialized objects - * in serialization stream output results in re-ordered input that can be read with the - * deserializer. For instance, the following should work if the serializer supports relocation: + * otherwise. This should return true if and only if reordering the bytes of serialized objects + * in serialization stream output is equivalent to having re-ordered those elements prior to + * serializing them. 
More specifically, the following should hold if a serializer supports + * relocation: * - * serOut.open() - * position = 0 - * serOut.write(obj1) - * serOut.flush() - * position = # of bytes writen to stream so far - * obj1Bytes = [bytes 0 through position of stream] - * serOut.write(obj2) - * serOut.flush - * position2 = # of bytes written to stream so far - * obj2Bytes = bytes[position through position2 of stream] + * {{{ + * serOut.open() + * position = 0 + * serOut.write(obj1) + * serOut.flush() + * position = # of bytes writen to stream so far + * obj1Bytes = output[0:position-1] + * serOut.write(obj2) + * serOut.flush() + * position2 = # of bytes written to stream so far + * obj2Bytes = output[position:position2-1] + * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1) + * }}} * - * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1) + * In general, this property should hold for serializers that are stateless. * - * See SPARK-7311 for more discussion. + * This API is private to Spark; this method should not be overridden in third-party subclasses + * or called in user code and is subject to removal in future Spark releases. + * + * See SPARK-7311 for more details. */ - @Experimental - def supportsRelocationOfSerializedObjects: Boolean = false + @Private + private[spark] def supportsRelocationOfSerializedObjects: Boolean = false } diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala index b6848c3b19f51..bb34033fe9e7e 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -26,12 +26,19 @@ import org.scalatest.{Assertions, FunSuite} import org.apache.spark.SparkConf import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset - +/** + * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that + * describe properties of the serialized stream, such as + * [[Serializer.supportsRelocationOfSerializedObjects]]. + */ class SerializerPropertiesSuite extends FunSuite { import SerializerPropertiesSuite._ test("JavaSerializer does not support relocation") { + // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the + // full class name the first time an object is written to an output stream, but subsequent + // references to the class write a more compact identifier; this prevents relocation. 
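That behavior can be observed with a small sketch against JavaSerializer itself (the Point class and object name are illustrative, exact byte counts depend on the JVM, and the failure is typically a StreamCorruptedException because the relocated stream no longer starts with the serialization header):

{{{
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

case class Point(x: Int, y: Int)

object JavaRelocationSketch {
  def main(args: Array[String]): Unit = {
    val instance = new JavaSerializer(new SparkConf()).newInstance()
    val baos = new ByteArrayOutputStream()
    val out = instance.serializeStream(baos)
    out.writeObject(Point(1, 2))
    out.flush()
    val split = baos.size()   // stream header + Point's full class descriptor + field data
    out.writeObject(Point(3, 4))
    out.flush()
    out.close()
    val bytes = baos.toByteArray
    // The second chunk is much smaller: it back-references the descriptor in the first chunk.
    println(s"first chunk: $split bytes, second chunk: ${bytes.length - split} bytes")
    // Swapping the chunks yields a stream whose header and class descriptor appear
    // after the data that needs them, so it cannot be read back:
    val swapped = bytes.slice(split, bytes.length) ++ bytes.slice(0, split)
    val attempt = Try(
      instance.deserializeStream(new ByteArrayInputStream(swapped)).asIterator.toSeq)
    println(attempt)   // Failure(...), not Success(Seq(Point(3,4), Point(1,2)))
  }
}
}}}

This is why JavaSerializer leaves the default supportsRelocationOfSerializedObjects = false.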
val ser = new JavaSerializer(new SparkConf()) testSupportsRelocationOfSerializedObjects(ser, generateRandomItem) } @@ -76,7 +83,7 @@ object SerializerPropertiesSuite extends Assertions { if (!serializer.supportsRelocationOfSerializedObjects) { return } - val NUM_TRIALS = 10 + val NUM_TRIALS = 5 val rand = new Random(42) for (_ <- 1 to NUM_TRIALS) { val items = { From 0a7ebd7311c42707d2576cda9a7b28ce96d2973f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 5 May 2015 14:35:59 -0700 Subject: [PATCH 8/9] Clarify reason why SqlSerializer2 supports this serializer --- .../main/scala/org/apache/spark/serializer/Serializer.scala | 3 ++- .../org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index dd4a3c090d647..4000f87d7ce12 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -86,7 +86,8 @@ abstract class Serializer { * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1) * }}} * - * In general, this property should hold for serializers that are stateless. + * In general, this property should hold for serializers that are stateless and that do not + * write special metadata at the beginning or end of the serialization stream. * * This API is private to Spark; this method should not be overridden in third-party subclasses * or called in user code and is subject to removal in future Spark releases. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index c841362a246ea..35ad987eb1a63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -155,7 +155,10 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema: def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema) - override def supportsRelocationOfSerializedObjects: Boolean = true + override def supportsRelocationOfSerializedObjects: Boolean = { + // SparkSqlSerializer2 is stateless and writes no stream headers + true + } } private[sql] object SparkSqlSerializer2 { From 50a68ca42058da46caff93ed1af925810555d994 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 May 2015 10:47:27 -0700 Subject: [PATCH 9/9] Address minor nits --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- .../src/main/scala/org/apache/spark/serializer/Serializer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d27bea48561f5..f9f78852f032b 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -127,7 +127,7 @@ class KryoSerializer(conf: SparkConf) } private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = { - // If auto-flush is disabled, then Kryo may store references to duplicate occurrences of objects + // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects // in the 
stream rather than writing those objects' serialized bytes, breaking relocation. See // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details. newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset() diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 4000f87d7ce12..6078c9d433ebf 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -23,7 +23,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.annotation.{Private, Experimental, DeveloperApi} +import org.apache.spark.annotation.{DeveloperApi, Private} import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator} /**
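The auto-reset caveat described in KryoSerializer.supportsRelocationOfSerializedObjects can also be observed directly, using the test-only RegistratorWithoutAutoReset registrator that the new suite relies on (a sketch only; it assumes the default spark.kryo.referenceTracking=true, the object name is illustrative, and exact byte counts will vary):

{{{
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset

object AutoResetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.kryo.registrator",
      classOf[RegistratorWithoutAutoReset].getName)
    val instance = new KryoSerializer(conf).newInstance()
    val baos = new ByteArrayOutputStream()
    val out = instance.serializeStream(baos)
    val item = ("same", "instance")
    out.writeObject(item)
    out.flush()
    val firstEnd = baos.size()
    // With auto-reset disabled (and reference tracking on), writing the same instance
    // again emits only a small back-reference into the earlier bytes:
    out.writeObject(item)
    out.flush()
    out.close()
    println(s"first occurrence: $firstEnd bytes, second: ${baos.size() - firstEnd} bytes")
    // That second chunk cannot be decoded on its own or moved ahead of the first,
    // which is why getAutoReset() gates supportsRelocationOfSerializedObjects.
  }
}
}}}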