@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.kubernetes
 import java.io.Closeable
 import java.net.InetAddress
 import java.util.Collections
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
 import io.fabric8.kubernetes.api.model._
@@ -29,25 +29,28 @@ import scala.collection.{concurrent, mutable}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
-    val sc: SparkContext,
+    rpcEnv: RpcEnv,
     executorPodFactory: ExecutorPodFactory,
     shuffleManager: Option[KubernetesExternalShuffleManager],
-    kubernetesClient: KubernetesClient)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+    kubernetesClient: KubernetesClient,
+    allocatorExecutor: ScheduledExecutorService,
+    requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   import KubernetesClusterSchedulerBackend._
 
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
   // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
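The hunk above changes the backend's constructor: instead of a full SparkContext it now takes the RpcEnv plus the two thread pools it used to create internally, which makes the dependencies explicit and the class easier to drive from tests. A minimal wiring sketch under the assumption that the caller builds the pools with Spark's ThreadUtils helpers; the surrounding value names are illustrative, not taken from this patch.

```scala
import java.util.concurrent.{ExecutorService, ScheduledExecutorService}

import org.apache.spark.util.ThreadUtils

// Pools the backend previously created for itself; the thread names match the
// ones removed from the backend in this diff.
val allocatorExecutor: ScheduledExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService: ExecutorService =
  ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")

// Hypothetical construction site (e.g. a cluster manager); scheduler, sc,
// executorPodFactory, shuffleManager and kubernetesClient are assumed to exist.
val backend = new KubernetesClusterSchedulerBackend(
  scheduler,
  sc.env.rpcEnv,
  executorPodFactory,
  shuffleManager,
  kubernetesClient,
  allocatorExecutor,
  requestExecutorsService)
```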
@@ -57,21 +60,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
   // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
   private val executorPodsByIPs = new mutable.HashMap[String, Pod]
-  private val failedPods: concurrent.Map[String, ExecutorExited] = new
-    ConcurrentHashMap[String, ExecutorExited]().asScala
-  private val executorsToRemove = Collections.newSetFromMap[String](
-    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
+  private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
+    new ConcurrentHashMap[String, ExecutorExited]().asScala
+  private val disconnectedPodsByExecutorIdPendingRemoval =
+    new ConcurrentHashMap[String, Pod]().asScala
 
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
 
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
-  private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
+    requestExecutorsService)
 
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
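Two patterns in the hunk above are worth calling out: the loss reasons and the pods pending removal are now held in Java ConcurrentHashMaps viewed through .asScala, so the watcher thread and the allocator thread can share them without extra locking, and the implicit ExecutionContext backing the doRequestTotalExecutors/doKillExecutors futures is built from the injected pool rather than a freshly created one. A self-contained sketch of both patterns, using only the JDK and the Scala standard library; all names here are illustrative.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

import scala.collection.JavaConverters._
import scala.collection.concurrent
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SharedStateSketch {
  // A Java ConcurrentHashMap viewed as a Scala concurrent.Map: safe for a
  // producer thread (the pod watcher) and a consumer thread (the allocator).
  val exitReasons: concurrent.Map[String, String] =
    new ConcurrentHashMap[String, String]().asScala

  // An ExecutionContext built from an externally supplied executor service,
  // mirroring requestExecutorContext above.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  def main(args: Array[String]): Unit = {
    exitReasons.put("pod-1", "exit code 52")
    // remove is atomic, so at most one consumer ever observes a given reason.
    val reason = Await.result(Future(exitReasons.remove("pod-1")), 5.seconds)
    println(reason) // Some("exit code 52")
  }
}
```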
@@ -93,9 +94,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
   private val driverUrl = RpcEndpointAddress(
-    sc.getConf.get("spark.driver.host"),
-    sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
-    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    conf.get("spark.driver.host"),
+    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
   private val initialExecutors = getInitialTargetExecutorNumber()
 
@@ -109,21 +110,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
     s"is ${podAllocationSize}, should be a positive integer")
 
-  private val allocator = ThreadUtils
-    .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+  private val allocatorRunnable = new Runnable {
 
-  private val allocatorRunnable: Runnable = new Runnable {
-
-    // Number of times we are allowed check for the loss reason for an executor before we give up
-    // and assume the executor failed for good, and attribute it to a framework fault.
-    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
-    private val executorsToRecover = new mutable.HashSet[String]
     // Maintains a map of executor id to count of checks performed to learn the loss reason
     // for an executor.
-    private val executorReasonChecks = new mutable.HashMap[String, Int]
+    private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
 
     override def run(): Unit = {
-      removeFailedExecutors()
+      handleDisconnectedExecutors()
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
         if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
           logDebug("Waiting for pending executors before scaling")
@@ -132,7 +126,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         } else {
           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+              totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
             runningExecutorsToPods.put(executorId, pod)
             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
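The allocation loop above is batched: each tick creates at most podAllocationSize pods, and never more than the gap between the desired executor count and the pods already tracked. A tiny worked example with illustrative numbers; none of these values come from the patch.

```scala
// If 12 executors are wanted, 4 pods are already tracked, and the batch size
// is 5, one allocator tick creates min(12 - 4, 5) = 5 pods; the remaining 3
// are left for later ticks, after earlier pods have had a chance to register.
val totalExpectedExecutors = 12
val runningExecutorPods = 4
val podAllocationSize = 5
val podsThisTick = math.min(totalExpectedExecutors - runningExecutorPods, podAllocationSize)
assert(podsThisTick == 5)
```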
@@ -143,43 +137,47 @@ private[spark] class KubernetesClusterSchedulerBackend(
       }
     }
 
-    def removeFailedExecutors(): Unit = {
-      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.toMap
-      }
-      executorsToRemove.foreach { case (executorId) =>
-        localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
-          failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
-            logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
-            removeExecutor(executorId, executorExited)
-            if (!executorExited.exitCausedByApp) {
-              executorsToRecover.add(executorId)
-            }
-          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-        }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-
-        executorsToRecover.foreach(executorId => {
-          executorsToRemove -= executorId
-          executorReasonChecks -= executorId
-          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-            runningExecutorsToPods.remove(executorId).map { pod: Pod =>
-              kubernetesClient.pods().delete(pod)
-              runningPodsToExecutors.remove(pod.getMetadata.getName)
-            }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+    def handleDisconnectedExecutors(): Unit = {
+      // For each disconnected executor, synchronize with the loss reasons that may have been found
+      // by the executor pod watcher. If the loss reason was discovered by the watcher,
+      // inform the parent class with removeExecutor.
+      val disconnectedPodsByExecutorIdPendingRemovalCopy =
+        Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
+      disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) =>
+        val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName)
+        knownExitReason.fold {
+          removeExecutorOrIncrementLossReasonCheckCount(executorId)
+        } { executorExited =>
+          logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+          removeExecutor(executorId, executorExited)
+          // We keep around executors that have exit conditions caused by the application. This
+          // allows them to be debugged later on. Otherwise, mark them to be deleted from the
+          // API server.
+          if (!executorExited.exitCausedByApp) {
+            deleteExecutorFromClusterAndDataStructures(executorId)
           }
-        })
-        executorsToRecover.clear()
+        }
       }
     }
 
     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
-      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
-      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
-        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
-        executorsToRecover.add(executorId)
-        executorReasonChecks -= executorId
+      val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
+      if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
+        deleteExecutorFromClusterAndDataStructures(executorId)
       } else {
-        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+        executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
+      }
+    }
+
+    def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
+      disconnectedPodsByExecutorIdPendingRemoval -= executorId
+      executorReasonCheckAttemptCounts -= executorId
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.remove(executorId).map { pod =>
+          kubernetesClient.pods().delete(pod)
+          runningPodsToExecutors.remove(pod.getMetadata.getName)
+        }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
       }
     }
   }
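Two details of handleDisconnectedExecutors above are easy to miss: it iterates over a snapshot (Map.empty ++ ...) of the concurrent map so entries it deletes mid-loop cannot disturb the traversal, and an executor whose exit reason the watcher never reports is retried only MAX_EXECUTOR_LOST_REASON_CHECKS times before being written off as lost. A stripped-down, Kubernetes-free sketch of that bounded retry bookkeeping; all names here are illustrative.

```scala
import scala.collection.mutable

object LossReasonRetrySketch {
  val MaxChecks = 10
  private val attempts = new mutable.HashMap[String, Int]

  /** True once an executor has been checked MaxChecks times without a known reason. */
  def shouldGiveUp(executorId: String): Boolean = {
    val count = attempts.getOrElse(executorId, 0)
    if (count >= MaxChecks) {
      attempts -= executorId // analogous to deleteExecutorFromClusterAndDataStructures
      true
    } else {
      attempts.put(executorId, count + 1)
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val outcomes = (1 to 11).map(_ => shouldGiveUp("exec-1"))
    assert(outcomes.count(identity) == 1) // gives up exactly once, on the 11th attempt
  }
}
```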
@@ -214,18 +212,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .watch(new ExecutorPodsWatcher()))
 
-    allocator.scheduleWithFixedDelay(
-      allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS)
+    allocatorExecutor.scheduleWithFixedDelay(
+      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
     shuffleManager.foreach(_.start(applicationId()))
 
-    if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
+    if (!Utils.isDynamicAllocationEnabled(conf)) {
       doRequestTotalExecutors(initialExecutors)
     }
   }
 
   override def stop(): Unit = {
     // stop allocation of new resources and caches.
-    allocator.shutdown()
+    allocatorExecutor.shutdown()
     shuffleManager.foreach(_.stop())
 
     // send stop message to executors so they shut down cleanly
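With the hunk above, start() schedules the allocator loop on the injected ScheduledExecutorService and stop() shuts that service down, so the backend no longer owns the thread's lifecycle. A minimal standalone sketch of the same scheduleWithFixedDelay/shutdown pattern, using a plain JDK executor in place of the Spark-provided one:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FixedDelaySketch {
  def main(args: Array[String]): Unit = {
    val allocatorExecutor = Executors.newSingleThreadScheduledExecutor()
    val allocatorRunnable = new Runnable {
      override def run(): Unit = println("allocation round")
    }
    // First run immediately, then wait one second after each run completes,
    // mirroring scheduleWithFixedDelay(allocatorRunnable, 0L, interval, SECONDS).
    allocatorExecutor.scheduleWithFixedDelay(allocatorRunnable, 0L, 1L, TimeUnit.SECONDS)
    Thread.sleep(3000) // let a few rounds fire
    allocatorExecutor.shutdown() // what stop() now does with the injected executor
  }
}
```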
@@ -298,7 +296,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       executorId,
       applicationId(),
       driverUrl,
-      sc.conf.getExecutorEnv,
+      conf.getExecutorEnv,
       driverPod,
       nodeToLocalTaskCount)
     try {
@@ -318,11 +316,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
-        runningExecutorsToPods.remove(executor) match {
-          case Some(pod) =>
-            kubernetesClient.pods().delete(pod)
-            runningPodsToExecutors.remove(pod.getMetadata.getName)
-          case None => logWarning(s"Unable to remove pod for unknown executor $executor")
+        val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
+        maybeRemovedExecutor.foreach { executorPod =>
+          kubernetesClient.pods().delete(executorPod)
+          disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod
+          runningPodsToExecutors.remove(executorPod.getMetadata.getName)
+        }
+        if (maybeRemovedExecutor.isEmpty) {
+          logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
     }
@@ -396,10 +397,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     def handleErroredPod(pod: Pod): Unit = {
-      val alreadyReleased = isPodAlreadyReleased(pod)
       val containerExitStatus = getExecutorExitStatus(pod)
       // container was probably actively killed by the driver.
-      val exitReason = if (alreadyReleased) {
+      val exitReason = if (isPodAlreadyReleased(pod)) {
         ExecutorExited(containerExitStatus, exitCausedByApp = false,
           s"Container in pod " + pod.getMetadata.getName +
           " exited from explicit termination request.")
@@ -411,17 +411,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
             // Here we can't be sure that that exit was caused by the application but this seems
             // to be the right default since we know the pod was not explicitly deleted by
             // the user.
-            "Pod exited with following container exit status code " + containerExitStatus
+            s"Pod ${pod.getMetadata.getName}'s executor container exited with exit status " +
+              s"code $containerExitStatus."
         }
         ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
       }
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
 
     def handleDeletedPod(pod: Pod): Unit = {
-      val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
-        "Pod " + pod.getMetadata.getName + " deleted or lost.")
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      val exitMessage = if (isPodAlreadyReleased(pod)) {
+        s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
+      } else {
+        s"Pod ${pod.getMetadata.getName} deleted or lost."
+      }
+      val exitReason = ExecutorExited(
+        getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
   }
 
@@ -433,12 +439,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
       rpcEnv: RpcEnv,
       sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
-    private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       addressToExecutorId.get(rpcAddress).foreach { executorId =>
         if (disableExecutor(executorId)) {
-          executorsToRemove.add(executorId)
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.get(executorId).foreach { pod =>
+              disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod
+            }
+          }
         }
       }
     }
@@ -448,7 +457,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       new PartialFunction[Any, Unit]() {
         override def isDefinedAt(msg: Any): Boolean = {
           msg match {
-            case RetrieveSparkAppConfig(executorId) =>
+            case RetrieveSparkAppConfig(_) =>
               shuffleManager.isDefined
             case _ => false
           }
@@ -477,11 +486,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
 }
 
 private object KubernetesClusterSchedulerBackend {
-  private val DEFAULT_STATIC_PORT = 10000
-  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val VMEM_EXCEEDED_EXIT_CODE = -103
   private val PMEM_EXCEEDED_EXIT_CODE = -104
   private val UNKNOWN_EXIT_CODE = -111
+  // Number of times we are allowed to check for the loss reason for an executor before we give up
+  // and assume the executor failed for good, and attribute it to a framework fault.
+  val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
 
   def memLimitExceededLogMessage(diagnostics: String): String = {
     s"Pod/Container killed for exceeding memory limits. $diagnostics" +