This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Conversation

@kimoonkim
Member

What changes were proposed in this pull request?

CC @foxish @ash211 @tnachen

Fixes the task-dispatching layer problem discussed in #206, but only the node locality part for now:

  1. Task dispatching: Once it gets executors, the Spark driver dispatches tasks to them. For each task, the driver tries to send it to one of the right executors, i.e. one that has the task's partition on local disk or in the local rack. For this, the driver builds hosts-to-tasks and racks-to-tasks mappings with the datanode info from the namenode, and later looks up these maps using executor host names.

This layer is broken: the Spark driver builds the mapping using the datanode names, but then looks up the maps using the executor pod host names, which never match.

With this change, we now translate executor pod IPs to physical cluster node names and use the physical names for looking up the hosts-to-tasks map. Note this patch relies on #214 that switches executors to use pod IPs for registration.
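
A minimal sketch of that translation (assuming getPendingTasksForHost has been made overridable in core Spark, as this patch does, and that backend is a reference to the Kubernetes scheduler backend with a pod-by-IP lookup like the getExecutorPodByIP discussed in the review below; exact names in the diff may differ):

  import scala.collection.mutable.ArrayBuffer

  // Executors register under their pod IPs, so the plain lookup by executorIP
  // misses the entries keyed by physical (datanode) host names.
  override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
    val byPodIP = super.getPendingTasksForHost(executorIP)
    if (byPodIP.nonEmpty) {
      byPodIP
    } else {
      backend.getExecutorPodByIP(executorIP) match {
        case Some(pod) =>
          // Translate the pod IP to the physical node and retry the lookup,
          // first by node name, then by node IP.
          val byNodeName = super.getPendingTasksForHost(pod.getSpec.getNodeName)
          if (byNodeName.nonEmpty) byNodeName
          else super.getPendingTasksForHost(pod.getStatus.getHostIP)
        case None => byPodIP // no running pod known for this IP; stays empty
      }
    }
  }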

How was this patch tested?

Ran the DFSReadWriteTest job with the combined patch of this and #214. The driver log shows that the preferred tasks were dispatched to the right executors: task 0 to executor pod 10.36.0.1 on kube-n4 and task 1 to executor pod 10.42.0.3 on kube-n7.

2017-04-02 16:41:39 INFO KubernetesClusterManager$$anon$1$$anon$2:54 - Got preferred task list ArrayBuffer(0) for executor host 10.36.0.1 using cluster node name Some(kube-n4)
2017-04-02 16:41:39 INFO KubernetesClusterManager$$anon$1$$anon$2:54 - Starting task 0.0 in stage 1.0 (TID 2, 10.36.0.1, executor 1, partition 0, NODE_LOCAL, 6156 bytes)
2017-04-02 16:41:39 INFO KubernetesClusterManager$$anon$1$$anon$2:54 - Got preferred task list ArrayBuffer(1) for executor host 10.42.0.3 using cluster node name Some(kube-n7)
2017-04-02 16:41:39 INFO KubernetesClusterManager$$anon$1$$anon$2:54 - Starting task 1.0 in stage 1.0 (TID 3, 10.42.0.3, executor 2, partition 1, NODE_LOCAL, 6156 bytes)

@ash211

ash211 commented Apr 5, 2017

Looks like there's still a scalastyle failure on the import ordering. See the "Imports" section of http://spark.apache.org/contributing.html for the expected ordering


@ash211 ash211 left a comment


Love to see locality coming into play here! This looks like it might also improve node locality for two executor pods colocated on the same k8s node.

I'm not entirely sure just overriding getPendingTasksForHost is enough, though. Have you observed that with this change locality is enabled for within-node pods and disabled for cross-node pods?

runningExecutorPods += ((executorId, pod))
val podReadyFuture = SettableFuture.create[Pod]
val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture)
val watchConnectionManager = kubernetesClient

Instead of opening a watch for each executor pod, we should try to open a single watch on all executor pods (selected by the executor pod labels) to receive all events. That way we don't have N concurrent watches open, each with an active connection to the apiserver. It should also simplify lifecycle tracking for these N watches.

Member Author

That's a great suggestion. I was curious how to do it. The selector idea makes sense and it does clean things up. Thanks!
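
A sketch of that single, selector-based watch with the fabric8 client, assuming executor pods carry a label such as spark-role=executor (the actual label and the bookkeeping done in the event handler may differ in this diff):

  import io.fabric8.kubernetes.api.model.Pod
  import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
  import io.fabric8.kubernetes.client.Watcher.Action

  def watchAllExecutorPods(client: KubernetesClient): Watch = {
    // One watch covers every executor pod instead of one watch per pod.
    client.pods()
      .withLabel("spark-role", "executor")
      .watch(new Watcher[Pod] {
        override def eventReceived(action: Action, pod: Pod): Unit = action match {
          case Action.ADDED | Action.MODIFIED =>
            // record podIP -> pod once the pod is running and has an IP
          case Action.DELETED | Action.ERROR =>
            // drop the pod from the podIP -> pod map
        }
        override def onClose(cause: KubernetesClientException): Unit = {}
      })
  }

Closing the single returned Watch also simplifies lifecycle tracking: there is one connection to manage instead of N.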

private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs.
private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs.
private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs.

I think the Scala convention is that an unqualified HashMap usually means an immutable map, so it might be surprising to a Scala developer that these are mutable. Maybe import the mutable package and refer to these as new mutable.HashMap.
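
For example, a sketch of the suggested style (the key descriptions follow the surrounding diff):

  import scala.collection.mutable

  import io.fabric8.kubernetes.api.model.Pod

  // Mutability is visible at the use site.
  private val runningExecutorPods = new mutable.HashMap[String, Pod] // keyed by executor ID
  private val executorPodsByIPs = new mutable.HashMap[String, Pod]   // keyed by pod IP address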


+1
Also, can you elaborate on which ID this is indexed by?

Member Author

Ah. Good to learn. Thanks!

val (executorId, pod) = allocateNewExecutorPod()
logInfo(s"Allocated executor $executorId")
runningExecutorPods += ((executorId, pod))
val podReadyFuture = SettableFuture.create[Pod]

this doesn't seem to be used anywhere -- maybe you can eliminate the SettableFuture here and also as a parameter into ExecutorPodReadyWatcher

Member Author

You're right. Removed.

EXECUTOR_MODIFICATION_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)

push the kubernetesClient.pods().delete(pod) onto a new line too so it lines up with the others in this block

Member Author

Done.

new TaskSetManager(sched = this, taskSet, maxTaskFailures) {

// Returns preferred tasks for an executor that may have local data there,
// using the physical cluster node name that it is running on.

This overrides the lookup to look not just for the pod's IP but also for the pod's node name and the pod's host IP address. Where in the parent class would executors get registered with those node names / host IP addresses?

Member Author

When the Spark driver is about to execute a stage, it creates a new task set and a new instance of this TaskSetManager. The TaskSetManager constructor then populates this and the other locality-related maps with the preferred locations of each task in the task set.

Here's the relevant method addPendingTask, which will be called for each task index:

  /** Add a task to all the pending-task lists that it should be on. */
  private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
    // ... rest of the method elided ...
  }

}
val clusterNodeName = pod.get.getSpec.getNodeName
val clusterNodeIP = pod.get.getStatus.getHostIP
pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName)

I think we need a few more comments explaining why you're doing this.


Also, it's a bit confusing that you use clusterNodeName and clusterNodeIP in the log statement but pod.get.getSpec.getXXXX in these calls. I think it's better to just use the variables you declared.

Member Author

Ah, my bad. I failed to clean up some earlier code. Also added more comments. PTAL. Thanks!

pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP)
}
if (pendingTasks.nonEmpty) {
logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " +

Should this be logDebug instead?

Member Author

I think you're right. Done.

try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
executorWatchResources.values.foreach(_.close)
executorPodsByIPs.clear()

I'm a bit confused about when we need to use the EXECUTOR_MODIFICATION_LOCK lock. The lookup in this method takes it, but in the places where we're clearing and modifying the map we're not locking anything.

I think if we're explicitly assuming there is only one possible execution of this method, we should be more explicit about it.

Member Author

Good point. I think we should lock here as well to be on the safe side, unless someone objects to it.

Member Author

I have simplified locking a bit in the latest diff. Now executorPodsByIPs is guarded by its own lock and add/delete is done only by the watcher. Hopefully, this makes it easier to separate locking concerns going forward.
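
A sketch of that locking split (getExecutorPodByIP matches the method referenced later in this review; the add/remove helper names are illustrative):

  import scala.collection.mutable

  import io.fabric8.kubernetes.api.model.Pod

  private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
  private val executorPodsByIPs = new mutable.HashMap[String, Pod]

  // Write side: called only from the executor pod watcher.
  private def addExecutorPodByIP(podIP: String, pod: Pod): Unit =
    EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs(podIP) = pod }

  private def removeExecutorPodByIP(podIP: String): Unit =
    EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs -= podIP }

  // Read side: used by the locality lookup in the scheduler.
  def getExecutorPodByIP(podIP: String): Option[Pod] =
    EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.get(podIP) }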

@kimoonkim
Member Author

@ash211

This looks like it might also improve node locality for two executor pods colocated on the same k8s node.

Ah, I didn't think about that. Do you have a scenario in mind where that would come up? Maybe the external shuffle service?

One thing to note is that there are other locality-related maps in the base class. For instance, pendingTasksForExecutor below is keyed by executor IDs, not by pod IPs. We may want to look at these together for other locality-related scenarios:

  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
  // but at host level.
  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each rack -- similar to the above.
  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

I'm not entirely sure just overriding getPendingTasksForHost is enough, though. Have you observed that with this change locality is enabled for within-node pods and disabled for cross-node pods?

I tested this with the Spark DFSReadWriteTest job, which writes to HDFS and then reads the data back. I observed that the reader tasks went to the right executors, the ones that had written the data to their local disks. But I didn't pay attention to the scenario of multiple executor pods on the same node; the job used only two executors and they were on different nodes.

@kimoonkim
Member Author

@ash211

Looks like there's still a scalastyle failure on the import ordering. See the "Imports" section of http://spark.apache.org/contributing.html for the expected ordering

Thanks for the pointer. The ordering finally makes sense to me.

@kimoonkim
Member Author

Thanks for the reviews so far. I addressed the comments in the latest diff. PTAL.

In the meantime, I'll run the DFSReadWriteTest job again to see if it still works :-)

@kimoonkim
Member Author

Doing a bit more testing, I have uncovered a bug in this change: the addition of the executor pod watcher thread makes the driver hang when an error occurs.

I accidentally ran the DFSReadWriteTest job with an existing directory given as a command line argument.

$ /usr/local/spark-on-k8s/bin/spark-submit --verbose --class org.apache.spark.examples.DFSReadWriteTest --conf spark.app.name=spark-dfstest --conf spark.executor.instances=2 --conf spark.hadoop.fs.defaultFS=hdfs://hdfs-namenode-0.hdfs-namenode.kube-system.svc.cluster.local:8020 /usr/local/spark-on-k8s/examples/jars/spark-examples_2.11-2.1.0-k8s-0.1.0-SNAPSHOT.jar /etc/hosts /user/root/dfstest/13

The driver tried to write to the directory and then crashed, complaining that the directory already exists:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hdfs-namenode-0.hdfs-namenode.kube-system.svc.cluster.local:8020/user/root/dfstest/13/dfs_read_write_test already exists
at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1191)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1488)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1467)
at org.apache.spark.examples.DFSReadWriteTest$.main(DFSReadWriteTest.scala:113)
at org.apache.spark.examples.DFSReadWriteTest.main(DFSReadWriteTest.scala)

And the driver just hangs there. Looking at what threads are still active in the driver JVM, you can see OkHttp threads associated with the watcher:

bash-4.3# jstack 50 | grep tid= | grep -v daemon
"DestroyJavaVM" #67 prio=5 os_prio=0 tid=0x00007f1cd46d9800 nid=0x34 waiting on condition [0x0000000000000000]
"OkHttp WebSocket https://kubernetes.default.svc/..." #51 prio=5 os_prio=0 tid=0x00007f1cd401e000 nid=0x63 waiting on condition [0x00007f1cafbaa000]
"OkHttp https://kubernetes.default.svc/..." #50 prio=5 os_prio=0 tid=0x00007f1cd265a000 nid=0x62 runnable [0x00007f1caf53e000]
"VM Thread" os_prio=0 tid=0x00007f1ccf81d800 nid=0x39 runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f1ccf6fc000 nid=0x35 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f1ccf6fe000 nid=0x36 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f1ccf6ff800 nid=0x37 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f1ccf701800 nid=0x38 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f1ccf8e2800 nid=0x41 waiting on condition

FYI, before this change, you would see the shutdown hook kicking in after such an error and clean-up happening normally:

2017-04-07 01:19:14 INFO SparkContext:54 - Invoking stop() from shutdown hook
2017-04-07 01:19:14 INFO ServerConnector:306 - Stopped ServerConnector@72f46e16{HTTP/1.1}{0.0.0.0:4040}
...
2017-04-07 01:19:15 INFO KubernetesSparkRestServer$KubernetesSubmitRequestServlet:54 - Received stop command, shutting down the running Spark application...
2017-04-07 01:19:15 INFO ShutdownHookManager:54 - Shutdown hook called
2017-04-07 01:19:15 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-ba801dd3-4ee5-4832-8e80-f86798f1961a

I wonder how we can handle this bug. I was looking for a way to launch the OkHttp threads as daemon threads. So far no luck.

@kimoonkim
Member Author

Fixed the driver hanging issue in two places.

  1. Supplies a custom thread factory that makes the watcher threads daemons.
  2. Sets the websocket ping period to zero. This avoids launching the "OkHttp WebSocket" ping thread, which is not a daemon (and there is no way to make it one; we may want to change the upstream code).
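
Roughly, the two fixes combine like this (a sketch only: Spark's ThreadUtils.newDaemonCachedThreadPool is used here as one way to get a daemon-threaded executor for the OkHttp Dispatcher, and the DefaultKubernetesClient constructor taking a pre-built OkHttpClient is assumed):

  import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
  import okhttp3.{Dispatcher, OkHttpClient}

  import org.apache.spark.util.ThreadUtils

  // A zero ping interval keeps the non-daemon "OkHttp WebSocket" ping thread
  // from ever being started.
  val config = new ConfigBuilder()
    .withWebsocketPingInterval(0)
    .build()

  // Back the OkHttp dispatcher with daemon threads so watch connections cannot
  // keep the JVM alive after the main thread dies.
  val dispatcher = new Dispatcher(
    ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
  val httpClient = new OkHttpClient.Builder()
    .dispatcher(dispatcher)
    .build()

  val kubernetesClient = new DefaultKubernetesClient(httpClient, config)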

With this, we don't see OkHttp threads in the grep output below:

bash-4.3# jstack 49 | grep tid= | grep -v daemon
"main" #1 prio=5 os_prio=0 tid=0x00007f2291d64000 nid=0x33 in Object.wait() [0x00007f229119c000]
"VM Thread" os_prio=0 tid=0x00007f2291e9c800 nid=0x38 runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f2291d7b000 nid=0x34 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f2291d7d000 nid=0x35 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f2291d7e800 nid=0x36 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f2291d80800 nid=0x37 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f2291f60800 nid=0x40 waiting on condition

serviceAccountConfigBuilder
}
new DefaultKubernetesClient(configBuilder.build)
val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor]
Member Author

@foxish Can you take a look at this part of the code? I don't like the fact that we have to do this much of a workaround to avoid the hang. I wonder if we want to change the upstream k8s client or okhttp code to make this easier. If yes, I'm willing to send a PR to them.


It does seem like an issue that you have to do this much just to let exceptions in the main thread terminate the JVM. Let's figure it out in the upstream library, as that seems to be where the problem is.

@kimoonkim
Member Author

@ash211 @tnachen This PR is ready for another look. PTAL if I addressed your comments properly. Thanks!


@ash211 ash211 left a comment


This is looking really good! I like that even though we had to make changes to core Spark, we were able to keep them rather minimal.

Definitely let's figure out what's causing the hanging issue that you had to work around. I don't think that should block merging this PR, though.

Only real thing I think we need to do is fix the merge conflict then I'd say this is good to go.

@mccheah @tnachen @foxish any other thoughts?


// the driver main thread to shut down upon errors. Otherwise, the driver
// will hang indefinitely.
val config = configBuilder
.withWebsocketPingInterval(0)

It definitely seems like kubernetes-client should run this as a daemon thread (meaning it doesn't prevent JVM exit, per http://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java). I don't think pinging the websocket is worth keeping the JVM running for.

Check out https://github.com/fabric8io/kubernetes-client/blob/2f3e6de212f848774775bac6108b2f4d57c41f2b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java#L55 and how Spark has a ThreadUtils.newDaemonSingleThreadExecutor(). Possibly that use of the Executor (or another one) is the culprit here.

@kimoonkim can you please provide the full stacktrace of the thread that hangs the JVM shutdown?

Member Author

@ash211 @foxish
I looked at the full stack traces, but they don't tell you much about who submitted the threads (because submission takes place in other threads).

For the web socket ping interval, I believe this is the relevant source code. From RealWebSocket.java:

  public void initReaderAndWriter(
      String name, long pingIntervalMillis, Streams streams) throws IOException {
    synchronized (this) {
      this.streams = streams;
      this.writer = new WebSocketWriter(streams.client, streams.sink, random);
      this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }
      if (!messageAndCloseQueue.isEmpty()) {
        runWriter(); // Send messages that were enqueued before we were connected.
      }
    }
    // ... rest of the method elided ...
  }

Notice that it calls Util.threadFactory(name, false). The second parameter, false, makes the new thread non-daemon, so passing true would fix the issue.

For the other one, I think this is the source code. From Dispatcher.java:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Again, passing true to Util.threadFactory will fix the issue.
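
For context, passing daemon = true amounts to creating threads from a factory like this (a Scala sketch of an equivalent factory, not OkHttp's actual code):

  import java.util.concurrent.ThreadFactory
  import java.util.concurrent.atomic.AtomicInteger

  def daemonThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val thread = new Thread(r, s"$prefix-${count.getAndIncrement()}")
      thread.setDaemon(true) // daemon threads do not keep the JVM alive
      thread
    }
  }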

Unfortunately, both are inside the okhttp project, outside the k8s client code. Interestingly, the documentation of the main class OkHttpClient claims that it uses daemon threads, so the two lines above could simply be bugs:

OkHttp also uses daemon threads for HTTP/2 connections. These will exit automatically if they remain idle.


Great investigation! With that work I think we have enough information to know what to change in okhttp upstream for it to correctly adhere to its stated contract on daemon threads.

Do you feel ready to open an issue with accompanying PR on okhttp?


One can set the Dispatcher instance on okhttp clients also, such that it uses a daemon thread pool. See this in the context of retrofit: https://github.com/apache-spark-on-k8s/spark/pull/227/files#diff-716e31eb38cfd793ba5b2eea49cf5487R43

But Okhttp should certainly be doing this by default.

Member Author

Sure. Let's try to open an issue/PR on okhttp. I'll work on that.

Member Author

@mccheah Right. We can change the k8s client to specify a custom dispatcher, or a custom executor service like this code did. We should keep that as an option in case fixing that part of okhttp turns out to be harder than we imagined. Thanks!


@kimoonkim did you end up opening an issue in okhttp upstream? I couldn't find the issue if you did when I looked at https://github.com/square/okhttp

square/okhttp#1890 seems like the most relevant issue.

I think the custom dispatcher fixes this anyway though

Member Author

Not yet. I got carried away by the HDFS experiments. I'll open an issue today or tomorrow. Thanks for the pointer.

Member Author

Hmm, the response in square/okhttp#1890 says that the dispatcher shouldn't be a daemon. I'm going to file an issue for only the ping thread first.

try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))

do you need a runningExecutorPods.clear() here to match the executorPodsByIPs.clear() below?

Member Author

Fixed in the latest diff.

logDebug(s"Executor pod $podName at IP $podIP was deleted.")
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs -= podIP
}

I think we also need to remove from runningExecutorPods here -- we currently handle intentional shutdown of executors (e.g. out of tasks) via doKillExecutors but I don't think we handle unintentional shutdown anywhere.

This should probably be the place to prune dead executors out of runningExecutorPods

Member Author

I feel this can be better handled by @varunkatta in the upcoming recovery handling code. His PR would also use the same watcher, and he has a lot more context on how the different pieces work together for dead executors.

@kimoonkim
Member Author

Thanks @ash211 for the review. I addressed the remaining comments, so this may be good to merge after Jenkins passes.

@ash211

ash211 commented Apr 26, 2017

My bad, I resolved the merge conflict incorrectly.

if (pendingTasks.nonEmpty) {
return pendingTasks
}
val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[

Again - put this in a separate file. TaskSetManager also has a TaskSchedulerImpl reference available to it - no need to extract it from the SparkContext.

KubernetesClusterSchedulerBackend]
val pod = backend.getExecutorPodByIP(executorIP)
if (pod.isEmpty) {
return pendingTasks // Empty
@mccheah mccheah Apr 26, 2017

Avoid using return in scala. Instead:

if (pod.isEmpty) {
  pendingTasks // Since this is the last evaluated statement it also becomes the return value
} else {
  // do other things
}

Member Author

Done.

*/
override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
var pendingTasks = super.getPendingTasksForHost(executorIP)
if (pendingTasks.nonEmpty) {

Avoid using return in Scala (see below).

Member Author

Done.

* only executor pod IPs may not match them.
*/
override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
var pendingTasks = super.getPendingTasksForHost(executorIP)

See if we can avoid using var here. Having a separate val for each kind of pendingTasks that we're checking and validating would make it clearer how each pendingTasks set was obtained. E.g.

val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
...
val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName)

Member Author

Done.

if (pendingTasks.isEmpty) {
pendingTasks = super.getPendingTasksForHost(clusterNodeIP)
}
if (pendingTasks.nonEmpty && log.isDebugEnabled) {

No need to check whether debug logging is enabled; simply invoking logDebug will log (or not) based on the configuration anyway.
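
For reference, Spark's Logging trait defines logDebug roughly like this; the by-name message means the string is only built when the debug level is actually enabled:

  import org.slf4j.Logger

  trait Logging {
    protected def log: Logger

    // msg is by-name, so it is evaluated only if debug logging is on.
    protected def logDebug(msg: => String): Unit = {
      if (log.isDebugEnabled) log.debug(msg)
    }
  }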

Member Author

Done.

EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs += ((podIP, pod))
}
} else if (action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) {

I believe Action.DELETED and Action.ERROR should also be handled somewhere.

Member Author

Done.

@mccheah

mccheah commented Apr 26, 2017

It would be nice to have unit tests around this logic, but KubernetesClusterSchedulerBackend might not lend itself to being easily unit-tested right now. We can follow up on that in a separate patch: refactor the scheduler backend and test it in a future diff.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler,
TaskSchedulerImpl}

I don't think imports should be wrapping lines - they shouldn't be subject to the character count limit.

@kimoonkim
Member Author

@mccheah Thanks for the review. I was on the fence about many of the things you commented on. Good to learn what we prefer.

And I agree unit tests will be nice for this PR. I'll work on that in a followup.

@kimoonkim
Member Author

rerun unit tests please

@kimoonkim
Member Author

Thanks for the reviews. Maybe it's ready for merge?

@kimoonkim
Member Author

@ash211 @mccheah I wonder if there are more comments to address. This PR itself is not urgent, but it has code overlapping with other PRs like #244. Merging this in earlier may help settle those PRs as well.

@ash211

ash211 commented May 10, 2017

Sorry for the delay here @kimoonkim -- this still looks good so I'll merge when this latest build passes

@ash211 ash211 merged commit 546f09c into apache-spark-on-k8s:branch-2.1-kubernetes May 10, 2017
@kimoonkim
Member Author

@ash211 Thanks for the merge!

@kimoonkim kimoonkim deleted the dispatch-tasks-by-hdfs-node-locality branch May 10, 2017 15:26
@kimoonkim
Member Author

Filed square/okhttp issue #3339 for the web socket ping thread.

foxish pushed a commit that referenced this pull request Jul 24, 2017

* Dispatch tasks to right executors that have tasks' input HDFS data on local disks

* Fix style issues

* Clean up unnecessary fields

* Clean up a misleading method name

* Address review comments

* Fix import ordering

* Delete executor pods in watcher

* Fix the driver hang by unblocking the main thread

* Fix import order

* Clear runningExecutorPods

* Fix incorrect merge

* Address review comments

* Clean up imports
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
…pache-spark-on-k8s#216)
