This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Changes from 4 commits

Commits (21):
200ce24  Dispatch tasks to right executors that have tasks' input HDFS data on…  (kimoonkim, Apr 4, 2017)
7499e3b  Fix style issues  (kimoonkim, Apr 5, 2017)
66e79d6  Clean up unnecessary fields  (kimoonkim, Apr 5, 2017)
46f1140  Clean up a misleading method name  (kimoonkim, Apr 5, 2017)
23d287f  Address review comments  (kimoonkim, Apr 5, 2017)
a026cc1  Sync and resolve conflict  (kimoonkim, Apr 5, 2017)
f56f3f9  Fix import ordering  (kimoonkim, Apr 6, 2017)
177e1eb  Delete executor pods in watcher  (kimoonkim, Apr 7, 2017)
a94522a  Fix the driver hang by unblocking the main thread  (kimoonkim, Apr 7, 2017)
4a7738e  Fix import order  (kimoonkim, Apr 7, 2017)
2aa7c6a  Merge branch 'branch-2.1-kubernetes' into dispatch-tasks-by-hdfs-node…  (ash211, Apr 19, 2017)
a772e7f  Merge branch 'branch-2.1-kubernetes' into dispatch-tasks-by-hdfs-node…  (kimoonkim, Apr 21, 2017)
6b1e4b4  Merge remote-tracking branch 'origin/dispatch-tasks-by-hdfs-node-loca…  (kimoonkim, Apr 22, 2017)
fef7ebc  Clear runningExecutorPods  (kimoonkim, Apr 22, 2017)
e07b084  Merge branch 'branch-2.1-kubernetes' into dispatch-tasks-by-hdfs-node…  (ash211, Apr 25, 2017)
b3855d6  Fix incorrect merge  (ash211, Apr 26, 2017)
7085995  Address review comments  (kimoonkim, Apr 26, 2017)
ee958b3  Merge remote-tracking branch 'origin/dispatch-tasks-by-hdfs-node-loca…  (kimoonkim, Apr 26, 2017)
dc0755a  Clean up imports  (kimoonkim, Apr 26, 2017)
4ce3066  Merge branch 'branch-2.1-kubernetes' into dispatch-tasks-by-hdfs-node…  (ash211, May 10, 2017)
80decdc  Merge branch 'branch-2.1-kubernetes' into dispatch-tasks-by-hdfs-node…  (ash211, May 10, 2017)
@@ -221,7 +221,7 @@ private[spark] class TaskSetManager(
* Return the pending tasks list for a given host, or an empty list if
* there is no map entry for that host
*/
-private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+protected def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}

@@ -16,15 +16,50 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend,
+  TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager}

private[spark] class KubernetesClusterManager extends ExternalClusterManager {

override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-val scheduler = new TaskSchedulerImpl(sc)
+val scheduler = new TaskSchedulerImpl(sc) {
Reviewer:
Would much prefer this to be a separate class file, e.g. KubernetesTaskSchedulerImpl.

Author (kimoonkim):
Done.
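
For reference, a minimal sketch of what the suggested separate class file might look like. The shape mirrors the anonymous subclass in this diff; the KubernetesTaskSetManager name and the exact file layout are assumptions, not the eventually committed code.

package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}

// Hypothetical KubernetesTaskSchedulerImpl.scala, replacing the inline subclass below.
private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
    // KubernetesTaskSetManager would carry the getPendingTasksForHost override shown below.
    new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
  }
}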


override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(sched = this, taskSet, maxTaskFailures) {

// Returns preferred tasks for an executor that may have local data there,
// using the physical cluster node name that it is running on.
Reviewer:
This overrides the lookup to look not just for the pod's IP but also for the pod's node name and the pod's host IP address. Where in the parent class would executors get registered with those node names / host IP addresses?

Author (kimoonkim):
When the spark driver is about to execute a stage, it creates a new task set and a new instance of this TaskSetManager. The constructor of TaskSetManager then populates this and other locality-related maps with the preferred locations of each task in the task set.

Here's the relevant method addPendingTask, which will be called for each task index:

  /** Add a task to all the pending-task lists that it should be on. */
  private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
    // ... remainder of addPendingTask elided here ...
  }

override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
var pendingTasks = super.getPendingTasksForHost(executorIP)
Reviewer:
See if we can avoid using var here. Having a separate val for each kind of pendingTasks that we're checking and validating would make it clearer how each pendingTasks set was obtained. E.g.

val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
...
val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName)

Author (kimoonkim):
Done.

if (pendingTasks.nonEmpty) {
Reviewer:
Avoid using return in Scala (see below).

Author (kimoonkim):
Done.

return pendingTasks
}
val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[
Reviewer:
Again - put this in a separate file. TaskSetManager also has a TaskSchedulerImpl reference available to it - no need to extract it from the SparkContext.

KubernetesClusterSchedulerBackend]
val pod = backend.getExecutorPodByIP(executorIP)
if (pod.isEmpty) {
return pendingTasks // Empty
Reviewer (@mccheah, Apr 26, 2017):
Avoid using return in scala. Instead:

if (pod.isEmpty) {
  pendingTasks // Since this is the last evaluated statement it also becomes the return value
} else {
  // do other things
}

Author (kimoonkim):
Done.

}
val clusterNodeName = pod.get.getSpec.getNodeName
val clusterNodeIP = pod.get.getStatus.getHostIP
pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName)
Reviewer:
I think we need a bit more comments explaining why you're doing this.

Reviewer:
Also, it's a bit confusing that you use clusterNodeName and clusterNodeIP in the log statement but pod.get.getSpec.getXXXX in these calls. I think it's better to just use the variables you declared.

Author (kimoonkim):
Ah. My bad. Failed to clean up some early code. Also added more comments. PTAL. Thanks!

if (pendingTasks.isEmpty) {
pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP)
}
if (pendingTasks.nonEmpty) {
logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " +
Reviewer:
Should this be logDebug instead?

Author (kimoonkim):
I think you're right. Done.

s"using cluster node $clusterNodeName at $clusterNodeIP")
}
pendingTasks
}
}
}
}
sc.taskScheduler = scheduler
scheduler
}
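
Pulling the review feedback above together (one val per lookup, no early return, logDebug instead of logInfo), the override might read like the following sketch. The backend val is obtained as in the diff; variable names are assumptions, not the committed code.

override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
  val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
  if (pendingTasksExecutorIP.nonEmpty) {
    pendingTasksExecutorIP
  } else {
    // The scheduler sees executors by pod IP, but HDFS reports block locations by
    // cluster node name or node IP, so fall back to the node hosting the pod.
    backend.getExecutorPodByIP(executorIP) match {
      case None => pendingTasksExecutorIP // Still empty.
      case Some(pod) =>
        val clusterNodeName = pod.getSpec.getNodeName
        val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName)
        if (pendingTasksClusterNodeName.nonEmpty) {
          logDebug(s"Got preferred task list $pendingTasksClusterNodeName for executor " +
            s"host $executorIP using cluster node $clusterNodeName")
          pendingTasksClusterNodeName
        } else {
          super.getPendingTasksForHost(pod.getStatus.getHostIP)
        }
    }
  }
}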
@@ -37,6 +72,5 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager {
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}

}

@@ -16,12 +16,17 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.io.Closeable
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder
import org.apache.spark.deploy.kubernetes.config._
@@ -39,7 +44,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
import KubernetesClusterSchedulerBackend._

private val EXECUTOR_MODIFICATION_LOCK = new Object
-private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
+private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs.
+private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs.
+private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs.
Reviewer:
I think the Scala convention is that an unqualified HashMap is usually an immutable hashmap, so it might be surprising to a Scala developer that these are mutable. Maybe import the mutable package and refer to these by new mutable.HashMap.

Reviewer:
+1
Also, can you elaborate on what ID these are indexed by?

Author (kimoonkim):
Ah. Good to learn. Thanks!
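
A sketch of the convention being suggested, applied to the maps above:

import scala.collection.mutable

// The mutable.HashMap qualifier makes the mutability explicit at each use site.
private val runningExecutorPods = mutable.HashMap.empty[String, Pod] // Keyed by executor ID.
private val executorPodsByIPs = mutable.HashMap.empty[String, Pod] // Keyed by pod IP address.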


private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
@@ -135,6 +142,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
// indication as to why.
try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
executorWatchResources.values.foreach(_.close)
executorPodsByIPs.clear()
Reviewer:
I'm a bit confused about when we need to use the EXECUTOR_MODIFICATION_LOCK. The lookup method takes the lock, but in the places where we're clearing and modifying these maps we're not locking anything.

I think if we're explicitly assuming there is only one possible execution of this method, we should be more explicit about that.

Author (kimoonkim):
Good point. I think we should lock here as well to be on the safe side, unless someone objects to it.

Author (kimoonkim):
I have simplified locking a bit in the latest diff. Now executorPodsByIPs is guarded by its own lock and add/delete is done only by the watcher. Hopefully, this makes it easier to separate locking concerns going forward.
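
A sketch of the arrangement that last reply describes, with a dedicated lock guarding only the IP-indexed map; the lock name is an assumption:

private val executorPodsByIPsLock = new Object
// Guarded by executorPodsByIPsLock; entries are added and removed only by the pod watcher.
private val executorPodsByIPs = mutable.HashMap.empty[String, Pod]

def getExecutorPodByIP(podIP: String): Option[Pod] = {
  executorPodsByIPsLock.synchronized {
    executorPodsByIPs.get(podIP)
  }
}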

} catch {
case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
}
@@ -144,6 +153,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
case e: Throwable => logError("Uncaught exception while shutting down driver service.", e)
}
try {
logInfo("Closing kubernetes client")
kubernetesClient.close()
} catch {
case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
@@ -236,7 +246,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
+ s" additional executors, expecting total $requestedTotal and currently" +
s" expected ${totalExpectedExecutors.get}")
for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) {
-runningExecutorPods += allocateNewExecutorPod()
+val (executorId, pod) = allocateNewExecutorPod()
+logInfo(s"Allocated executor $executorId")
+runningExecutorPods += ((executorId, pod))
+val podReadyFuture = SettableFuture.create[Pod]
Reviewer:
This doesn't seem to be used anywhere; maybe you can eliminate the SettableFuture here, and also as a parameter to ExecutorPodReadyWatcher.

Author (kimoonkim):
You're right. Removed.

val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture)
val watchConnectionManager = kubernetesClient
Reviewer:
Instead of opening a watch for each executor pod, we should try to open a single watch on all executor pods (by the executor pod selectors) to receive all events. That way we don't have N concurrent watches open, each of which holds an active connection to the apiserver. It will also hopefully simplify lifecycle tracking for these N watches.

Author (kimoonkim):
That's a great suggestion. I was curious how to do it. The selector idea makes sense and it does clean up. Thanks!
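
A sketch of the single shared watch being suggested, using a fabric8 label selector. The label key and value are assumptions; they would have to match labels actually set on the executor pod template:

// One watch over every executor pod, selected by label, instead of N per-pod watches.
val allExecutorsWatch: Closeable = kubernetesClient
  .pods()
  .withLabel("spark-app-id", applicationId()) // Assumed label on executor pods.
  .watch(podWatcher) // A single Watcher[Pod] receiving events for all executor pods.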

.pods()
.withName(pod.getMetadata.getName)
.watch(podWatcher)
executorWatchResources += ((executorId, watchConnectionManager))
}
}
totalExpectedExecutors.set(requestedTotal)
@@ -249,12 +268,46 @@ private[spark] class KubernetesClusterSchedulerBackend(
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
Reviewer:
Push the kubernetesClient.pods().delete(pod) onto a new line too, so it lines up with the others in this block.

Author (kimoonkim):
Done.

executorWatchResources.remove(executor).foreach(_.close)
executorPodsByIPs.remove(pod.getStatus.getPodIP)
case None => logWarning(s"Unable to remove pod for unknown executor $executor")
}
}
}
true
}

def getExecutorPodByIP(podIP: String): Option[Pod] = {
EXECUTOR_MODIFICATION_LOCK.synchronized {
executorPodsByIPs.get(podIP)
}
}

private class ExecutorPodReadyWatcher(resolvedExecutorPod: SettableFuture[Pod])
extends Watcher[Pod] {

override def eventReceived(action: Action, pod: Pod): Unit = {
if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
&& !resolvedExecutorPod.isDone) {
pod.getStatus
.getContainerStatuses
.asScala
.find(status => status.getReady)
.foreach { _ => resolvedExecutorPod.set(pod) }
val podName = pod.getMetadata.getName
val podIP = pod.getStatus.getPodIP
val clusterNodeName = pod.getSpec.getNodeName
logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
EXECUTOR_MODIFICATION_LOCK.synchronized {
executorPodsByIPs += ((podIP, pod))
}
}
}

override def onClose(cause: KubernetesClientException): Unit = {
logDebug("Executor pod readiness watch closed.", cause)
}
}
}

private object KubernetesClusterSchedulerBackend {