@@ -27,10 +27,10 @@ import scala.math.max
import scala.math.min

import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
- SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+ SparkEnv, SparkException, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.TaskMetrics
- import org.apache.spark.util.{Clock, SystemClock}
+ import org.apache.spark.util.{AkkaUtils, Clock, SystemClock}

/**
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
@@ -56,6 +56,7 @@ private[spark] class TaskSetManager(
{
val conf = sched.sc.conf

+ private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
@@ -414,6 +415,14 @@ private[spark] class TaskSetManager(
// we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ if (serializedTask.limit >= akkaFrameSize - 1024) {
Contributor

What is the 1024 for?

Contributor Author

It follows the check at Executor.scala#L235, although that margin may not be suitable here.

Contributor

@kayousterhout - you added this. Any reason you left the 1024 of wiggle room?

Contributor Author

Real example (sizes in bytes):

serialized task with dependencies    serialized LaunchTask
4356                                 4797
2690585                              2704141
5238584                              5264580

The difference keeps growing as the task gets larger.

Contributor

ah I see - it's to leave room for the other contents of the message. In the second case here, do you know what is taking all the extra room? 2704141 - 2690585 = ~13KB which is very large for a few string/int fields!
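
The arithmetic in this thread can be checked with a short sketch. It takes the three size pairs quoted in the real example, computes how many bytes the LaunchTask wrapper adds in each case, and compares that with the 1024-byte allowance. The object and value names (`LaunchTaskOverhead`, `samples`, `overheads`, `reservedBytes`) are made up for illustration and do not appear in Spark.

```scala
object LaunchTaskOverhead {
  // Sizes in bytes, copied from the example quoted in this thread:
  // (serialized task with dependencies, serialized LaunchTask message)
  val samples: Seq[(Long, Long)] = Seq(
    (4356L, 4797L),
    (2690585L, 2704141L),
    (5238584L, 5264580L)
  )

  // Bytes added when the serialized task is wrapped in the LaunchTask message.
  val overheads: Seq[Long] = samples.map { case (task, message) => message - task }

  // The fixed allowance used by the check under review.
  val reservedBytes: Long = 1024L

  def main(args: Array[String]): Unit = {
    samples.zip(overheads).foreach { case ((task, _), overhead) =>
      val covered = overhead <= reservedBytes
      println(s"task=$task overhead=$overhead coveredBy1024=$covered")
    }
  }
}
```

The overheads come out as 441, 13556 and 25996 bytes, so only the smallest sample fits inside a 1024-byte allowance.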

Contributor Author

Yes, the problem is that the size of the overhead is uncertain.

+ var msg = "Serialized task %s:%d was %d bytes which " +
+ "exceeds spark.akka.frameSize (%d bytes)."
+ msg = msg.format(taskSet.id, index, serializedTask.limit, akkaFrameSize)
+ val exception = new SparkException(msg)
+ logError(msg, exception)
+ throw exception
+ }
val timeTaken = clock.getTime() - startTime
addRunningTask(taskId)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
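
For readers who want to try the size check outside Spark, here is a minimal standalone sketch of the same comparison. It is not the code from this change: `FrameSizeCheck`, `check` and `reservedBytes` are hypothetical names, the result is returned as an `Either` instead of throwing `SparkException`, and the 10 MB frame size in `main` is an assumed value chosen only for the demonstration.

```scala
object FrameSizeCheck {
  // Hypothetical helper, not part of Spark. Returns Left(message) when the
  // serialized task leaves fewer than `reservedBytes` of room in one frame,
  // and Right(serializedSize) otherwise.
  def check(
      taskSetId: String,
      index: Int,
      serializedSize: Int,
      frameSizeBytes: Int,
      reservedBytes: Int = 1024): Either[String, Int] = {
    if (serializedSize >= frameSizeBytes - reservedBytes) {
      Left(
        "Serialized task %s:%d was %d bytes which exceeds spark.akka.frameSize (%d bytes)."
          .format(taskSetId, index, serializedSize, frameSizeBytes))
    } else {
      Right(serializedSize)
    }
  }

  def main(args: Array[String]): Unit = {
    val frameSize = 10 * 1024 * 1024 // assumed frame size, for illustration only
    println(check("0.0", 3, 4356, frameSize))             // small task: accepted
    println(check("0.0", 4, frameSize - 1024, frameSize)) // at the threshold: rejected
  }
}
```

Making the reserved margin a parameter keeps the open question from the review visible: the fixed 1024-byte value can be swapped for a different allowance without touching the comparison itself.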