@@ -18,18 +18,20 @@ package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
 import java.net.InetAddress
-import java.util.concurrent.TimeUnit
+import java.util.Collections
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.io.FilenameUtils
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
@@ -38,8 +40,8 @@ import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -55,10 +57,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
   import KubernetesClusterSchedulerBackend._
 
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
-
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  // TODO(varun): Get rid of this lock object by making the underlying map a concurrent hash map.
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
-  private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
+  // Indexed by executor IP addresses and guarded by EXECUTOR_PODS_BY_IPS_LOCK.
+  private val executorPodsByIPs = new mutable.HashMap[String, Pod]
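+  // Terminated executor pods keyed by pod name, mapped to the reason the executor exited;
+  // written by the pod watcher thread and drained by the allocator thread below.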
+  private val failedPods: concurrent.Map[String, ExecutorExited] =
+    new ConcurrentHashMap[String, ExecutorExited]().asScala
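+  // Executor IDs whose RPC endpoints have disconnected and that still await removal;
+  // populated by KubernetesDriverEndpoint.onDisconnected below.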
+  private val executorsToRemove = Collections.newSetFromMap[String](
+    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
 
   private val executorExtraClasspath = conf.get(
     org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
@@ -135,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs(
         conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key,
             "shuffle-labels")
-      if (parsedShuffleLabels.size == 0) {
+      if (parsedShuffleLabels.isEmpty) {
         throw new SparkException(s"Dynamic allocation enabled " +
           s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
       }
@@ -170,12 +180,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val executorWatchResource = new AtomicReference[Closeable]
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
+
   private val driverUrl = RpcEndpointAddress(
     sc.getConf.get("spark.driver.host"),
     sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
-  private val initialExecutors = getInitialTargetExecutorNumber(1)
+  private val initialExecutors = getInitialTargetExecutorNumber()
 
   private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -192,23 +203,74 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val allocatorRunnable: Runnable = new Runnable {
 
+    // Number of times we are allowed to check for the loss reason for an executor before we
+    // give up, assume the executor failed for good, and attribute it to a framework fault.
+    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
+    private val executorsToRecover = new mutable.HashSet[String]
+    // Maintains a map of executor id to count of checks performed to learn the loss reason
+    // for an executor.
+    private val executorReasonChecks = new mutable.HashMap[String, Int]
+
     override def run(): Unit = {
-      if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
-        logDebug("Waiting for pending executors before scaling")
-      } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
-        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-      } else {
-        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
-        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+      removeFailedExecutors()
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
+          logDebug("Waiting for pending executors before scaling")
+        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
+          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+        } else {
+          val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
+            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
+            runningExecutorsToPods.put(executorId, pod)
+            runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
             logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
+              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
           }
         }
       }
     }
+
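+    // Reconciles executors that have disconnected from the driver with pod failure events
+    // reported by the Kubernetes watcher, so each removal carries an accurate loss reason.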
+    def removeFailedExecutors(): Unit = {
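+      // Iterate over a snapshot taken under the lock; the lock is re-acquired only when a
+      // pod actually needs to be deleted.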
+      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.toMap
+      }
+      executorsToRemove.foreach { executorId =>
+        localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
+          failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
+            logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+            removeExecutor(executorId, executorExited)
+            if (!executorExited.exitCausedByApp) {
+              executorsToRecover.add(executorId)
+            }
+          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+        }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+
+        executorsToRecover.foreach(executorId => {
+          executorsToRemove -= executorId
+          executorReasonChecks -= executorId
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.remove(executorId).map { pod: Pod =>
+              kubernetesClient.pods().delete(pod)
+              runningPodsToExecutors.remove(pod.getMetadata.getName)
+            }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+          }
+        })
+        executorsToRecover.clear()
+      }
+    }
+
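+    // Gives an executor up to MAX_EXECUTOR_LOST_REASON_CHECKS allocator passes for its loss
+    // reason to surface before writing it off as lost for unknown reasons.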
+    def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
+      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
+        executorsToRecover.add(executorId)
+        executorReasonChecks -= executorId
+      } else {
+        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+      }
+    }
   }
 
   private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
@@ -280,8 +342,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // indication as to why.
     try {
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
-        runningExecutorPods.clear()
+        runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
+        runningExecutorsToPods.clear()
+        runningPodsToExecutors.clear()
       }
       EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
         executorPodsByIPs.clear()
@@ -534,11 +597,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  override def createDriverEndpoint(
-    properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(rpcEnv, properties)
-  }
-
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
     totalExpectedExecutors.set(requestedTotal)
     true
@@ -547,8 +605,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
-        runningExecutorPods.remove(executor) match {
-          case Some(pod) => kubernetesClient.pods().delete(pod)
+        runningExecutorsToPods.remove(executor) match {
+          case Some(pod) =>
+            kubernetesClient.pods().delete(pod)
+            runningPodsToExecutors.remove(pod.getMetadata.getName)
           case None => logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
@@ -564,6 +624,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private class ExecutorPodsWatcher extends Watcher[Pod] {
 
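+    // Exit status to report when a failed pod carries no container statuses to inspect.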
+    private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
           && pod.getMetadata.getDeletionTimestamp == null) {
@@ -583,12 +645,75 @@ private[spark] class KubernetesClusterSchedulerBackend(
             executorPodsByIPs -= podIP
           }
         }
+        if (action == Action.ERROR) {
+          logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+          handleErroredPod(pod)
+        } else if (action == Action.DELETED) {
+          logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+          handleDeletedPod(pod)
+        }
       }
     }
 
     override def onClose(cause: KubernetesClientException): Unit = {
       logDebug("Executor pod watch closed.", cause)
     }
+
+    def getExecutorExitStatus(pod: Pod): Int = {
+      val containerStatuses = pod.getStatus.getContainerStatuses
+      if (!containerStatuses.isEmpty) {
+        // We assume the first container represents the pod status. This assumption may not hold
+        // true in the future. Revisit this if side-car containers start running inside executor
+        // pods.
+        getExecutorExitStatus(containerStatuses.get(0))
+      } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+    }
+
+    def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
+      Option(containerStatus.getState).map(containerState =>
+        Option(containerState.getTerminated).map(containerStateTerminated =>
+          containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
+      ).getOrElse(UNKNOWN_EXIT_CODE)
+    }
+
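+    // A pod that is no longer tracked in runningPodsToExecutors was removed via an explicit
+    // kill request from the driver rather than failing on its own.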
+    def isPodAlreadyReleased(pod: Pod): Boolean = {
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        !runningPodsToExecutors.contains(pod.getMetadata.getName)
+      }
+    }
+
+    def handleErroredPod(pod: Pod): Unit = {
+      val alreadyReleased = isPodAlreadyReleased(pod)
+      val containerExitStatus = getExecutorExitStatus(pod)
+      val exitReason = if (alreadyReleased) {
+        // The container was probably actively killed by the driver.
+        ExecutorExited(containerExitStatus, exitCausedByApp = false,
+          "Container in pod " + pod.getMetadata.getName +
+            " exited from explicit termination request.")
+      } else {
+        val containerExitReason = containerExitStatus match {
+          case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
+            memLimitExceededLogMessage(pod.getStatus.getReason)
+          case _ =>
+            // Here we can't be sure that the exit was caused by the application, but this
+            // seems to be the right default since we know the pod was not explicitly deleted
+            // by the user.
+            "Pod exited with following container exit status code " + containerExitStatus
+        }
+        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+      }
+      failedPods.put(pod.getMetadata.getName, exitReason)
+    }
+
+    def handleDeletedPod(pod: Pod): Unit = {
+      val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
+        "Pod " + pod.getMetadata.getName + " deleted or lost.")
+      failedPods.put(pod.getMetadata.getName, exitReason)
+    }
+  }
+
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new KubernetesDriverEndpoint(rpcEnv, properties)
   }
 
   private class KubernetesDriverEndpoint(
@@ -597,6 +722,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     extends DriverEndpoint(rpcEnv, sparkProperties) {
     private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
 
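+    // Only flag disconnected executors here; the allocator thread resolves the actual loss
+    // reason from pod events before reporting the executor as removed.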
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        if (disableExecutor(executorId)) {
+          executorsToRemove.add(executorId)
+        }
+      }
+    }
+
     override def receiveAndReply(
       context: RpcCallContext): PartialFunction[Any, Unit] = {
       new PartialFunction[Any, Unit]() {
@@ -615,7 +748,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
                 var resolvedProperties = sparkProperties
                 val runningExecutorPod = kubernetesClient
                   .pods()
-                  .withName(runningExecutorPods(executorId).getMetadata.getName)
+                  .withName(runningExecutorsToPods(executorId).getMetadata.getName)
                   .get()
                 val nodeName = runningExecutorPod.getSpec.getNodeName
                 val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
@@ -637,7 +770,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
       }.orElse(super.receiveAndReply(context))
     }
   }
-
 }
 case class ShuffleServiceConfig(
     shuffleNamespace: String,
@@ -647,6 +779,14 @@ case class ShuffleServiceConfig(
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
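+  // These appear to mirror YARN's exit codes for containers killed for exceeding virtual or
+  // physical memory limits.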
+  private val VMEM_EXCEEDED_EXIT_CODE = -103
+  private val PMEM_EXCEEDED_EXIT_CODE = -104
+  private val UNKNOWN_EXIT_CODE = -111
+
+  def memLimitExceededLogMessage(diagnostics: String): String = {
+    s"Pod/Container killed for exceeding memory limits. $diagnostics" +
+      " Consider boosting spark executor memory overhead."
+  }
 }
 
 /**