
Commit 56c82ed

uncleGen authored and rxin committed
[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming
## What changes were proposed in this pull request?

apache#15992 provided a solution to the bug that **receiver data cannot be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. A rational first step is to disable the auto-pick of the Kryo serializer for Spark Streaming; I will continue apache#15992 to refine the full solution.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <[email protected]>

Closes apache#16052 from uncleGen/SPARK-18617.
1 parent 879ba71 commit 56c82ed
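
To make the failure mode concrete, here is a toy model of the mismatch (a sketch, not Spark source; names are simplified): the serializer is auto-picked from a ClassTag that is known when the receiver writes a block, but that can degrade to `ClassTag.Any` when another executor fetches the block to read it, so the two sides can pick different serializers.

import scala.reflect.ClassTag

object AutoPickMismatchSketch {
  // Stand-in for SerializerManager.canUseKryo: Kryo is auto-picked for
  // primitives, primitive arrays, and String (see the diff below).
  def canUseKryo(ct: ClassTag[_]): Boolean =
    ct == ClassTag(classOf[Array[Byte]]) || ct == ClassTag(classOf[String]) || ct == ClassTag.Int

  def pickSerializer(ct: ClassTag[_]): String =
    if (canUseKryo(ct)) "kryo" else "java"

  def main(args: Array[String]): Unit = {
    // Write side: the receiver knows it stores Array[Byte], so Kryo is chosen.
    println(pickSerializer(implicitly[ClassTag[Array[Byte]]])) // prints "kryo"
    // Remote read side: the element type is no longer known, so the Java
    // serializer is chosen and fails to decode the Kryo-encoded bytes.
    println(pickSerializer(ClassTag.Any)) // prints "java"
  }
}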

4 files changed: +65 −9 lines


core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala

Lines changed: 11 additions & 5 deletions

@@ -79,8 +79,11 @@ private[spark] class SerializerManager(
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  // SPARK-18617: the feature in SPARK-13990 cannot be applied to Spark Streaming yet. At worst,
+  // a streaming job based on `Receiver` mode cannot run properly on Spark 2.x. It may be a
+  // rational choice to disable the `kryo auto pick` feature for streaming as a first step.
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+    if (autoPick && canUseKryo(ct)) {
       kryoSerializer
     } else {
       defaultSerializer
@@ -161,7 +164,8 @@ private[spark] class SerializerManager(
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
   }
@@ -177,7 +181,8 @@ private[spark] class SerializerManager(
       classTag: ClassTag[_]): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
-    val ser = getSerializer(classTag).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
@@ -191,7 +196,8 @@ private[spark] class SerializerManager(
       inputStream: InputStream)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(classTag)
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    getSerializer(classTag, autoPick)
       .newInstance()
       .deserializeStream(wrapStream(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
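
Every call site above computes the same predicate before asking for a serializer: receiver-generated blocks carry a `StreamBlockId`, and only other blocks may have Kryo auto-picked. A minimal sketch of that decision, assuming a Spark 2.x classpath (`kryoAutoPickAllowed` is an illustrative name, not a Spark API); MemoryStore below applies the identical predicate before building its serialization stream.

import org.apache.spark.storage.{BlockId, StreamBlockId}

// Mirrors the `autoPick` computed at each call site in this patch: blocks stored
// by a streaming receiver are named StreamBlockId(streamId, uniqueId), and for
// those the default (Java) serializer must be used on both write and read paths.
def kryoAutoPickAllowed(blockId: BlockId): Boolean =
  !blockId.isInstanceOf[StreamBlockId]

val receiverBlock = StreamBlockId(streamId = 0, uniqueId = 42L)
assert(!kryoAutoPickAllowed(receiverBlock)) // streaming block: auto pick disabled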

core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

Lines changed: 3 additions & 2 deletions

@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -334,7 +334,8 @@ private[spark] class MemoryStore(
     val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
     redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
-      val ser = serializerManager.getSerializer(classTag).newInstance()
+      val autoPick = !blockId.isInstanceOf[StreamBlockId]
+      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
       ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
     }

core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala

Lines changed: 4 additions & 2 deletions

@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
       spy
     }
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
     redirectableOutputStream.setOutputStream(bbos)
     val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
     Mockito.verifyNoMoreInteractions(memoryStore)
     Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val deserialized =
       serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
     assert(deserialized === items)

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 47 additions & 0 deletions

@@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.stop()
   }
 
+  test("SPARK-18560 Receiver data should be deserialized properly.") {
+    // Start a two-node cluster, so the receiver will use one node, and Spark jobs will use the
+    // other one. Spark jobs then need to fetch remote blocks, which triggers SPARK-18560.
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+    ssc = new StreamingContext(conf, Milliseconds(100))
+    val input = ssc.receiverStream(new FakeByteArrayReceiver)
+    input.count().foreachRDD { rdd =>
+      // Make sure we can read from BlockRDD
+      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        new Thread() {
+          setDaemon(true)
+          override def run(): Unit = {
+            ssc.stop(stopSparkContext = true, stopGracefully = false)
+          }
+        }.start()
+      }
+    }
+    ssc.start()
+    ssc.awaitTerminationOrTimeout(60000)
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)
@@ -869,6 +891,31 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
+
+  val data: Array[Byte] = "test".getBytes
+  var receivingThreadOption: Option[Thread] = None
+
+  override def onStart(): Unit = {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        while (!isStopped) {
+          store(data)
+        }
+        logInfo("Receiving stopped")
+      }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
+  }
+
+  override def onStop(): Unit = {
+    // No cleanup to be done; the receiving thread should stop on its own, so just wait for it.
+    receivingThreadOption.foreach(_.join())
+  }
+}
+
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
 class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
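
As a usage note, the new regression test can presumably be run on its own with the usual Spark developer workflow (an assumed invocation, not part of this commit; ScalaTest's `-z` selects tests whose names contain the given substring):

build/sbt "streaming/testOnly org.apache.spark.streaming.StreamingContextSuite -- -z SPARK-18560"

The test relies on `local-cluster[2,1,1024]` (two workers, one core and 1024 MB each) so that receiver blocks land on one executor while the counting job runs on the other, forcing the remote fetch that used to fail.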
