Use nodeAffinity to launch executors on cluster nodes that benefit from HDFS data locality #315
Description
This is a sub-issue of the HDFS data locality issue #206.
When using HDFS, the Spark driver sends tasks to the particular executors that have the tasks' HDFS data on local disks. (Implemented in #216)
To increase the chance of benefiting from data locality, the driver should also, when possible, launch executors on the cluster nodes that hold the tasks' HDFS data in the first place.
A few Spark core base classes maintain the key data structure, `hostToLocalTaskCount`: a map from each cluster node to the number of pending tasks that would gain data locality by running on that node.
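For instance (the hostnames and counts below are made up for illustration), a snapshot of the map might look like this:

```scala
// Hypothetical snapshot: three cluster nodes hold HDFS blocks needed by
// pending tasks, so executors placed there would gain data locality.
val hostToLocalTaskCount: Map[String, Int] = Map(
  "datanode-1.example.com" -> 5,  // 5 pending tasks read blocks stored on this node
  "datanode-2.example.com" -> 3,
  "datanode-3.example.com" -> 1)
```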
- `ExecutorAllocationManager` listens for newly submitted stages and looks at the partition locations of each stage's input data. For every cluster node that holds some of those partitions, it increments that node's task count:
```scala
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  ...
  // Count, per host, the pending tasks whose preferred locations include that host.
  stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
    if (!locality.isEmpty) {
      numTasksPending += 1
      locality.foreach { location =>
        val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
        hostToLocalTaskCountPerStage(location.host) = count
      }
    }
  }
  stageIdToExecutorPlacementHints.put(stageId,
    (numTasksPending, hostToLocalTaskCountPerStage.toMap))

  // Update the executor placement hints
  updateExecutorPlacementHints()
}
```
```scala
def updateExecutorPlacementHints(): Unit = {
  var localityAwareTasks = 0
  val localityToCount = new mutable.HashMap[String, Int]()
  stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
    localityAwareTasks += numTasksPending
    localities.foreach { case (hostname, count) =>
      val updatedCount = localityToCount.getOrElse(hostname, 0) + count
      localityToCount(hostname) = updatedCount
    }
  }
  allocationManager.localityAwareTasks = localityAwareTasks
  allocationManager.hostToLocalTaskCount = localityToCount.toMap
}
```
- `CoarseGrainedSchedulerBackend` is informed of `hostToLocalTaskCount` and stores a snapshot of it:
```scala
final override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]): Boolean = {
  if (numExecutors < 0) {
    throw new IllegalArgumentException(
      "Attempted to request a negative number of executor(s) " +
        s"$numExecutors from the cluster manager. Please specify a positive number!")
  }
  val response = synchronized {
    this.localityAwareTasks = localityAwareTasks
    this.hostToLocalTaskCount = hostToLocalTaskCount
    numPendingExecutors =
      math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
    doRequestTotalExecutors(numExecutors)
  }
  defaultAskTimeout.awaitResult(response)
}
```
In K8s, we can use nodeAffinity (doc) in `KubernetesClusterSchedulerBackend`, a subclass of `CoarseGrainedSchedulerBackend`, to express this preference. In particular, `preferredDuringSchedulingIgnoredDuringExecution` can express a soft requirement. From the doc:
> There are currently two types of node affinity, called requiredDuringSchedulingIgnoredDuringExecution and preferredDuringSchedulingIgnoredDuringExecution. You can think of them as “hard” and “soft” respectively, in the sense that the former specifies rules that must be met for a pod to schedule onto a node (just like nodeSelector but using a more expressive syntax), while the latter specifies preferences that the scheduler will try to enforce but will not guarantee. The “IgnoredDuringExecution” part of the names means that, similar to how nodeSelector works, if labels on a node change at runtime such that the affinity rules on a pod are no longer met, the pod will still continue to run on the node.
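As a rough sketch of the shape this could take (the helper name, the weight scaling, and the use of the fabric8 kubernetes-client model builders below are my illustrative assumptions, not the final design), the backend could translate its `hostToLocalTaskCount` snapshot into a preferred node affinity for executor pods:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Affinity, AffinityBuilder,
  NodeSelectorRequirementBuilder, NodeSelectorTermBuilder,
  PreferredSchedulingTerm, PreferredSchedulingTermBuilder}

// Sketch only: convert the hostToLocalTaskCount snapshot into a "soft" node
// affinity for the executor pod spec. Hosts with more locality-aware tasks
// get proportionally higher scheduling weights. Assumes a non-empty snapshot;
// callers would skip the affinity entirely when there are no locality hints.
private def buildPreferredNodeAffinity(
    hostToLocalTaskCount: Map[String, Int]): Affinity = {
  // preferredDuringSchedulingIgnoredDuringExecution weights must fall in
  // [1, 100], so scale each host's task count against the largest count.
  val maxCount = hostToLocalTaskCount.values.max.toDouble
  val terms: Seq[PreferredSchedulingTerm] = hostToLocalTaskCount.toSeq.map {
    case (host, count) =>
      val weight = math.max(1, math.round(count / maxCount * 100).toInt)
      new PreferredSchedulingTermBuilder()
        .withWeight(weight)
        .withPreference(new NodeSelectorTermBuilder()
          .withMatchExpressions(new NodeSelectorRequirementBuilder()
            .withKey("kubernetes.io/hostname")  // standard node label set by the kubelet
            .withOperator("In")
            .withValues(host)
            .build())
          .build())
        .build()
  }
  new AffinityBuilder()
    .withNewNodeAffinity()
      .withPreferredDuringSchedulingIgnoredDuringExecution(terms.asJava)
    .endNodeAffinity()
    .build()
}
```

The resulting `Affinity` would then be attached to each executor pod spec when the backend creates it (e.g. something like `PodBuilder`'s `editOrNewSpec().withAffinity(...)`), so the K8s scheduler prefers, but does not require, placing executors next to their HDFS data.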
I'll send a PR soon to address this need.