Changes to support executor recovery behavior during static allocation. #244
Changes from 9 commits
@@ -17,22 +17,22 @@
 package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
-import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
-  EnvVarSourceBuilder, Pod, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.rpc.RpcEndpointAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -44,10 +44,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
   import KubernetesClusterSchedulerBackend._
 
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
+  private val runningPodsToExecutors = new mutable.HashMap[Pod, String] // Indexed by executor Pods.
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
   private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
+  private val FAILED_PODS_LOCK = new Object
+  private val failedPods = new mutable.HashMap[String, ExecutorLossReason] // Indexed by pod names.
+  private val EXECUTORS_TO_REMOVE_LOCK = new Object
+  private val executorsToRemove = new mutable.HashSet[String]
 
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
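The new bookkeeping above pairs each mutable collection with a dedicated lock object, and readers later copy the collection under that lock before iterating. A small self-contained sketch of the pattern, with a plain String standing in for ExecutorLossReason (illustrative names, not the PR's actual fields):

```scala
import scala.collection.mutable

// Sketch of the lock-per-collection pattern used above; the loss reason is a
// plain String here to keep the example standalone.
class FailedPodTracker {
  private val FAILED_PODS_LOCK = new Object
  private val failedPods = new mutable.HashMap[String, String] // Indexed by pod names.

  def recordFailure(podName: String, reason: String): Unit =
    FAILED_PODS_LOCK.synchronized { failedPods.put(podName, reason) }

  // Readers take an immutable copy under the lock, as the recovery runnable
  // does with failedPods.toMap, so they can iterate without holding the lock.
  def snapshot(): Map[String, String] =
    FAILED_PODS_LOCK.synchronized { failedPods.toMap }
}
```

The same shape appears three times in the diff: running executors, failed pods, and executors pending removal.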
@@ -76,8 +80,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
-  private val kubernetesClient = new DriverPodKubernetesClientProvider(conf, kubernetesNamespace)
-    .get
+  private val kubernetesClient = new KubernetesClientBuilder(conf, kubernetesNamespace)
+    .buildFromWithinPod()
 
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
@@ -88,22 +92,25 @@ private[spark] class KubernetesClusterSchedulerBackend(
     throw new SparkException(s"Executor cannot find driver pod", throwable)
   }
 
-  override val minRegisteredRatio =
+  override val minRegisteredRatio: Double =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
       0.8
     } else {
       super.minRegisteredRatio
     }
 
   private val executorWatchResource = new AtomicReference[Closeable]
> Review comment: This can be of type …
+  private val executorCleanupScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "executor-recovery-worker")
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
   private val driverUrl = RpcEndpointAddress(
     sc.getConf.get("spark.driver.host"),
     sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
-  private val initialExecutors = getInitialTargetExecutorNumber(1)
+  private val initialExecutors = getInitialTargetExecutorNumber()
 
   private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
@@ -133,20 +140,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
     if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
       doRequestTotalExecutors(initialExecutors)
     }
+    executorCleanupScheduler.scheduleWithFixedDelay(executorRecoveryRunnable, 0,
+      TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS)
   }
 
   override def stop(): Unit = {
-    // send stop message to executors so they shut down cleanly
-    super.stop()
-
     // then delete the executor pods
     // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
     // When using Utils.tryLogNonFatalError some of the code fails but without any logs or
     // indication as to why.
     try {
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
-        runningExecutorPods.clear()
+        runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
> Review comment: Should add …
>
> Reply: Good point.
+        runningPodsToExecutors.clear()
       }
       EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
         executorPodsByIPs.clear()
@@ -164,6 +169,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     } catch {
       case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
     }
+    executorCleanupScheduler.shutdown()
+    super.stop()
   }
 
   private def allocateNewExecutorPod(): (String, Pod) = {
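The recovery task is scheduled in start() with its five-second period spelled as TimeUnit.SECONDS.toMillis(5) milliseconds, and torn down in stop() via executorCleanupScheduler.shutdown(). A minimal sketch of the same fixed-delay lifecycle, with an illustrative task body:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FixedDelaySketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      // Placeholder body; the PR's runnable checks failed pods and requests replacements.
      override def run(): Unit = println("checking for failed executors")
    }
    // No initial delay, then wait five seconds after each run completes.
    scheduler.scheduleWithFixedDelay(task, 0L, 5L, TimeUnit.SECONDS)
    Thread.sleep(12000)   // let the task fire a couple of times
    scheduler.shutdown()  // mirrors executorCleanupScheduler.shutdown() in stop()
  }
}
```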
@@ -255,11 +262,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       if (requestedTotal > totalExpectedExecutors.get) {
-        logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}"
+        logInfo(s"Requesting ${
+          requestedTotal - totalExpectedExecutors.get
+        }"
          + s" additional executors, expecting total $requestedTotal and currently" +
          s" expected ${totalExpectedExecutors.get}")
         for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) {
-          runningExecutorPods += allocateNewExecutorPod()
+          val (executorId, pod) = allocateNewExecutorPod()
+          runningExecutorsToPods.put(executorId, pod)
+          runningPodsToExecutors.put(pod, executorId)
         }
       }
       totalExpectedExecutors.set(requestedTotal)
@@ -270,8 +281,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
-        runningExecutorPods.remove(executor) match {
-          case Some(pod) => kubernetesClient.pods().delete(pod)
+        runningExecutorsToPods.remove(executor) match {
+          case Some(pod) =>
> Review comment: There is some scala community debate about whether …
>
> Review comment: Strongly prefer not to match on Options anywhere.
>
> Reply: Thanks.. this is addressed now.
+            kubernetesClient.pods().delete(pod)
+            runningPodsToExecutors.remove(pod)
           case None => logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
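On the Option-matching discussion above, a self-contained sketch of the combinator style the reviewers prefer, using a plain map of executor ids to pod names rather than the PR's actual fields:

```scala
import scala.collection.mutable

object OptionStyleSketch {
  private val runningExecutorsToPods = mutable.HashMap("exec-1" -> "pod-a")

  // Style under debate: pattern matching on the Option returned by remove().
  def releaseWithMatch(executorId: String): Unit =
    runningExecutorsToPods.remove(executorId) match {
      case Some(pod) => println(s"deleting $pod")
      case None => println(s"unable to remove pod for unknown executor $executorId")
    }

  // Equivalent without matching on Option, via fold (empty case first).
  def releaseWithFold(executorId: String): Unit =
    runningExecutorsToPods.remove(executorId).fold(
      println(s"unable to remove pod for unknown executor $executorId")) { pod =>
      println(s"deleting $pod")
    }

  def main(args: Array[String]): Unit = {
    releaseWithMatch("exec-1")
    releaseWithFold("exec-2")
  }
}
```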
@@ -287,17 +300,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private class ExecutorPodsWatcher extends Watcher[Pod] {
 
+    private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
-        && pod.getMetadata.getDeletionTimestamp == null) {
+          && pod.getMetadata.getDeletionTimestamp == null) {
         val podIP = pod.getStatus.getPodIP
         val clusterNodeName = pod.getSpec.getNodeName
         logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
         EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
           executorPodsByIPs += ((podIP, pod))
         }
       } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
-        action == Action.DELETED || action == Action.ERROR) {
+          action == Action.DELETED || action == Action.ERROR) {
         val podName = pod.getMetadata.getName
         val podIP = pod.getStatus.getPodIP
         logDebug(s"Executor pod $podName at IP $podIP was at $action.")
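For orientation, a sketch of how a Watcher[Pod] such as ExecutorPodsWatcher is typically registered with the fabric8 client, using the same Watcher API as the code above. The registration itself is not part of the hunks shown here, and the client construction and label selector below are assumptions for illustration:

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

object WatchSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    // watch() returns a Closeable handle, which is the kind of object the backend
    // keeps in its AtomicReference[Closeable] (executorWatchResource) so stop() can close it.
    val watch = client.pods()
      .inNamespace("default")              // assumed namespace
      .withLabel("spark-role", "executor") // assumed label selector
      .watch(new Watcher[Pod] {
        override def eventReceived(action: Action, pod: Pod): Unit =
          println(s"$action: ${pod.getMetadata.getName}")
        override def onClose(cause: KubernetesClientException): Unit =
          println("watch closed")
      })
    watch.close()
    client.close()
  }
}
```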
@@ -306,16 +322,168 @@ private[spark] class KubernetesClusterSchedulerBackend(
             executorPodsByIPs -= podIP
           }
         }
+        if (action == Action.ERROR) {
+          logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+          handleErroredPod(pod)
+        }
+        else if (action == Action.DELETED) {
+          logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+          handleDeletedPod(pod)
+        }
       }
     }
 
     override def onClose(cause: KubernetesClientException): Unit = {
       logDebug("Executor pod watch closed.", cause)
     }
 
+    def getContainerExitStatus(pod: Pod): Int = {
+      val containerStatuses = pod.getStatus.getContainerStatuses
+      if (!containerStatuses.isEmpty) {
+        return getContainerExitStatus(containerStatuses.get(0))
+      }
+      DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+    }
+
+    def getContainerExitStatus(containerStatus: ContainerStatus): Int = {
+      containerStatus.getState.getTerminated.getExitCode.intValue
+    }
+
+    def handleErroredPod(pod: Pod): Unit = {
+      def isPodAlreadyReleased(pod: Pod): Boolean = {
+        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+          runningPodsToExecutors.keySet.foreach(runningPod =>
+            if (runningPod.getMetadata.getName == pod.getMetadata.getName) {
+              return false
+            }
+          )
+        }
+        true
+      }
+      val alreadyReleased = isPodAlreadyReleased(pod)
+      val containerExitStatus = getContainerExitStatus(pod)
+      // container was probably actively killed by the driver.
+      val exitReason = if (alreadyReleased) {
+        ExecutorExited(containerExitStatus, exitCausedByApp = false,
+          s"Container in pod " + pod.getMetadata.getName +
+            " exited from explicit termination request.")
+      } else {
+        val containerExitReason = containerExitStatus match {
+          case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
+            memLimitExceededLogMessage(pod.getStatus.getReason)
+          case _ =>
+            // Here we can't be sure that that exit was caused by the application but this seems
+            // to be the right default since we know the pod was not explicitly deleted by
+            // the user.
+            "Pod exited with following container exit status code " + containerExitStatus
+        }
+        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+      }
+      FAILED_PODS_LOCK.synchronized {
+        failedPods.put(pod.getMetadata.getName, exitReason)
+      }
+    }
+
+    def handleDeletedPod(pod: Pod): Unit = {
+      val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false,
+        "Pod " + pod.getMetadata.getName + " deleted or lost.")
+      FAILED_PODS_LOCK.synchronized {
+        failedPods.put(pod.getMetadata.getName, exitReason)
+      }
+    }
   }
 
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new KubernetesDriverEndpoint(rpcEnv, properties)
+  }
+
+  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+    extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        if (disableExecutor(executorId)) {
+          EXECUTORS_TO_REMOVE_LOCK.synchronized {
+            executorsToRemove.add(executorId)
+          }
+        }
+      }
+    }
+  }
+
+  private val executorRecoveryRunnable: Runnable = new Runnable {
+
+    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
+    private val executorsToRecover = new mutable.HashSet[String]
+    // Maintains a map of executor id to count of checks performed to learn the loss reason
+    // for an executor.
+    private val executorReasonChecks = new mutable.HashMap[String, Int]
+
+    override def run(): Unit = removeFailedAndRequestNewExecutors()
+
+    def removeFailedAndRequestNewExecutors(): Unit = {
+      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.toMap
+      }
+      val localFailedPods = FAILED_PODS_LOCK.synchronized {
+        failedPods.toMap
+      }
+      val localExecutorsToRemove = EXECUTORS_TO_REMOVE_LOCK.synchronized {
+        executorsToRemove.toSet
+      }
+      localExecutorsToRemove.foreach { case (executorId) =>
+        localRunningExecutorsToPods.get(executorId) match {
+          case Some(pod) =>
+            localFailedPods.get(pod.getMetadata.getName) match {
+              case Some(executorExited: ExecutorExited) =>
+                logDebug(s"Removing executor $executorId with loss reason "
+                  + executorExited.message)
+                removeExecutor(executorId, executorExited)
+                if (!executorExited.exitCausedByApp) {
+                  executorsToRecover.add(executorId)
+                }
+              case None =>
+                removeExecutorOrIncrementLossReasonCheckCount(executorId)
+            }
+          case None =>
+            removeExecutorOrIncrementLossReasonCheckCount(executorId)
+        }
+      }
+      executorsToRecover.foreach(executorId =>
+        EXECUTORS_TO_REMOVE_LOCK.synchronized {
+          executorsToRemove -= executorId
+          executorReasonChecks -= executorId
+        }
+      )
+      if (executorsToRecover.nonEmpty) {
+        requestExecutors(executorsToRecover.size)
+      }
+      executorsToRecover.clear()
+    }
+
+    def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
+      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
+        executorsToRecover.add(executorId)
+        executorReasonChecks -= executorId
+      } else {
+        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+      }
+    }
+  }
 }
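The recovery runnable gives the driver a bounded number of checks to learn an executor's loss reason before it gives up and treats the executor as lost for unknown reasons. A self-contained sketch of that bookkeeping (simplified names; the threshold and return-value convention here are illustrative, not the PR's exact behavior):

```scala
import scala.collection.mutable

object LossReasonCheckSketch {
  private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
  private val executorReasonChecks = new mutable.HashMap[String, Int]

  // Returns true once the executor has been checked more than the allowed
  // number of times, i.e. the caller should remove it with an unknown reason.
  def removeOrIncrement(executorId: String): Boolean = {
    val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
    if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
      executorReasonChecks -= executorId
      true
    } else {
      executorReasonChecks.put(executorId, reasonCheckCount + 1)
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // With the threshold at 10, the twelfth check is the first to report "give up".
    val results = (1 to 12).map(_ => removeOrIncrement("exec-1"))
    println(results.mkString(", "))
  }
}
```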
 
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val VMEM_EXCEEDED_EXIT_CODE = -103
+  private val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  def memLimitExceededLogMessage(diagnostics: String): String = {
+    s"Pod/Container killed for exceeding memory limits. $diagnostics" +
+      " Consider boosting spark executor memory overhead."
+  }
 }
> Review comment: Unused?