[SPARK-4751] Dynamic allocation in standalone mode #7532
Hi, @andrewor14 , maybe I'm a bit confused here, we add the workers to blacklist to avoid launching the executors immediately after killing them (https://github.com/apache/spark/pull/7532/files#diff-29dffdccd5a7f4c8b496c293e87c8668R836) ....but it seems that, if we run one executor per worker, this rule does not apply?
I mean when blacklistedWorkers.length <= numMissingExecutors ?
yes, it's OK for numMissingExecutors > blacklistedWorkers. In that case we just remove everything from the blacklist. Imagine the case where we have 5 workers (each has 1 executor), then we kill two executors and add their workers to the blacklist. Now if I request 10 extra executors, then 10 > 2 and everything from my blacklist is removed such that we launch executors again, which is what I want.
Does that answer your question?
Thanks for answering, but if so, why do we have to blacklist the workers? Once I request more resources, the just-released workers will be taken again...
My second thought is that even if numMissingExecutors > blacklistedWorkers, can we ensure that the numMissingExecutors executor IDs we take from the blacklist are the earliest ones we pushed into the HashSet? (I think not, since it's a HashSet)
> Thanks for answering, but if so, why do we have to blacklist the workers? Once I request more resources, the just-released workers will be taken again...

Maybe it's not clear in the comments. There are two kinds of applications, ones that set spark.executor.cores and ones that didn't. For the applications that didn't set it, each executor just grabs all the available cores on the worker, so each worker can run at most 1 executor.
In this mode, whenever we kill an executor we blacklist its corresponding worker to avoid immediately launching a new executor on that worker. However, there are cases where we do want a new executor to be launched after we kill an executor. For example, if we had previously requested 5 total executors, but there is only space for 3, then 2 executors will be waiting. Then, if we kill 2 existing executors, 2 new ones (the ones that were waiting) should come back up immediately.
This is a little hard to describe concisely. Let me know if that makes sense.
> My second thought is that even if numMissingExecutors > blacklistedWorkers, can we ensure that the numMissingExecutors executor IDs we take from the blacklist are the earliest ones we pushed into the HashSet? (I think not, since it's a HashSet)

I don't think there are benefits to imposing an ordering on the workers we remove.
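For readers following the thread, the blacklist behaviour described above can be modelled roughly as follows. This is only a sketch under assumptions: `BlacklistSketch`, `AppState`, and the three method names are hypothetical and are not taken from the patch; only `blacklistedWorkers` and `numMissingExecutors` echo names used in the discussion.

```scala
import scala.collection.mutable

// Hypothetical model of the blacklist discussed in this thread.
// It is NOT the Master's real code; all names except the two from the thread are illustrative.
object BlacklistSketch {
  final class AppState {
    // Workers whose executor was just killed; the scheduler skips them.
    val blacklistedWorkers = mutable.HashSet.empty[String]

    // Killing an executor blacklists its worker so that no replacement
    // is launched there in the next scheduling round.
    def onExecutorKilled(workerId: String): Unit =
      blacklistedWorkers += workerId

    // Requesting more executors frees up to `numMissingExecutors` workers.
    // `take` on a HashSet picks an arbitrary subset, so no ordering is imposed.
    def onExecutorsRequested(numMissingExecutors: Int): Unit = {
      val released = blacklistedWorkers.take(numMissingExecutors).toList
      blacklistedWorkers --= released
    }

    // Workers the scheduler may still use for this application.
    def usableWorkers(all: Seq[String]): Seq[String] =
      all.filterNot(blacklistedWorkers.contains)
  }
}
```

With 5 workers, 2 killed executors, and a request for 10 more, the whole blacklist is emptied, which matches the example given earlier in the thread.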
I see, so blacklist is just for filtering the workers in the next scheduling rounds until we do request more resources. I thought its semantics is related to "unlike this worker"
Everything is clear now! Thanks!
Yes, exactly. I've added many more paragraphs of comments in the latest commit to clarify this. Thanks for pointing out the potential source of confusion.
Test build #37860 has finished for PR 7532 at commit
Test build #37862 has finished for PR 7532 at commit
This adds two messages between the AppClient on the driver and the standalone Master: request and kill. The scheduling on the Master side handles both applications that explicitly set `spark.executor.cores` and those that did not. TODO: clean up shuffle files on application exit and unit tests.
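A rough sketch of what a request/kill message pair could look like is given below. The case-class names, their fields, and the `handle` function are assumptions made for illustration, not the definitions in this patch; in particular, whether a kill also changes the executor limit is left out of this model.

```scala
// Illustrative model only: message and field names are assumptions, not the patch's definitions.
object MessagesSketch {
  sealed trait AllocationMessage
  // Ask the Master for a new total number of executors for an application.
  final case class RequestExecutors(appId: String, requestedTotal: Int) extends AllocationMessage
  // Ask the Master to kill specific executors of an application.
  final case class KillExecutors(appId: String, executorIds: Seq[String]) extends AllocationMessage

  // Minimal per-application state kept on the (hypothetical) Master side.
  final case class AppState(executorLimit: Int, running: Set[String])

  def handle(state: AppState, msg: AllocationMessage): AppState = msg match {
    case RequestExecutors(_, total) =>
      state.copy(executorLimit = total)
    case KillExecutors(_, ids) =>
      // Unknown executor IDs are ignored; the limit is left unchanged in this sketch.
      state.copy(running = state.running -- ids)
  }
}
```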
The blacklisting logic is fairly intricate, but there was not really high level documentation describing why the blacklist is needed. This commit addresses that.
0250a03 to 1334e9a
Test build #37863 has finished for PR 7532 at commit
I still think it doesn't need to be both a var and a mutable collection. You could (with minimal changes) make this an immutable.HashSet.
My 2c.
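The suggestion above could look roughly like this. The field under review is not shown in this conversation, so `BlacklistHolder` and `blacklist` are hypothetical stand-ins; the point is only the contrast between a `var` holding an immutable set and a mutable collection.

```scala
import scala.collection.immutable

// Hypothetical holder; the actual field being reviewed is not visible in this thread.
object BlacklistHolder {
  // A var pointing at an immutable set: each update swaps in a new set,
  // so references handed out earlier are never modified behind the caller's back.
  private var blacklist: immutable.HashSet[String] = immutable.HashSet.empty[String]

  def add(workerId: String): Unit = blacklist += workerId    // blacklist = blacklist + workerId
  def remove(workerId: String): Unit = blacklist -= workerId // blacklist = blacklist - workerId
  def snapshot: Set[String] = blacklist
}
```

The alternative is a `val` holding a `mutable.HashSet`; combining `var` with a mutable collection gives two ways to change the same state, which is what the comment objects to.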
Hey @andrewor14, this patch is very useful, especially for users like me who run Spark in standalone mode. I left more comments here, mostly about the outdated comments in SparkContext that say dynamic allocation is only supported in YARN mode.
The other question is that, from the code, I think what will happen if [...] If so, I'm questioning whether it is a good way to manage the resources: does the application have to release the resources by explicitly calling sc.killExecutors()? However, killExecutors is pretty hard to use, because I need to get the exact IDs of the executors. Can we bring a new API like [...]
Conflicts: core/src/main/scala/org/apache/spark/deploy/master/Master.scala project/MimaExcludes.scala
This commit significantly simplifies the logic by using the executor limit as a cap. Instead of a completely separate code path that added and removed workers from blacklist, reusing the executor limit allows us to reason about the behavior of dynamic allocation more straightforwardly.
Test build #38499 has finished for PR 7532 at commit
This commit also uncovers a bug where the limit is not applied correctly for cases where cores per executor is not defined. The symptom is that every executor launched under this limit has exactly 1 core. There is a regression test for this case now.
I don't understand your question. Are you asking what happens if I request more than what my cluster can support? Spark expects the cluster manager to eventually fulfill the request, so once space frees up the Master will launch executors that could not be launched before.
That is completely orthogonal to this patch. We can add that in a separate issue if there is demand for it.
Is it cleaner to declare one variable (coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)) instead of two?
hm, I thought we used coresPerExecutor down there, but maybe not in the latest code. If not we can merge these two.
I think you missed this one
actually in the latest code we do use coresPerExecutor in 2 places. This saves us from duplicating app.desc.coresPerExecutor twice. (minor point)
Ah, I see
Hey @andrewor14, the scheduling logic and unit tests look good. Would it make sense to add a test for the dynamic allocation behavior?
@vanzin I just looked at the code again and there doesn't seem to be a great way to separate it out. This is because, in the "one executor per worker" case, we actually need to keep scheduling cores on the existing executors even when we're already at the limit. E.g. if the limit is 3 and we have already scheduled 1 core on each worker (1, 1, 1), we should keep scheduling even though we already "have" 3 executors. I think the logic there is inherently complex because we have 2 different cases. Instead, I updated the comment and variable placement to make this easier to read.
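The two cases can be illustrated with a simplified round-robin model. This is a sketch under assumptions, not the Master's scheduling code: `SchedulingSketch`, `assignCores`, and all parameter names are hypothetical. It shows why the executor limit should only gate the launch of a new executor, while adding cores to an executor that already exists on a worker continues past the limit.

```scala
// Simplified, hypothetical model of core assignment under an executor limit.
object SchedulingSketch {
  /**
   * freeCores(i) is the number of free cores on worker i.
   * coresPerExecutor = None models the "one executor per worker" case,
   * where cores are handed to the single executor one at a time.
   * Returns the number of cores assigned on each worker.
   */
  def assignCores(
      freeCores: Array[Int],
      coresPerExecutor: Option[Int],
      coresWanted: Int,
      executorLimit: Int): Array[Int] = {
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val step = coresPerExecutor.getOrElse(1)
    val assigned = new Array[Int](freeCores.length)
    val executors = new Array[Int](freeCores.length)
    var coresLeft = coresWanted

    def canLaunch(i: Int): Boolean = {
      val enoughCores = coresLeft >= step && freeCores(i) - assigned(i) >= step
      // In the one-per-worker case only the first core on a worker starts an executor.
      val startsNewExecutor = !oneExecutorPerWorker || executors(i) == 0
      if (startsNewExecutor) enoughCores && executors.sum < executorLimit
      else enoughCores // growing an existing executor does not count against the limit
    }

    var candidates = freeCores.indices.filter(canLaunch)
    while (candidates.nonEmpty) {
      candidates.foreach { i =>
        if (canLaunch(i)) {
          assigned(i) += step
          coresLeft -= step
          if (oneExecutorPerWorker) executors(i) = 1 else executors(i) += 1
        }
      }
      candidates = candidates.filter(canLaunch)
    }
    assigned
  }
}
```

With three 4-core workers, no `coresPerExecutor`, and a limit of 3, the model keeps going after the (1, 1, 1) state and ends with all 4 cores assigned on every worker. Applying the limit check to every core increment instead would leave each executor with exactly 1 core.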
Test build #39276 has finished for PR 7532 at commit
I'm not sure I completely understand all the code here, but it seems to me like this case would be covered by just ignoring the requested number of cores in the "one executor per worker" case, and requesting as many cores as the worker has available.
But, regardless, don't hold off the patch because of that.
This commit was surprisingly difficult because of the complex interactions between various components in the cluster. Some modifications to the code were necessary to ensure a strict ordering of events that the test can assume. This test uncovered one race condition: if we send an add request immediately after a kill request, we may not get an executor back because we never released the cores in the ApplicationInfo. We released these cores only after the executor was actually killed by the worker. This is now fixed with a regression test.
We can't ignore the requested number of cores because we need to support
Alright, as of the latest commit I've added end-to-end tests in addition to the existing unit tests per @nishkamravi2's suggestion. Given the extensive test coverage and my manual testing, I'm fairly confident about the correctness of this patch. Thanks everyone for the reviews. @CodingCat @dragos @vanzin @nishkamravi2
Test build #39338 has finished for PR 7532 at commit
Test build #1267 has finished for PR 7532 at commit
retest this please
Test build #1270 has finished for PR 7532 at commit
Test build #1268 has finished for PR 7532 at commit
Test build #1271 has finished for PR 7532 at commit
Test build #39349 has finished for PR 7532 at commit
Alright, merging into master. After 3 versions dynamic allocation is officially supported on all cluster managers. Woohoo!!
Should this be logged at DEBUG level ?
Been testing this merged PR specifically and it looks great so far :). One thing I noticed, however, is that the TaskSchedulerImpl throws an error once an executor has been lost, even if that's the expected behaviour when using Dynamic Allocation. Would it make more sense to produce a warning as opposed to an error when the executor is dropped as a result of the executor idle timeout?
Hi @mkhaitman, definitely. There's an outstanding JIRA for this, SPARK-4134. This will be addressed in the future, hopefully by 1.6.0.
Dynamic allocation is a feature that allows a Spark application to scale the number of executors up and down dynamically based on the workload. Support was first introduced for YARN in 1.2, and then extended to Mesos coarse-grained mode recently. Today, it is finally supported in standalone mode as well!
I tested this locally and it works as expected. This is WIP because unit tests are coming.