Skip to content

Commit 3b6ef2c

Browse files
zsxwingrxin
authored andcommitted
[SPARK-7655][Core] Deserializing value should not hold the TaskSchedulerImpl lock
We should not call `DirectTaskResult.value` when holding the `TaskSchedulerImpl` lock. It may cost dozens of seconds to deserialize a large object. Author: zsxwing <[email protected]> Closes #6195 from zsxwing/SPARK-7655 and squashes the following commits: 21f502e [zsxwing] Add more comments e25fa88 [zsxwing] Add comments 15010b5 [zsxwing] Deserialize value should not hold the TaskSchedulerImpl lock
1 parent 161d0b4 commit 3b6ef2c

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
4040
var metrics: TaskMetrics)
4141
extends TaskResult[T] with Externalizable {
4242

43+
private var valueObjectDeserialized = false
44+
private var valueObject: T = _
45+
4346
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
4447

4548
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
@@ -72,10 +75,26 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
7275
}
7376
}
7477
metrics = in.readObject().asInstanceOf[TaskMetrics]
78+
valueObjectDeserialized = false
7579
}
7680

81+
/**
82+
* When `value()` is called for the first time, it needs to deserialize `valueObject` from
83+
* `valueBytes`. It may take dozens of seconds for a large instance. So when calling `value` for
84+
* the first time, the caller should avoid blocking other threads.
85+
*
86+
* After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
87+
*/
7788
def value(): T = {
78-
val resultSer = SparkEnv.get.serializer.newInstance()
79-
resultSer.deserialize(valueBytes)
89+
if (valueObjectDeserialized) {
90+
valueObject
91+
} else {
92+
// This should not run when holding a lock because it may cost dozens of seconds for a large
93+
// value.
94+
val resultSer = SparkEnv.get.serializer.newInstance()
95+
valueObject = resultSer.deserialize(valueBytes)
96+
valueObjectDeserialized = true
97+
valueObject
98+
}
8099
}
81100
}

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
5454
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
5555
return
5656
}
57+
// Deserialize "value" without holding any lock so that it won't block other threads.
58+
// We should call it here, so that when it's called again in
59+
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
60+
directResult.value()
5761
(directResult, serializedData.limit())
5862
case IndirectTaskResult(blockId, size) =>
5963
if (!taskSetManager.canFetchMoreResults(size)) {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,12 @@ private[spark] class TaskSetManager(
620620
val index = info.index
621621
info.markSuccessful()
622622
removeRunningTask(tid)
623+
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
624+
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
625+
// "deserialize" the value while holding a lock, which would block other threads. So we call
626+
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
627+
// Note: "result.value()" only deserializes the value when it's called for the first time, so
628+
// here "result.value()" just returns the value and won't block other threads.
623629
sched.dagScheduler.taskEnded(
624630
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
625631
if (!successful(index)) {

0 commit comments

Comments
 (0)