Conversation

@andrewor14 (Contributor)

Dynamic allocation is a feature that allows a Spark application to scale its number of executors up and down dynamically based on the workload. Support was first introduced for YARN in 1.2 and was recently extended to Mesos coarse-grained mode. Today, it is finally supported in standalone mode as well!

I tested this locally and it works as expected. This is WIP because unit tests are coming.

Contributor

Hi @andrewor14, maybe I'm a bit confused here: we add the workers to the blacklist to avoid launching executors immediately after killing them (https://github.com/apache/spark/pull/7532/files#diff-29dffdccd5a7f4c8b496c293e87c8668R836)... but it seems that if we run one executor per worker, this rule does not apply?

Contributor

I mean when blacklistedWorkers.length <= numMissingExecutors?

Contributor Author

Yes, it's OK for numMissingExecutors to be greater than blacklistedWorkers. In that case we just remove everything from the blacklist. Imagine the case where we have 5 workers (each with 1 executor), then we kill two executors and add their workers to the blacklist. Now if I request 10 extra executors, then 10 > 2 and everything is removed from my blacklist, so we launch executors on those workers again, which is what I want.

Does that answer your question?
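
To make the trimming rule concrete, here is a minimal sketch under assumed names (blacklistedWorkers, numMissingExecutors); it only illustrates the behavior described above and is not the actual Master code:

```scala
import scala.collection.mutable

object BlacklistSketch {
  // Illustration only: when the application asks for more executors, remove
  // enough workers from the blacklist so they can host executors again.
  def unblacklistForRequest(
      blacklistedWorkers: mutable.HashSet[String],
      numMissingExecutors: Int): Unit = {
    if (numMissingExecutors >= blacklistedWorkers.size) {
      // The request covers the whole blacklist: clear it entirely.
      blacklistedWorkers.clear()
    } else {
      // Otherwise free up only as many workers as we are missing executors.
      // A HashSet has no ordering, so which workers get freed is unspecified.
      blacklistedWorkers.take(numMissingExecutors).foreach(blacklistedWorkers.remove)
    }
  }
}
```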

Contributor

Thanks for answering, but if so, why do we have to blacklist the workers? When I request more resources, the just-released workers will be taken again...

My second thought is that, even when numMissingExecutors > blacklistedWorkers, can we ensure that the numMissingExecutors executor IDs we take from the blacklist are the earliest ones we pushed into the HashSet? (I think not, since it's a HashSet.)

Contributor Author

> Thanks for answering, but if so, why do we have to blacklist the workers? When I request more resources, the just-released workers will be taken again...

Maybe it's not clear in the comments. There are two kinds of applications: those that set spark.executor.cores and those that do not. For applications that do not set it, each executor just grabs all the available cores on the worker, so each worker can run at most one executor.

In this mode, whenever we kill an executor we blacklist its corresponding worker to avoid immediately launching a new executor on that worker. However, there are cases where we do want a new executor to be launched after we kill one. For example, if we had previously requested 5 total executors but there is only space for 3, then 2 executors will be waiting. If we then kill 2 existing executors, the 2 that were waiting should come back up immediately.

This is a little hard to describe concisely. Let me know if that makes sense.

> My second thought is that, even when numMissingExecutors > blacklistedWorkers, can we ensure that the numMissingExecutors executor IDs we take from the blacklist are the earliest ones we pushed into the HashSet? (I think not, since it's a HashSet.)

I don't think there are benefits to imposing an ordering on the workers we remove.

Contributor

I see, so the blacklist is just for filtering out workers in the next scheduling rounds until we request more resources. I thought its semantics were related to "disliking this worker".

Everything is clear now! Thanks!

Contributor Author

Yes, exactly. I've added many more paragraphs of comments in the latest commit to clarify this. Thanks for pointing out the potential source of confusion.

@SparkQA commented Jul 20, 2015

Test build #37860 has finished for PR 7532 at commit 8352801.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

@SparkQA commented Jul 20, 2015

Test build #37862 has finished for PR 7532 at commit e282874.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

Andrew Or added 6 commits July 20, 2015 14:47
This adds two messages between the AppClient on the driver and
the standalone Master: request and kill. The scheduling on the
Master side handles both applications that explicitly set
`spark.executor.cores` and those that did not.

TODO: clean up shuffle files on application exit and unit tests.
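
For reference, a sketch of the two messages this commit adds, with signatures as reported in the test output in this thread; the enclosing object name and the commented usage lines are hypothetical illustrations, not the actual Spark code:

```scala
// Signatures taken from the test output in this thread; the object name and
// the commented-out usage below are hypothetical.
object StandaloneMessages {
  case class RequestExecutors(appId: String, requestedTotal: Int)
  case class KillExecutors(appId: String, executorIds: Seq[String])
}

// Hypothetical driver-side usage: request a new total of 10 executors,
// then kill two specific executors by ID.
// masterEndpoint.send(StandaloneMessages.RequestExecutors(appId, requestedTotal = 10))
// masterEndpoint.send(StandaloneMessages.KillExecutors(appId, Seq("3", "7")))
```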
The blacklisting logic is fairly intricate, but there was not
really high level documentation describing why the blacklist
is needed. This commit addresses that.

There's a weird scalastyle bug that thinks there is no space
after the comma in the following:
```
someMethod(name, /* clean up */ true)
```
@SparkQA commented Jul 21, 2015

Test build #37863 has finished for PR 7532 at commit 1334e9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

Contributor

I still think it doesn't need to be both a var and a mutable collection. You could (with minimal changes) make this an immutable.HashSet.

My 2c.
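
To illustrate the suggestion, a minimal sketch with hypothetical class and field names (not the actual variable in the patch):

```scala
import scala.collection.immutable

// A var rebound to new immutable sets, instead of a var pointing at a
// mutable collection. Hypothetical names for illustration only.
class WorkerTracker {
  private var blacklist = immutable.HashSet.empty[String]

  def addToBlacklist(workerId: String): Unit = {
    blacklist += workerId   // builds a new set and rebinds the var
  }

  def removeFromBlacklist(workerId: String): Unit = {
    blacklist -= workerId
  }

  def isBlacklisted(workerId: String): Boolean = blacklist.contains(workerId)
}
```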

@CodingCat (Contributor)

Hey @andrewor14, this patch is very useful, especially for users like me who run Spark in standalone mode.

I left more comments here, mostly about the outdated comments in SparkContext which say that dynamic allocation is only supported in YARN mode.

The other question is:

From the code, I think that if an application requests fewer resources than it has already been assigned (by calling sc.requestTotalExecutors(x)), it will not get more resources but will keep running with the resources it has in hand, right?

If so... I'm questioning whether that is a good way to manage resources: the application has to release resources by explicitly calling sc.killExecutors()? However, killExecutors is pretty hard to use, because I need to get the exact IDs of the executors... can we add a new API like killExecutors(numExecutors: Int)?

Andrew Or added 4 commits July 26, 2015 11:20
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	project/MimaExcludes.scala

This commit significantly simplifies the logic by using the
executor limit as a cap. Instead of a completely separate code
path that added and removed workers from the blacklist, reusing
the executor limit allows us to reason about the behavior of
dynamic allocation more straightforwardly.
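
A rough sketch of the capping idea described in this commit message, using illustrative names only (not the actual Master code):

```scala
object ExecutorLimitSketch {
  // Illustration only: the Master tracks a per-application executor limit
  // that RequestExecutors raises or lowers; scheduling refuses to launch
  // executors beyond the limit and backfills up to it on later rounds.
  class AppState(var executorLimit: Int, var numExecutors: Int)

  def shouldLaunchExecutor(
      app: AppState,
      workerFreeCores: Int,
      coresPerExecutor: Int): Boolean = {
    app.numExecutors < app.executorLimit && workerFreeCores >= coresPerExecutor
  }
}
```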
@SparkQA commented Jul 27, 2015

Test build #38499 has finished for PR 7532 at commit 0a8be79.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

Andrew Or added 2 commits July 26, 2015 23:51
This commit also uncovers a bug where the limit is not applied
correctly for cases where cores per executor is not defined.
The symptom is that every executor launched under this limit
has exactly 1 core. There is a regression test for this case now.
@andrewor14 andrewor14 changed the title [WIP][SPARK-4751] Dynamic allocation in standalone mode [SPARK-4751] Dynamic allocation in standalone mode Jul 27, 2015
@andrewor14 (Contributor Author)

> From the code, I think that if an application requests fewer resources than it has already been assigned (by calling sc.requestTotalExecutors(x)), it will not get more resources but will keep running with the resources it has in hand, right?

I don't understand your question. Are you asking what happens if I request more than what my cluster can support? Spark expects the cluster manager to eventually fulfill the request, so once space frees up the Master will launch executors that could not be launched before.

> However, killExecutors is pretty hard to use, because I need to get the exact IDs of the executors... can we add a new API like killExecutors(numExecutors: Int)?

That is completely orthogonal to this patch. We can add that in a separate issue if there is demand for it.

Contributor

Is it cleaner to declare one variable (coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)) instead of two?

Contributor Author

hm, I thought we used coresPerExecutor down there, but maybe not in the latest code. If not we can merge these two.

Contributor

I think you missed this one

Contributor Author

Actually, in the latest code we do use coresPerExecutor in 2 places. This saves us from writing out app.desc.coresPerExecutor twice. (minor point)

Contributor

Ah, I see

@nishkamravi2 (Contributor)

Hey @andrewor14, the scheduling logic and unit tests look good. Would it make sense to add a test for the dynamic allocation behavior?

@andrewor14 (Contributor Author)

@vanzin I just looked at the code again and there doesn't seem to be a great way to separate it out. This is because, in the "one executor per worker" case, we actually need to keep scheduling cores on the existing executors even when we're already at the limit. E.g. if the limit is 3 and we have already scheduled 1 core on each worker (1, 1, 1), we should keep scheduling even though we already "have" 3 executors. I think the logic there is inherently complex because we have 2 different cases.

Instead, I updated the comment and variable placement to make this easier to read.
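
A sketch of the two cases mentioned above, with hypothetical parameter names; it only illustrates why hitting the executor limit does not necessarily stop core scheduling in the one-executor-per-worker case:

```scala
object SchedulingCasesSketch {
  // Illustration only (hypothetical names). Launching a *new* executor counts
  // against the executor limit; adding another core to an executor that
  // already exists on the worker (one-executor-per-worker case) does not.
  def canAssignCore(
      numExistingExecutors: Int,
      executorLimit: Int,
      oneExecutorPerWorker: Boolean,
      workerAlreadyHasExecutor: Boolean): Boolean = {
    val launchingNewExecutor = !(oneExecutorPerWorker && workerAlreadyHasExecutor)
    !launchingNewExecutor || numExistingExecutors < executorLimit
  }
}
```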

@SparkQA commented Jul 31, 2015

Test build #39276 has finished for PR 7532 at commit accc8f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])
    • class MultilayerPerceptronClassifier(override val uid: String)
    • class KinesisUtils(object):
    • class InitialPositionInStream(object):

@vanzin (Contributor) commented Aug 1, 2015

in the "one executor per worker" case, we actually need to keep scheduling cores

I'm not sure I completely understand all the code here, but it seems to me like this case would be covered by just ignoring the requested number of cores in the "one executor per worker" case, and requesting as many cores as the worker has available.

@vanzin (Contributor) commented Aug 1, 2015

But, regardless, don't hold off the patch because of that.

Andrew Or added 2 commits August 1, 2015 00:27
This commit is surprisingly difficult because of the complex
interactions between various components in the cluster. Some
modifications to the code were necessary to ensure a strict
ordering of events that the test can assume.

This test uncovered one race condition! If we send an add request
immediately after a kill request, we may not get an executor back
because we never released the cores in the ApplicationInfo. Only
after the executor is actually killed by the worker do we release
these cores. This is now fixed with a regression test.
@andrewor14 (Contributor Author)

> I'm not sure I completely understand all the code here, but it seems to me like this case would be covered by just ignoring the requested number of cores in the "one executor per worker" case, and requesting as many cores as the worker has available.

We can't ignore the requested number of cores because we need to support spark.cores.max in the spread-out case. E.g. if we have 5 workers with 10 cores each and spark.cores.max = 18, then we should end up with [4, 4, 4, 3, 3]. In this case we can't just allocate all the available cores on each worker, so instead we need to schedule the cores one by one.
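
To make the example above concrete, a small self-contained sketch of spreading cores one at a time across workers (illustration only, not the Master's actual scheduling code):

```scala
object SpreadOutSketch {
  // Assign `maxCores` cores round-robin, one at a time, across workers that
  // still have free capacity. Illustrates the spread-out behavior only.
  def spreadOutCores(numWorkers: Int, coresPerWorker: Int, maxCores: Int): Array[Int] = {
    val assigned = Array.fill(numWorkers)(0)
    var remaining = maxCores
    var i = 0
    while (remaining > 0 && assigned.exists(_ < coresPerWorker)) {
      if (assigned(i) < coresPerWorker) {
        assigned(i) += 1
        remaining -= 1
      }
      i = (i + 1) % numWorkers
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // Prints: 4, 4, 4, 3, 3 -- matching the example above.
    println(spreadOutCores(numWorkers = 5, coresPerWorker = 10, maxCores = 18).mkString(", "))
  }
}
```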

@andrewor14 (Contributor Author)

Alright, as of the latest commit I've added end-to-end tests in addition to the existing unit tests per @nishkamravi2's suggestion. Given the extensive test coverage and my manual testing, I'm fairly confident about the correctness of this patch.

Thanks everyone for the reviews. @CodingCat @dragos @vanzin @nishkamravi2

@SparkQA commented Aug 1, 2015

Test build #39338 has finished for PR 7532 at commit b3c1736.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static final class BytesToBytesMapIterator implements Iterator<Location>
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])
    • public abstract class UnsafeKeyValueSorter
    • abstract class InternalRow extends GenericSpecializedGetters with Serializable
    • trait GenericSpecializedGetters extends SpecializedGetters
    • case class SortArray(base: Expression, ascendingOrder: Expression)
    • case class GetArrayItem(child: Expression, ordinal: Expression)
    • case class GetMapValue(child: Expression, key: Expression)
    • class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) extends MapData
    • class GenericArrayData(array: Array[Any]) extends ArrayData with GenericSpecializedGetters
    • abstract class MapData extends Serializable
    • public abstract class KVIterator<K, V>

@SparkQA commented Aug 1, 2015

Test build #1267 has finished for PR 7532 at commit b3c1736.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 (Contributor Author)

retest this please

@SparkQA commented Aug 1, 2015

Test build #1270 has finished for PR 7532 at commit b3c1736.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

@SparkQA commented Aug 1, 2015

Test build #1268 has finished for PR 7532 at commit b3c1736.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 1, 2015

Test build #1271 has finished for PR 7532 at commit b3c1736.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 1, 2015

Test build #39349 has finished for PR 7532 at commit b3c1736.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(appId: String, requestedTotal: Int)
    • case class KillExecutors(appId: String, executorIds: Seq[String])

@andrewor14 (Contributor Author)

Alright, merging into master. After 3 versions, dynamic allocation is officially supported on all cluster managers. Woohoo!!

@asfgit asfgit closed this in 6688ba6 Aug 1, 2015
@andrewor14 andrewor14 deleted the standalone-da branch August 1, 2015 22:00
Contributor

Should this be logged at DEBUG level?

@mkhaitman

Been testing this merged PR specifically and it looks great so far :). One thing I noticed, however, is that the TaskSchedulerImpl throws an error once an executor has been lost, even if that's the expected behavior when using dynamic allocation. Would it make more sense to produce a warning instead of an error when the executor is dropped as a result of the executor idle timeout?

@andrewor14 (Contributor Author)

Hi @mkhaitman, definitely. There's an outstanding JIRA for this: SPARK-4134. This will hopefully be addressed by 1.6.0.
