Handle executor loss during job execution #151
Conversation
This handles executor death as long as Spark knows about it (i.e. the executor registered with the driver). However, if the executor pod was killed while still pending, the driver doesn't know about it at all. While the executor is still pending, we want some logic in the driver to watch the executor pods. I think we can make this pod watching generic, since the client code needs to watch the driver and the driver needs to watch executors. Thoughts? @mccheah @ash211
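As a rough illustration of what such a generic watch could look like on the driver side, here is a minimal sketch using the fabric8 Kubernetes client; the "spark-role" label selector, the class name, and the callback wiring are assumptions for this example, not the PR's actual code.

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Sketch of a generic watcher for executor pods. The driver reacts to
// DELETED/ERROR events, including pods that never left the Pending phase.
class ExecutorPodWatcher(onPodLost: Pod => Unit) extends Watcher[Pod] {

  override def eventReceived(action: Action, pod: Pod): Unit = action match {
    case Action.DELETED | Action.ERROR =>
      // The pod disappeared or is in a broken state; let the backend decide
      // whether to request a replacement.
      onPodLost(pod)
    case _ =>
      // ADDED / MODIFIED events are ignored in this sketch.
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // The watch was closed (e.g. lost connection to the API server);
    // a real implementation would re-establish it.
  }
}

object ExecutorPodWatcher {
  // "spark-role" -> "executor" is a hypothetical label used only for this example.
  def watchExecutors(client: KubernetesClient, onPodLost: Pod => Unit) =
    client.pods().withLabel("spark-role", "executor").watch(new ExecutorPodWatcher(onPodLost))
}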
@foxish FYI, the Travis build failed not because of unit tests, but because of style issues:
/**
 * We assume for now that we should create a replacement executor pod when it is lost.
 * TODO: if spark scales down the number of executors (in dynamic allocation mode),
I wonder if this case is actually supposed to be handled by CoarseGrainedSchedulerBackend. How does the code there react to a lost executor?
The YARN scheduler backend uses this pattern. However, I think I should be calling super.onDisconnected, because it handles bookkeeping such as unregistering the executor from the block manager.
YARN isn't a good precedent, because it has a construct of loss reasons in order to determine why the executor disconnected and adjust some of the logic accordingly. For example, when an executor is eliminated by YARN preemption, Spark doesn't count the tasks that were allocated to that executor towards the failure count that fails the entire job at a certain threshold.
I think just calling removeExecutor should suffice, which is what the default implementation does. I'm surprised that CoarseGrainedSchedulerBackend isn't trying to re-sync the number of active executors to what is desired - I would expect that to be that class's job. It might be worth tracing how the other implementations eventually do this re-sync - I believe YARN mode depends on logic on the ApplicationMaster side.
My understanding of how YARN does it is that a target number of executors is maintained on the ApplicationMaster. YarnSchedulerBackend#doRequestTotalExecutors (code) synchronizes the requested number of executors from the driver up to the ApplicationMaster. The ApplicationMaster periodically runs YarnAllocator#allocateResources (code), which synchronizes the amount of resources (number of executors) required with the YARN resource manager.
The periodic polling "self-corrects" in cases where executors are lost during the lifetime of an application. We can follow this precedent, but it's not completely unreasonable to handle the situation in our own way.
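For concreteness, a Kubernetes-flavored version of that self-correcting loop could look roughly like the sketch below; all of the names (desiredTotalExecutors, runningExecutorPods, requestNewExecutorPod) and the polling interval are hypothetical and stand in for whatever the backend actually tracks.

import java.util.concurrent.{Executors, TimeUnit}

// Periodically compare the desired executor count with the number of live
// executor pods and request replacements for any shortfall.
class ExecutorPodReconciler(
    desiredTotalExecutors: () => Int,
    runningExecutorPods: () => Int,
    requestNewExecutorPod: () => Unit) {

  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(intervalSeconds: Long): Unit = {
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val missing = desiredTotalExecutors() - runningExecutorPods()
        // If pods were lost since the last tick, top the count back up.
        (1 to missing).foreach(_ => requestNewExecutorPod())
      }
    }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}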
Thanks for investigating. Perhaps then we should adopt our own model: rather than reacting to executor loss via onDisconnected, we should watch all the executor pods with the Kubernetes API and create new executors if the count falls below the expected number?
I think trying to query the Spark UI that runs on the executors is good enough. @ash211 thoughts?
We can check that an executor is individually healthy with a liveness probe on it. But another failure mode is when it gets network partitioned from the driver. It might create a lot of traffic on the driver if every probe check on the executor also verified connectivity to the driver with a ping.
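For reference, a per-executor liveness probe could be attached roughly like this with the fabric8 client; the probed port, timings, container name, and image are assumptions for illustration, and as noted above this only checks that the executor process is alive, not that it can reach the driver.

import io.fabric8.kubernetes.api.model.{ContainerBuilder, IntOrString, ProbeBuilder, TCPSocketActionBuilder}

// Hypothetical TCP liveness probe against a port the executor is assumed to
// listen on (e.g. its block manager port); the values are illustrative only.
val livenessProbe = new ProbeBuilder()
  .withTcpSocket(new TCPSocketActionBuilder()
    .withPort(new IntOrString(10000))
    .build())
  .withInitialDelaySeconds(30)
  .withPeriodSeconds(15)
  .withFailureThreshold(3)
  .build()

val executorContainer = new ContainerBuilder()
  .withName("executor")                // hypothetical container name
  .withImage("spark-executor:latest")  // hypothetical image
  .withLivenessProbe(livenessProbe)
  .build()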
I'm tentatively fine with being inaccurate on that particular failure mode if it means we get to use mostly Kubernetes primitives in an elegant way. I'm not sure whether executors detect that they can't reach the driver, or that the driver is failing to send messages to them, and I don't know whether executors shut themselves down in that case - it would be worth looking into that.
I like the idea of checking pods using the k8s API.
On health checks: executors periodically send heartbeats to the driver so that the driver can expire unresponsive executors (see the HeartbeatReceiver code). Maybe we can piggyback on them:
private def expireDeadHosts(): Unit = {
  logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
  val now = clock.getTimeMillis()
  for ((executorId, lastSeenMs) <- executorLastSeen) {
    if (now - lastSeenMs > executorTimeoutMs) {
      logWarning(s"Removing executor $executorId with no recent heartbeats: " +
        s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
      scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
        s"timed out after ${now - lastSeenMs} ms"))
      // Asynchronously kill the executor to avoid blocking the current thread
      killExecutorThread.submit(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          // Note: we want to get an executor back after expiring this one,
          // so do not simply call `sc.killExecutor` here (SPARK-8119)
          sc.killAndReplaceExecutor(executorId)
        }
      })
      executorLastSeen.remove(executorId)
    }
  }
}
@foxish Let's go with a strategy that watches the K8s API and go from there.
 */
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
  addressToExecutorId.get(rpcAddress).foreach { executorId =>
    if (disableExecutor(executorId)) {
The onDisconnected method in the superclass calls removeExecutor, not disableExecutor -- should we be using that one?
I wonder whether onDisconnected is also called when an executor JVM crashes inside a running pod. (This seems to be the caller code):

case RemoteProcessDisconnected(remoteAddress) =>
  endpoint.onDisconnected(remoteAddress)

We may not want to allocate a new pod in that case, because the JVM will be restarted; i.e. we may want to allocate a new pod only when we have lost a pod.
I think it's better to be consistent everywhere and always operate on the pod level as opposed to the container level. So if the container JVM exits we should immediately shut down the whole pod and allocate a new one.
We don't specify the restart policy for executor pods, so it defaults to "Always" (according to the k8s API doc here). When I test this patch with … The question is: do we really need to take care of allocating a new executor when we lose one, or can we just let k8s restart it (maybe with a liveness probe added)?
I think there's trickiness if the container dies but the pod doesn't - is the container restarted in that case?
Yes, the container will be restarted in place.
Oh, but @lins05, if an executor disconnects and then reconnects with the same executor ID, I'm not sure Spark handles that well.
Good point. It seems we need to set the default restart policy to Never and manage the loss in the k8s backend.
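A minimal sketch of what that could look like when building the executor pod spec with the fabric8 client, assuming illustrative metadata, container name, and image:

import io.fabric8.kubernetes.api.model.PodBuilder

// Pin the restart policy to "Never" so that a crashed executor container
// takes the whole pod down, and the Spark backend (not the kubelet) decides
// whether to create a replacement pod with a fresh executor ID.
val executorPod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-executor-1")          // hypothetical pod name
    .endMetadata()
  .withNewSpec()
    .withRestartPolicy("Never")
    .addNewContainer()
      .withName("executor")                // hypothetical container name
      .withImage("spark-executor:latest")  // hypothetical image
      .endContainer()
    .endSpec()
  .build()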
I could be wrong here, but I think Spark handles this, or at least tries to. The base class … I think we should still consider reusing this existing mechanism. Note this PR is calling … Here's … Here's … Here's …
Another thing I noticed is that Spark core tries to tell whether an executor died because of bad user code or because of a cluster operation like preemption. In the former case, Spark would want to abort the job reasonably fast so it doesn't waste cluster resources. In the latter, it's no fault of the job, so Spark would try to continue the job's execution as long as possible. It seems important not to mistake one for the other. Here is …
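To make that distinction concrete, Spark models it with an ExecutorLossReason carrying an exitCausedByApp flag; the sketch below shows one hypothetical way a Kubernetes backend (living inside Spark's scheduler packages, since these classes are private[spark]) might map pod-level information onto it. The classification rule itself is an assumption for illustration, not what this PR implements.

import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}

// Hypothetical classification: a pod deleted by the cluster (eviction, node
// drain, manual delete) is not the application's fault, while an executor
// container that exits on its own is attributed to the app.
def lossReason(podWasDeleted: Boolean, containerExitCode: Int): ExecutorLossReason = {
  if (podWasDeleted) {
    // Don't count the lost tasks against the job's failure threshold.
    ExecutorExited(containerExitCode, exitCausedByApp = false,
      "Executor pod was deleted by the cluster.")
  } else {
    // Likely a crash in user code; let the normal task-failure accounting apply.
    ExecutorExited(containerExitCode, exitCausedByApp = true,
      s"Executor container exited with code $containerExitCode.")
  }
}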
Please rebase onto …
Closing in favor of #244
Fixes #136