-
Notifications
You must be signed in to change notification settings - Fork 117
Dispatch tasks to right executors that have tasks' input HDFS data #216
Changes from 4 commits
200ce24
7499e3b
66e79d6
46f1140
23d287f
a026cc1
f56f3f9
177e1eb
a94522a
4a7738e
2aa7c6a
a772e7f
6b1e4b4
fef7ebc
e07b084
b3855d6
7085995
ee958b3
dc0755a
4ce3066
80decdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,15 +16,50 @@ | |
| */ | ||
| package org.apache.spark.scheduler.cluster.kubernetes | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.SparkContext | ||
| import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} | ||
| import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, | ||
| TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager} | ||
|
|
||
| private[spark] class KubernetesClusterManager extends ExternalClusterManager { | ||
|
|
||
| override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") | ||
|
|
||
| override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { | ||
| val scheduler = new TaskSchedulerImpl(sc) | ||
| val scheduler = new TaskSchedulerImpl(sc) { | ||
|
|
||
| override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { | ||
| new TaskSetManager(sched = this, taskSet, maxTaskFailures) { | ||
|
|
||
| // Returns preferred tasks for an executor that may have local data there, | ||
| // using the physical cluster node name that it is running on. | ||
|
||
| override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { | ||
| var pendingTasks = super.getPendingTasksForHost(executorIP) | ||
|
||
| if (pendingTasks.nonEmpty) { | ||
|
||
| return pendingTasks | ||
| } | ||
| val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[ | ||
|
||
| KubernetesClusterSchedulerBackend] | ||
| val pod = backend.getExecutorPodByIP(executorIP) | ||
| if (pod.isEmpty) { | ||
| return pendingTasks // Empty | ||
|
||
| } | ||
| val clusterNodeName = pod.get.getSpec.getNodeName | ||
| val clusterNodeIP = pod.get.getStatus.getHostIP | ||
| pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName) | ||
|
||
| if (pendingTasks.isEmpty) { | ||
| pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP) | ||
| } | ||
| if (pendingTasks.nonEmpty) { | ||
| logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " + | ||
|
||
| s"using cluster node $clusterNodeName at $clusterNodeIP") | ||
| } | ||
| pendingTasks | ||
| } | ||
| } | ||
| } | ||
| } | ||
| sc.taskScheduler = scheduler | ||
| scheduler | ||
| } | ||
|
|
@@ -37,6 +72,5 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { | |
| override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { | ||
| scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) | ||
| } | ||
|
|
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,12 +16,17 @@ | |
| */ | ||
| package org.apache.spark.scheduler.cluster.kubernetes | ||
|
|
||
| import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} | ||
| import java.io.Closeable | ||
| import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} | ||
|
|
||
| import com.google.common.util.concurrent.SettableFuture | ||
| import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} | ||
| import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher} | ||
| import io.fabric8.kubernetes.client.Watcher.Action | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable.HashMap | ||
| import scala.concurrent.{ExecutionContext, Future} | ||
|
|
||
| import org.apache.spark.{SparkContext, SparkException} | ||
| import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder | ||
| import org.apache.spark.deploy.kubernetes.config._ | ||
|
|
@@ -39,7 +44,9 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| import KubernetesClusterSchedulerBackend._ | ||
|
|
||
| private val EXECUTOR_MODIFICATION_LOCK = new Object | ||
| private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] | ||
| private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs. | ||
| private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs. | ||
| private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs. | ||
|
||
|
|
||
| private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) | ||
| private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) | ||
|
|
@@ -135,6 +142,8 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| // indication as to why. | ||
| try { | ||
| runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) | ||
| executorWatchResources.values.foreach(_.close) | ||
| executorPodsByIPs.clear() | ||
|
||
| } catch { | ||
| case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) | ||
| } | ||
|
|
@@ -144,6 +153,7 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) | ||
| } | ||
| try { | ||
| logInfo("Closing kubernetes client") | ||
| kubernetesClient.close() | ||
| } catch { | ||
| case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) | ||
|
|
@@ -236,7 +246,16 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| + s" additional executors, expecting total $requestedTotal and currently" + | ||
| s" expected ${totalExpectedExecutors.get}") | ||
| for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { | ||
| runningExecutorPods += allocateNewExecutorPod() | ||
| val (executorId, pod) = allocateNewExecutorPod() | ||
| logInfo(s"Allocated executor $executorId") | ||
| runningExecutorPods += ((executorId, pod)) | ||
| val podReadyFuture = SettableFuture.create[Pod] | ||
|
||
| val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture) | ||
| val watchConnectionManager = kubernetesClient | ||
|
||
| .pods() | ||
| .withName(pod.getMetadata.getName) | ||
| .watch(podWatcher) | ||
| executorWatchResources += ((executorId, watchConnectionManager)) | ||
| } | ||
| } | ||
| totalExpectedExecutors.set(requestedTotal) | ||
|
|
@@ -249,12 +268,46 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| for (executor <- executorIds) { | ||
| runningExecutorPods.remove(executor) match { | ||
| case Some(pod) => kubernetesClient.pods().delete(pod) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. push the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| executorWatchResources.remove(executor).foreach(_.close) | ||
| executorPodsByIPs.remove(pod.getStatus.getPodIP) | ||
| case None => logWarning(s"Unable to remove pod for unknown executor $executor") | ||
| } | ||
| } | ||
| } | ||
| true | ||
| } | ||
|
|
||
| def getExecutorPodByIP(podIP: String): Option[Pod] = { | ||
| EXECUTOR_MODIFICATION_LOCK.synchronized { | ||
| executorPodsByIPs.get(podIP) | ||
| } | ||
| } | ||
|
|
||
| private class ExecutorPodReadyWatcher(resolvedExecutorPod: SettableFuture[Pod]) | ||
| extends Watcher[Pod] { | ||
|
|
||
| override def eventReceived(action: Action, pod: Pod): Unit = { | ||
| if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" | ||
| && !resolvedExecutorPod.isDone) { | ||
| pod.getStatus | ||
| .getContainerStatuses | ||
| .asScala | ||
| .find(status => status.getReady) | ||
| .foreach { _ => resolvedExecutorPod.set(pod) } | ||
| val podName = pod.getMetadata.getName | ||
| val podIP = pod.getStatus.getPodIP | ||
| val clusterNodeName = pod.getSpec.getNodeName | ||
| logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") | ||
| EXECUTOR_MODIFICATION_LOCK.synchronized { | ||
| executorPodsByIPs += ((podIP, pod)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| override def onClose(cause: KubernetesClientException): Unit = { | ||
| logDebug("Executor pod readiness watch closed.", cause) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private object KubernetesClusterSchedulerBackend { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would much prefer this to be a separate class file, e.g.
KubernetesTaskSchedulerImpl.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.