From e6578400ce58104d2b022f62110ac83f82a92872 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 7 May 2014 13:12:34 +0800 Subject: [PATCH 1/2] SPARK-1712: TaskDescription instance is too big causes Spark to hang --- .../org/apache/spark/scheduler/TaskSetManager.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f3bd0797aa035..e475780fdf0bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -27,10 +27,10 @@ import scala.math.max import scala.math.min import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, - SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} + SparkEnv, SparkException, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.TaskMetrics -import org.apache.spark.util.{Clock, SystemClock} +import org.apache.spark.util.{AkkaUtils, Clock, SystemClock} /** * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of @@ -56,6 +56,7 @@ private[spark] class TaskSetManager( { val conf = sched.sc.conf + private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) /* * Sometimes if an executor is dead or in an otherwise invalid state, the driver * does not realize right away leading to repeated task failures. If enabled, @@ -414,6 +415,14 @@ private[spark] class TaskSetManager( // we assume the task can be serialized without exceptions. val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) + if (serializedTask.limit >= akkaFrameSize - 1024) { + val msg = "Serialized task %s:%d were %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes)." + val exception = new SparkException(msg.format(taskSet.id, + index, serializedTask.limit, akkaFrameSize)) + logError(msg, exception) + throw exception + } val timeTaken = clock.getTime() - startTime addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( From 3330fb667bc56fa51a00ef39dac073843ce2003a Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 7 May 2014 15:22:21 +0800 Subject: [PATCH 2/2] fix mssg format --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e475780fdf0bc..47faaa8639f6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -416,10 +416,10 @@ private[spark] class TaskSetManager( val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) if (serializedTask.limit >= akkaFrameSize - 1024) { - val msg = "Serialized task %s:%d were %d bytes which " + + var msg = "Serialized task %s:%d were %d bytes which " + "exceeds spark.akka.frameSize (%d bytes)." - val exception = new SparkException(msg.format(taskSet.id, - index, serializedTask.limit, akkaFrameSize)) + msg = msg.format(taskSet.id, index, serializedTask.limit, akkaFrameSize) + val exception = new SparkException(msg) logError(msg, exception) throw exception }