118 changes: 94 additions & 24 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -172,7 +172,8 @@ class DAGScheduler(

// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
// every task. When we detect a node failing, we note the current epoch number and failed
// executor or host, increment it for new tasks, and use this to ignore stray
// ShuffleMapTask results.
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
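//
// For illustration (an editor's sketch, not part of this diff), the stray-result check
// described above amounts to:
//
//   if (failedEpoch.contains(execId) && task.epoch <= failedEpoch(execId)) {
//     // Drop the ShuffleMapTask result: the map output predates the recorded failure.
//   }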
@@ -1348,7 +1349,14 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
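// With the external shuffle service enabled, shuffle files are served by a per-host
// service and outlive any single executor, so a fetch failure implicates every map
// output on the host rather than just the failing executor's.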
if (env.blockManager.externalShuffleServiceEnabled) {
val currentEpoch = task.epoch
removeExecutor(bmAddress.executorId, currentEpoch)
handleExternalShuffleFailure(bmAddress.host, currentEpoch)
} else {
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
}
}
}

@@ -1368,6 +1376,30 @@ class DAGScheduler(
}
}

/**
* Marks an executor as failed and removes it from the block manager master.
*
* @param execId id of the executor to be removed
* @param currentEpoch epoch in which the failure was observed; used to ignore stray
*                     failure messages so a lost executor is not re-detected.
*
* @return true if the executor was removed, false if an equal or newer failure epoch
*         was already recorded for it
*/
private[scheduler] def removeExecutor(execId: String, currentEpoch: Long): Boolean = {
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
true
} else {
logDebug("Additional executor lost message for " + execId +
" (epoch " + currentEpoch + ")")
false
}
}

/**
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
@@ -1385,38 +1417,76 @@ class DAGScheduler(
filesLost: Boolean,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
val executorRemoved = removeExecutor(execId, currentEpoch)
if (executorRemoved && (filesLost || !env.blockManager.externalShuffleServiceEnabled)) {
handleInternalShuffleFailure(execId, currentEpoch)
}
}

/**
* Responds to an internal shuffle becoming unavailable for an executor.
*
* We will assume that we've lost all the shuffle blocks for the executor.
*
* @param execId id of the executor for which internal shuffle is unavailable
* @param currentEpoch epoch during which the failure was caught.
*/
private[scheduler] def handleInternalShuffleFailure(execId: String, currentEpoch: Long): Unit = {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
cleanShuffleOutputs(_.removeOutputsOnExecutor(execId))
}

/**
* Responds to an external shuffle service becoming unavailable on a host.
*
* We assume that all shuffle blocks on the host have been lost if a FetchFailed occurs
* while the external shuffle service is in use.
*
* @param host address of the host on which external shuffle is unavailable
* @param currentEpoch epoch in which the failure was observed; used to ignore stray fetch
*                     failures so the loss of the host's shuffle service is not re-detected.
*/
private[scheduler] def handleExternalShuffleFailure(host: String, currentEpoch: Long): Unit = {
if (!failedEpoch.contains(host) || failedEpoch(host) < currentEpoch) {
failedEpoch(host) = currentEpoch
logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
cleanShuffleOutputs(_.removeOutputsOnHost(host))
} else {
logDebug(("Additional shuffle files " +
"lost message for host: %s (epoch %d)").format(host, currentEpoch))
}
}

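/**
 * Applies `outputsCleaner` to every shuffle map stage, re-registers the surviving map
 * outputs with an epoch bump, and clears cached block locations. Shared by the internal
 * and external shuffle failure paths.
 */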
private[scheduler] def cleanShuffleOutputs(outputsCleaner: ShuffleMapStage => Unit): Unit = {
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
outputsCleaner(stage)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}
clearCacheLocs()
}

private[scheduler] def handleExecutorAdded(execId: String, host: String) {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
logInfo("Host added was in lost list earlier: " + host)
logInfo("Executor %s added was in lost list earlier.".format(execId))
failedEpoch -= execId
}

if (failedEpoch.contains(host)) {
failedEpoch -= host
}
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -132,25 +132,45 @@ private[spark] class ShuffleMapStage(
outputLocs.map(_.headOption.orNull)
}

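/**
 * Removes every output whose location satisfies `locationChecker`, returning true if any
 * partition lost its last remaining output.
 */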
private def removeOutputsHelper(locationChecker: BlockManagerId => Boolean): Boolean = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(status => locationChecker(status.location))
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
becameUnavailable
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = {
val becameUnavailable = removeOutputsHelper(_.executorId == execId)

if (becameUnavailable) {
logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
this, execId, _numAvailableOutputs, numPartitions, isAvailable))
}
}

/**
* Removes all shuffle outputs associated with the external shuffle service on this host.
*/
def removeOutputsOnHost(host: String): Unit = {
val becameUnavailable = removeOutputsHelper(_.host == host)

if (becameUnavailable) {
logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
this, host, _numAvailableOutputs, numPartitions, isAvailable))
}
}
}
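
For reference, a minimal standalone sketch of the predicate-based removal pattern above, with a simplified Loc type standing in for Spark's BlockManagerId/MapStatus; this is an editor's illustration under those assumptions, not Spark code:

final case class Loc(executorId: String, host: String)

final class OutputsSketch(numPartitions: Int) {
  private val outputLocs = Array.fill[List[Loc]](numPartitions)(Nil)

  def addOutput(partition: Int, loc: Loc): Unit =
    outputLocs(partition) = loc :: outputLocs(partition)

  // Returns true if any partition lost its last remaining output.
  private def removeOutputs(matches: Loc => Boolean): Boolean = {
    var becameUnavailable = false
    for (partition <- outputLocs.indices) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(matches)
      outputLocs(partition) = newList
      if (prevList.nonEmpty && newList.isEmpty) becameUnavailable = true
    }
    becameUnavailable
  }

  def removeOutputsOnExecutor(execId: String): Boolean = removeOutputs(_.executorId == execId)
  def removeOutputsOnHost(host: String): Boolean = removeOutputs(_.host == host)
}

object OutputsSketchDemo extends App {
  val stage = new OutputsSketch(numPartitions = 2)
  stage.addOutput(0, Loc("exec-1", "hostA"))
  stage.addOutput(1, Loc("exec-2", "hostA"))
  println(stage.removeOutputsOnExecutor("exec-1")) // true: partition 0 lost its only output
  println(stage.removeOutputsOnHost("hostA"))      // true: partition 1 is now unavailable too
}

Both public methods differ only in how they match a block manager's location, which is why the real change factors the loop into a single helper.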
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -674,6 +674,41 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
}
}

private val shuffleFetchFailureTests = Seq(
("fetch failure with external shuffle service enabled", true, Set(0, 1, 2, 4)),
("fetch failure with internal shuffle service enabled", false, Set(0, 1)))

for ((testName, shuffleServiceOn, expectedPartitionsLost) <- shuffleFetchFailureTests) {
test(testName) {
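// Tear down the SparkContext created by beforeEach and re-init with our own conf so
// the shuffle-service flag takes effect (assumes the suite's beforeEach/init pattern).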
afterEach()
val conf = new SparkConf()
conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
init(conf)
assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)

val shuffleMapRdd = new MyRDD(sc, 5, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))

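// Map stage layout: partitions 0 and 1 run on exec-hostA-1, partitions 2 and 4 on
// exec-hostA-2, and partition 3 on exec-hostB-1.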
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-1"))),
(Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-1"))),
(Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-2"))),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length, 5, Some("exec-hostB-1"))),
(Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-2")))))

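// Reduce stage: one task succeeds; the other fails to fetch from exec-hostA-1.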
complete(taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA", Some("exec-hostA-1")),
shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
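// With the external shuffle service, the fetch failure invalidates every output on hostA
// (partitions 0, 1, 2, 4); without it, only exec-hostA-1's outputs (partitions 0 and 1).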
assert(taskSets(2).tasks.map(_.partitionId).toSet === expectedPartitionsLost)
}
}

// Helper function to validate state when creating tests for task failures
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
assert(stageAttempt.stageId === stageId)
@@ -2330,9 +2365,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
}

object DAGSchedulerSuite {
def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2,
execId: Option[String] = None): MapStatus =
MapStatus(makeBlockManagerId(host, execId), Array.fill[Long](reduces)(sizes))

def makeBlockManagerId(host: String, execId: Option[String] = None): BlockManagerId =
BlockManagerId(execId.getOrElse("exec-" + host), host, 12345)
}