This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Conversation

@foxish (Member) commented Feb 24, 2017

Fixes #136

@foxish (Member, Author) commented Feb 24, 2017

This handles executor death as long as Spark knows about the executor (i.e., it has registered with the driver). However, if the executor pod is killed while still pending, the driver doesn't know about it at all.

While the executor is still pending, we want some logic in the driver to watch the executor pods. I think we can make this pod watching generic, since the client code needs to watch the driver and the driver needs to watch executors. Thoughts? @mccheah @ash211
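
A minimal sketch of the kind of generic pod watch described above, assuming the fabric8 KubernetesClient the backend already uses; the labels and the callback are illustrative assumptions, not this PR's code:

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}

    // Forwards pod lifecycle events for pods matching the given labels to a callback.
    class PodStatusWatcher(onPodEvent: (Watcher.Action, Pod) => Unit) extends Watcher[Pod] {
      override def eventReceived(action: Watcher.Action, pod: Pod): Unit = onPodEvent(action, pod)
      override def onClose(cause: KubernetesClientException): Unit = {
        // The watch was closed (e.g. the API server connection dropped); a real
        // implementation would re-establish it here.
      }
    }

    // The "spark-app-id" / "spark-role" labels are hypothetical selectors for this sketch.
    def watchExecutorPods(client: KubernetesClient, appId: String): Unit = {
      client.pods()
        .withLabel("spark-app-id", appId)
        .withLabel("spark-role", "executor")
        .watch(new PodStatusWatcher((action, pod) =>
          // React here to DELETED/ERROR events for pods that never registered with the driver.
          println(s"Executor pod ${pod.getMetadata.getName}: $action")))
    }

The same watcher could be reused by the client code to watch the driver pod, which is what would make the pod watching generic.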

@kimoonkim (Member) commented Feb 24, 2017

@foxish FYI, the Travis build failed not because of unit tests but because of style issues:

error file=/home/travis/build/apache-spark-on-k8s/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala message=Use Javadoc style indentation for multiline comments line=262 column=0

[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.8.0:check (default) on project spark-kubernetes_2.11: Failed during scalastyle execution: You have 1 Scalastyle violation(s).


/**
 * We assume for now that we should create a replacement executor pod when it is lost.
 * TODO: if spark scales down the number of executors (in dynamic allocation mode),
@mccheah commented Feb 25, 2017:

I wonder if this case is actually supposed to be handled by CoarseGrainedSchedulerBackend. How does the code there react to a lost executor?

@foxish (Member, Author) replied:

The YARN scheduler backend uses this pattern. However, I think I should be calling super.onDisconnected, because it does some things like unregistering the executor from the block manager and other bookkeeping.
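
A minimal sketch of the call-super-first pattern being discussed, inside the Kubernetes backend's driver endpoint; the requestReplacementExecutorPod helper is a hypothetical placeholder, not part of this PR:

    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
      // Capture the executor ID before the base class removes the address mapping.
      val lostExecutorId = addressToExecutorId.get(rpcAddress)
      // Let CoarseGrainedSchedulerBackend do its usual bookkeeping (remove the executor
      // from its internal maps, notify the scheduler and listeners, ...).
      super.onDisconnected(rpcAddress)
      // Then apply Kubernetes-specific handling, e.g. ask for a replacement pod.
      lostExecutorId.foreach(requestReplacementExecutorPod)
    }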

Reply:

YARN isn't a good precedent because it has a construct of loss reasons, in order to find out why the executor disconnected and to adjust some of the logic accordingly. For example, when an executor is eliminated because of YARN's preemption, Spark doesn't count tasks that were allocated to that executor toward the count of failures that fails the entire job at a certain threshold.

I think just calling removeExecutor should suffice, which is what the default implementation does. I'm surprised that CoarseGrainedSchedulerBackend isn't trying to re-sync the number of active executors to what is desired - I would anticipate that to be said class's job. It might be worth tracing down how the other implementations eventually do this re-sync - I believe YARN mode depends on logic on the ApplicationMaster side.

@mccheah replied Feb 25, 2017:

My understanding of how YARN does it is that a target number of executors is maintained on the ApplicationMaster. YarnSchedulerBackend#doRequestTotalExecutors (code) synchronizes the requested number of executors from the driver up to the ApplicationMaster. The ApplicationMaster periodically runs YarnAllocator#allocateResources (code), which synchronizes the amount of resources (number of executors) required with the YARN resource manager.

The periodic polling "self-corrects" in cases where executors are lost during the lifetime of an application. We can follow this precedent, but it's not completely unreasonable to handle the situation in our own way.
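
For illustration, the kind of periodic self-correction loop described above could look roughly like the following; the desiredTotal, liveExecutorCount and requestNewExecutors hooks are placeholder assumptions, not the YARN or Kubernetes backend code:

    import java.util.concurrent.{Executors, TimeUnit}

    // Periodically reconciles the number of live executors with the desired total.
    class ExecutorReconciler(
        desiredTotal: () => Int,
        liveExecutorCount: () => Int,
        requestNewExecutors: Int => Unit,
        intervalSeconds: Long = 30) {

      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def start(): Unit = {
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = {
            val missing = desiredTotal() - liveExecutorCount()
            if (missing > 0) {
              // Executors lost since the last tick are replaced here, mirroring
              // the allocateResources-style polling done on the YARN side.
              requestNewExecutors(missing)
            }
          }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
      }

      def stop(): Unit = scheduler.shutdown()
    }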

@foxish (Member, Author) replied:

Thanks for investigating. Perhaps then we should adopt our own model, and rather than reacting to executor loss using onDisconnected, we should watch all the executor pods with the Kubernetes API and create new executors if the count falls below the expected number?

Reply:

I think trying to query the Spark UI that runs on the executors is good enough. @ash211 thoughts?

Reply:

We can check that an executor is individually healthy with a liveness probe on it. But another failure mode is when it gets network partitioned from the driver. It might create a lot of traffic on the driver if every probe check on the executor also verified connectivity to the driver with a ping.
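
Purely as an illustration of the liveness-probe idea, an executor-container probe built with the fabric8 model could look like this; the probed command and timings are placeholder assumptions:

    import io.fabric8.kubernetes.api.model.{Probe, ProbeBuilder}

    // A placeholder liveness probe that only checks the executor JVM process is alive;
    // it deliberately does not ping the driver, to avoid the traffic concern above.
    val executorLivenessProbe: Probe = new ProbeBuilder()
      .withNewExec()
        .withCommand("/bin/sh", "-c", "pgrep -f CoarseGrainedExecutorBackend")
      .endExec()
      .withInitialDelaySeconds(30)
      .withPeriodSeconds(30)
      .withFailureThreshold(3)
      .build()

The probe would then be attached to the executor container spec (e.g. via ContainerBuilder.withLivenessProbe) so the kubelet restarts or fails the pod when the check fails.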

Reply:

I'm tentatively fine with being inaccurate on that particular failure mode if it means we get to use mostly Kubernetes primitives in an elegant way. I'm not sure if executors detect if they can't reach the driver or if the driver is failing to send messages to the executor, and I don't know if the executors shut themselves down in that case - it would be worth looking into that.

Reply (Member):

I like the idea of checking pods using the k8s API.

On health checks: executors periodically send heartbeats to the driver so the driver can expire unresponsive executors (see the HeartbeatReceiver code). Maybe we can piggyback on them:

  private def expireDeadHosts(): Unit = {
    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
    val now = clock.getTimeMillis()
    for ((executorId, lastSeenMs) <- executorLastSeen) {
      if (now - lastSeenMs > executorTimeoutMs) {
        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
          s"timed out after ${now - lastSeenMs} ms"))
          // Asynchronously kill the executor to avoid blocking the current thread
        killExecutorThread.submit(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Note: we want to get an executor back after expiring this one,
            // so do not simply call `sc.killExecutor` here (SPARK-8119)
            sc.killAndReplaceExecutor(executorId)
          }
        })
        executorLastSeen.remove(executorId)
      }
    }
  }

Reply:

@foxish Let's go with a strategy that watches the K8s API and go from there.

 */
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
  addressToExecutorId.get(rpcAddress).foreach { executorId =>
    if (disableExecutor(executorId)) {
Reply:

The onDisconnected method in the superclass calls removeExecutor, not disableExecutor -- should we be using that one?

Reply (Member):

I wonder whether onDisconnected is also called when an executor JVM crashes inside a running pod. (This seems to be the caller code):

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

We may not want to allocate a new pod in that case because the JVM will get restarted in place; i.e., we may want to allocate a new pod only when we have lost a pod.

Reply:

I think it's better to be consistent everywhere and always operate at the pod level as opposed to the container level. So if the container JVM exits, we should immediately shut down the whole pod and allocate a new one.

@lins05 commented Mar 9, 2017

We don't specify the restart policy for executor pods, so it defaults to "Always" (according to the k8s API doc here). When I tested this patch with spark.executor.instances=2, I found that after I killed an executor, I would soon see three executors on the driver web UI.

The question is: do we really need to take care of allocating a new executor when one is lost, or can we just let k8s restart it (maybe adding a liveness probe)?

@mccheah commented Mar 9, 2017

I think there's trickiness if the container dies but the pod doesn't - is the container restarted in that case?

@foxish (Member, Author) commented Mar 9, 2017

I think there's trickiness if the container dies but the pod doesn't - is the container restarted in that case?

Yes, the container will be restarted in-place.

@mccheah commented Mar 9, 2017

Oh, but @lins05 if an executor disconnects and then re-connects with the same executor ID, I'm not sure if Spark handles that well.

@lins05 commented Mar 10, 2017

Oh, but @lins05 if an executor disconnects and then re-connects with the same executor ID, I'm not sure if Spark handles that well.

Good point. Seems we need to set the executor pods' restart policy to Never and manage the loss in the k8s backend.
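
For illustration only, setting the policy explicitly on the executor pod spec with the fabric8 builders could look roughly like this; the pod name, label and container name are placeholder assumptions:

    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

    // Illustrative executor pod with restartPolicy=Never, so a dead executor pod is not
    // restarted in place and replacement is left to the scheduler backend.
    def buildExecutorPod(executorId: String, image: String): Pod = new PodBuilder()
      .withNewMetadata()
        .withName(s"spark-executor-$executorId")
        .addToLabels("spark-role", "executor")
      .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .addNewContainer()
          .withName("executor")
          .withImage(image)
        .endContainer()
      .endSpec()
      .build()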

@kimoonkim (Member) commented Mar 10, 2017

Oh, but @lins05 if an executor disconnects and then re-connects with the same executor ID, I'm not sure if Spark handles that well.

Good point. Seems we need to set the executor pods' restart policy to Never and manage the loss in the k8s backend.

I could be wrong here, but I think Spark handles this, or at least tries to. The base class CoarseGrainedSchedulerBackend (code) removes the dead executor inside onDisconnected, which deletes the executor ID from a map of live executors. When the executor JVM restarts, it will send the register request to the driver again, and the request will be accepted as long as the executor ID is missing from that map.

I think we should still consider reusing this existing mechanism. Note that this PR calls disableExecutor inside onDisconnected, which might break it.

Here's onDisconnected calling removeExecutor:

    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      addressToExecutorId
        .get(remoteAddress)
        .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
          "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
          "messages.")))
    }

Here's removeExecutor deleting the executor ID from the live executor map. Note executorDataMap -= executorId below:

    // Remove a disconnected slave from the cluster
    private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      logDebug(s"Asked to remove executor $executorId with reason $reason")
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
            addressToExecutorId -= executorInfo.executorAddress
            executorDataMap -= executorId
            executorsPendingLossReason -= executorId
            executorsPendingToRemove.remove(executorId).getOrElse(false)
          }
          totalCoreCount.addAndGet(-executorInfo.totalCores)
          totalRegisteredExecutors.addAndGet(-1)
          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
          listenerBus.post(
            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))

Here's the RegisterExecutor handler checking executorDataMap. Hopefully, the restarted executor will go to the else branch:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")

@kimoonkim (Member) commented:

Another thing I noticed is that Spark core tries to tell whether an executor died because of bad user code or because of a cluster operation like preemption. In the former case, Spark wants to abort the job reasonably fast so it doesn't waste cluster resources. In the latter, it's no fault of the job, so Spark tries to continue the job's execution as long as possible. It seems important not to mistake one for the other.

Here is ExecutorLossReason.scala (code), which has three subclasses. Notice that ExecutorExited has a boolean flag exitCausedByApp. Also notice LossReasonPending; it seems the YARN code uses it as a temporary state before the driver figures out what happened to the executor by talking to the YARN AM/scheduler (disableExecutor uses this):

/**
 * Represents an explanation for an executor or whole slave failing or exiting.
 */
private[spark]
class ExecutorLossReason(val message: String) extends Serializable {
  override def toString: String = message
}

...
private[spark]
case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
  extends ExecutorLossReason(reason)
...

private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")

...
/**
 * A loss reason that means we don't yet know why the executor exited.
 *
 * This is used by the task scheduler to remove state associated with the executor, but
 * not yet fail any tasks that were running in the executor before the real loss reason
 * is known.
 */
private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")

@ash211 commented Mar 16, 2017

Please rebase onto branch-2.1-kubernetes and send this PR into that branch instead of k8s-support-alternate-incremental which is now deprecated.

@foxish (Member, Author) commented Apr 27, 2017

Closing in favor of #244

@foxish closed this Apr 27, 2017
@foxish deleted the executor-loss branch July 25, 2017 00:47