Dispatch tasks to right executors that have tasks' input HDFS data #216
Conversation
Looks like there's still a scalastyle failure on the import ordering. See the "Imports" section of http://spark.apache.org/contributing.html for the expected ordering.
Love to see locality coming into play here! This looks like it might also improve node locality for two executor pods colocated on the same k8s node.
I'm not entirely sure just overriding getPendingTasksForHost is enough, though. Have you observed that with this change locality is enabled for within-node pods and disabled for cross-node pods?
runningExecutorPods += ((executorId, pod))
val podReadyFuture = SettableFuture.create[Pod]
val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture)
val watchConnectionManager = kubernetesClient
Instead of opening a watch for each executor pod, we should try to open a single watch on all executor pods (by the executor pod selectors) to receive all events. That way we don't have N concurrent watches open, each of which has an active connection with the apiserver. It also will hopefully simplify lifecycle tracking for these N watches.
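For what it's worth, a minimal sketch of the single-watch idea with the fabric8 client might look like the following; the spark-role=executor label is an assumption here, not necessarily the selector this PR ends up using:

import java.io.Closeable

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// One label-selected watch covering every executor pod, instead of one watch per pod.
val executorWatch: Closeable = kubernetesClient
  .pods()
  .withLabel("spark-role", "executor")  // assumed label on executor pods
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = {
      // Route ADDED/MODIFIED/DELETED/ERROR events for all executor pods here.
    }
    override def onClose(cause: KubernetesClientException): Unit = {}
  })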
That's a great suggestion. I was curious how to do it. The selector idea makes sense and it does clean things up. Thanks!
private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs.
private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs.
private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs.
I think the Scala convention is that an unqualified HashMap is usually the immutable hash map, so it might be surprising to a Scala developer that these are mutable. Maybe import the mutable package and refer to these as new mutable.HashMap.
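For example, roughly:

import scala.collection.mutable

private val runningExecutorPods = new mutable.HashMap[String, Pod]  // keyed by executor ID
private val executorPodsByIPs = new mutable.HashMap[String, Pod]    // keyed by pod IP address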
+1
Also, can you elaborate on what ID this is indexed by?
Ah. Good to learn. Thanks!
val (executorId, pod) = allocateNewExecutorPod()
logInfo(s"Allocated executor $executorId")
runningExecutorPods += ((executorId, pod))
val podReadyFuture = SettableFuture.create[Pod]
This doesn't seem to be used anywhere -- maybe you can eliminate the SettableFuture here and also as a parameter to ExecutorPodReadyWatcher.
You're right. Removed.
EXECUTOR_MODIFICATION_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
Push the kubernetesClient.pods().delete(pod) onto a new line too so it lines up with the others in this block.
Done.
new TaskSetManager(sched = this, taskSet, maxTaskFailures) {

// Returns preferred tasks for an executor that may have local data there,
// using the physical cluster node name that it is running on.
This overrides the lookup to look not just for the pod's IP but also for the pod's node name and the pod's host IP address. Where in the parent class would executors get registered with those node names / host IP addresses?
When the Spark driver is about to execute a stage, it creates a new task set and a new instance of this TaskSetManager. The constructor of TaskSetManager then populates this and other locality-related maps with the preferred locations of each task in the task set.
Here's the relevant method addPendingTask, which will be called for each task index:
/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
}
}
val clusterNodeName = pod.get.getSpec.getNodeName
val clusterNodeIP = pod.get.getStatus.getHostIP
pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName)
I think we need a bit more comments explaining why you're doing this.
Also, it's a bit confusing that you use clusterNodeName and clusterNodeIP in the log statement but pod.get.getSpec.getXXXX in these calls. I think it's better to just use the variables you declared.
Ah. My bad. I failed to clean up some earlier code. Also added more comments. PTAL. Thanks!
pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP)
}
if (pendingTasks.nonEmpty) {
logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " +
Should this be logDebug instead?
I think you're right. Done.
try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
executorWatchResources.values.foreach(_.close)
executorPodsByIPs.clear()
I'm a bit confused about when we need to use the EXECUTOR_MODIFICATION_LOCK lock. The lookup in this method takes the lock, but in the places where we're clearing and modifying the map we're not really locking anything.
I think if we're assuming there is only one possible execution of this method, we should make that assumption explicit.
Good point. I think we should lock here as well to be on the safe side, unless someone objects to it.
I have simplified locking a bit in the latest diff. Now executorPodsByIPs is guarded by its own lock and add/delete is done only by the watcher. Hopefully, this makes it easier to separate locking concerns going forward.
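Roughly, the shape it takes now (a simplified sketch using the field and lock names from this PR):

private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
// Guarded by EXECUTOR_PODS_BY_IPS_LOCK; only the executor pod watcher adds or removes entries.
private val executorPodsByIPs = new mutable.HashMap[String, Pod]

def getExecutorPodByIP(podIP: String): Option[Pod] = {
  EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
    executorPodsByIPs.get(podIP)
  }
}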
Ah, I didn't think about that. Do you have an example in mind for such a scenario? Maybe the external shuffle service? One thing to note is that there are other locality-related maps in the base class. For instance,
I tested this with a Spark DFSReadWriteTest job, which writes to HDFS and then reads. I observed that reader tasks go to the right executors that wrote the data to their local disks. But I didn't pay attention to the multiple-executor-pods-on-the-same-node scenario; the job used only two executors and they were on different nodes.
Thanks for the pointer. The ordering finally makes sense to me.
Thanks for the reviews so far. I addressed the comments in the latest diff. PTAL. In the meantime, I'll run the DFSReadWriteTest job again to see if it still works :-)
Doing a bit more testing, I have uncovered a bug in this change. The addition of the executor pod watcher thread makes the driver hang when an error occurs. I was accidentally running
The driver tried to write to the dir and then crashed, complaining about the existing dir:
And the driver just hangs there. Looking at what threads are still active in the driver JVM, you can see OkHttp threads associated with the watcher:
FYI, before this change, you would see the shutdown hook kicking in after such an error and clean-up happening normally:
I wonder how we can handle this bug. I was looking for a way to launch OkHttp threads as daemon threads. So far, no luck.
Fixed the driver hanging issue in two places.
With this, we don't see OkHttp threads in the grep output below:
serviceAccountConfigBuilder
}
new DefaultKubernetesClient(configBuilder.build)
val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor]
@foxish Can you take a look at this part of the code? I don't like the fact that we have to do this much of a workaround to avoid hanging. I wonder if we want to change the upstream k8s client or okhttp code to make this easier. If yes, I'm willing to send a PR to them.
This does seem like an issue that you have to do this much to get exceptions in the main thread to terminate the JVM. Let's figure it out in the upstream library as that seems to be where the problem is.
This is looking really good! I like that even though we had to make changes to core Spark, we were able to keep them rather minimal.
Definitely let's figure out what's causing the issue with hanging that you had to work around. Don't think that should block this PR merging though.
The only real thing I think we need to do is fix the merge conflict, then I'd say this is good to go.
// the driver main thread to shut down upon errors. Otherwise, the driver
// will hang indefinitely.
val config = configBuilder
  .withWebsocketPingInterval(0)
Definitely seems like kubernetes-client should run this as a daemon thread (meaning it doesn't prevent JVM exit, per http://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java). I don't think pinging the websocket is worth keeping the JVM running for.
Check out https://github.com/fabric8io/kubernetes-client/blob/2f3e6de212f848774775bac6108b2f4d57c41f2b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java#L55 and how Spark has a ThreadUtils.newDaemonSingleThreadExecutor(). Possibly that use of the Executor (or another one) is the culprit here.
@kimoonkim can you please provide the full stacktrace of the thread that hangs the JVM shutdown?
@ash211 @foxish
I looked at the full stack traces, but they don't tell you much about who submitted the threads (because submission takes place in other threads).
For the web socket ping interval, I believe this is the relevant source code. From RealWebSocket.java:
public void initReaderAndWriter(
    String name, long pingIntervalMillis, Streams streams) throws IOException {
  synchronized (this) {
    this.streams = streams;
    this.writer = new WebSocketWriter(streams.client, streams.sink, random);
    this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
    if (pingIntervalMillis != 0) {
      executor.scheduleAtFixedRate(
          new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
    }
    if (!messageAndCloseQueue.isEmpty()) {
      runWriter(); // Send messages that were enqueued before we were connected.
    }
  }
}
Notice it is calling Util.threadFactory(name, false). The second parameter false sets a new thread to be non-daemon. So passing true will fix the issue.
For the other one, I think this is the source code. From Dispatcher.java:
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}
Again, passing true to Util.threadFactory will fix the issue.
Unfortunately, both are inside the okhttp project, outside the k8s client code. Interestingly, the comment on the main class OkHttpClient claims it uses daemon threads, so the above two lines could simply be bugs:
OkHttp also uses daemon threads for HTTP/2 connections. These will exit automatically if they remain idle.
Great investigation! With that work I think we have enough information to know what to change in okhttp upstream for it to correctly adhere to its stated contract on daemon threads.
Do you feel ready to open an issue with accompanying PR on okhttp?
One can set the Dispatcher instance on okhttp clients also, such that it uses a daemon thread pool. See this in the context of retrofit: https://github.com/apache-spark-on-k8s/spark/pull/227/files#diff-716e31eb38cfd793ba5b2eea49cf5487R43
But OkHttp should certainly be doing this by default.
Sure. Let's try to open an issue/PR on okhttp. I'll work on that.
@mccheah Right. We can change the k8s client to specify a custom dispatcher, or a custom executor service like this code did. We should keep that as an option in case fixing that part of okhttp turns out to be harder than we imagined. Thanks!
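For reference, a sketch of the custom-dispatcher route, assuming okhttp3's Dispatcher(ExecutorService) constructor (illustrative only, not the exact code in this PR):

import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import okhttp3.Dispatcher

// Back the OkHttp dispatcher with daemon threads so idle HTTP threads cannot
// keep the driver JVM alive after the main thread has exited.
val daemonThreadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, "OkHttp Dispatcher")
    thread.setDaemon(true)
    thread
  }
}
val daemonExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
  new SynchronousQueue[Runnable](), daemonThreadFactory)
val dispatcher = new Dispatcher(daemonExecutor)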
@kimoonkim did you end up opening an issue in okhttp upstream? If you did, I couldn't find it when I looked at https://github.com/square/okhttp
square/okhttp#1890 seems like the most relevant issue.
I think the custom dispatcher fixes this anyway though
Not yet. I was carried away by HDFS experiments. I'll open an issue today or tomorrow. Thanks for the pointer.
Hmm, the response in square/okhttp#1890 says that the dispatcher shouldn't be a daemon. I'm going to file an issue for only the ping thread first.
try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
do you need a runningExecutorPods.clear() here to match the executorPodsByIPs.clear() below?
Fixed in the latest diff.
| logDebug(s"Executor pod $podName at IP $podIP was deleted.") | ||
| EXECUTOR_PODS_BY_IPS_LOCK.synchronized { | ||
| executorPodsByIPs -= podIP | ||
| } |
I think we also need to remove from runningExecutorPods here -- we currently handle intentional shutdown of executors (e.g. out of tasks) via doKillExecutors but I don't think we handle unintentional shutdown anywhere.
This should probably be the place to prune dead executors out of runningExecutorPods.
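A hypothetical sketch of such pruning, reusing the locks and maps from this PR (prunePod is a made-up helper name):

private def prunePod(pod: Pod): Unit = {
  val podIP = pod.getStatus.getPodIP
  EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
    executorPodsByIPs -= podIP
  }
  RUNNING_EXECUTOR_PODS_LOCK.synchronized {
    // runningExecutorPods is keyed by executor ID, so look the entry up by pod name.
    runningExecutorPods.find(_._2.getMetadata.getName == pod.getMetadata.getName)
      .foreach { case (executorId, _) => runningExecutorPods.remove(executorId) }
  }
}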
I feel this can be better handled by @varunkatta with the upcoming recovery-handling code. His PR would also use the same watcher, but he has a lot more context on how the different pieces work together for dead executors.
…lity' into dispatch-tasks-by-hdfs-node-locality
Thanks @ash211 for the review. I addressed the remaining comments, so it's probably good to merge after Jenkins passes.
My bad, I resolved the merge conflict incorrectly.
if (pendingTasks.nonEmpty) {
return pendingTasks
}
val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[
Again - put this in a separate file. TaskSetManager also has a TaskSchedulerImpl reference available to it - no need to extract it from the SparkContext.
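For example, something like this sketch (assuming the backend is reachable through the sched reference that TaskSetManager already receives):

// Reach the Kubernetes backend through the scheduler the TaskSetManager already has,
// instead of digging it out of the SparkContext.
val backend = sched.backend.asInstanceOf[KubernetesClusterSchedulerBackend]
val pod = backend.getExecutorPodByIP(executorIP)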
KubernetesClusterSchedulerBackend]
val pod = backend.getExecutorPodByIP(executorIP)
if (pod.isEmpty) {
return pendingTasks // Empty
Avoid using return in Scala. Instead:

if (pod.isEmpty) {
  pendingTasks // Since this is the last evaluated statement it also becomes the return value
} else {
  // do other things
}
Done.
*/
override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
var pendingTasks = super.getPendingTasksForHost(executorIP)
if (pendingTasks.nonEmpty) {
Avoid using return in Scala (see below).
Done.
* only executor pod IPs may not match them.
*/
override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
var pendingTasks = super.getPendingTasksForHost(executorIP)
See if we can avoid using var here. Having a separate val for each kind of pendingTasks that we're checking and validating would make it clearer how each pendingTasks set was obtained. E.g.
val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
...
val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName)
Done.
if (pendingTasks.isEmpty) {
pendingTasks = super.getPendingTasksForHost(clusterNodeIP)
}
if (pendingTasks.nonEmpty && log.isDebugEnabled) {
No need to check if debug logging is enabled - simply invoking logDebug will print the log or not based on the configuration anyway.
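In other words (logDebug takes its message by name, so the string is only built when debug logging is actually on):

// No isDebugEnabled guard needed; the interpolation below only runs if debug is enabled.
logDebug(s"Got preferred task list $pendingTasks for executor host $executorIP")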
Done.
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs += ((podIP, pod))
}
} else if (action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) {
I believe Action.DELETED and Action.ERROR should also be handled somewhere.
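A hedged sketch of what that might look like inside the watcher's eventReceived; it is illustrative only and elides the ADDED/MODIFIED handling the PR already has:

// Sketch only; the PR's watcher also checks deletionTimestamp on MODIFIED events.
override def eventReceived(action: Action, pod: Pod): Unit = {
  val podIP = pod.getStatus.getPodIP
  action match {
    case Action.DELETED | Action.ERROR if podIP != null =>
      // Treat an explicitly deleted or errored pod as gone.
      EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
        executorPodsByIPs -= podIP
      }
    case _ => // ADDED/MODIFIED handling elided in this sketch.
  }
}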
Done.
It would be nice to have unit tests around this logic, but
…lity' into dispatch-tasks-by-hdfs-node-locality
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler,
  TaskSchedulerImpl}
I don't think imports should be wrapping lines - they shouldn't be subject to the character count limit.
@mccheah Thanks for the review. I was on the borderline about many of the things you commented on, so it's good to learn what we prefer. And I agree unit tests will be nice for this PR; I'll work on that in a follow-up.
rerun unit tests please
Thanks for the reviews. Maybe it's ready for merge?
Sorry for the delay here @kimoonkim -- this still looks good, so I'll merge when this latest build passes.
@ash211 Thanks for the merge!
Filed square/okhttp issue #3339 for the web socket ping thread.
Squashed commits from this PR (apache-spark-on-k8s#216):
* Dispatch tasks to right executors that have tasks' input HDFS data on local disks
* Fix style issues
* Clean up unnecessary fields
* Clean up a misleading method name
* Address review comments
* Fix import ordering
* Delete executor pods in watcher
* Fix the driver hang by unblocking the main thread
* Fix import order
* Clear runningExecutorPods
* Fix incorrect merge
* Address review comments
* Clean up imports
What changes were proposed in this pull request?
CC @foxish @ash211 @tnachen
Fixes the task dispatching layer problem discussed in #206, but only the node locality part for now:
With this change, we now translate executor pod IPs to physical cluster node names and use the physical names for looking up the hosts-to-tasks map. Note this patch relies on #214, which switches executors to use pod IPs for registration.
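A condensed sketch of the resulting override in the Kubernetes TaskSetManager (approximate; it assumes the surrounding class provides the backend reference and the ArrayBuffer import shown in the diff):

// Fall back from the executor pod IP to the k8s node name and then the node IP,
// so preferred locations expressed as datanode hostnames can still match.
override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
  val pendingTasksByPodIP = super.getPendingTasksForHost(executorIP)
  if (pendingTasksByPodIP.nonEmpty) {
    pendingTasksByPodIP
  } else {
    backend.getExecutorPodByIP(executorIP) match {
      case Some(pod) =>
        val byNodeName = super.getPendingTasksForHost(pod.getSpec.getNodeName)
        if (byNodeName.nonEmpty) byNodeName else super.getPendingTasksForHost(pod.getStatus.getHostIP)
      case None =>
        pendingTasksByPodIP  // empty
    }
  }
}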
How was this patch tested?
Ran the DFSReadWriteTest job with the combined patch of this and #214. The driver log shows we successfully matched the preferred tasks to the right executors: Task 0 went to executor pod 10.36.0.1 on kube-n4 and Task 1 to executor pod 10.42.0.3 on kube-n7.