Skip to content

Conversation

agrawaldevesh
Copy link

@agrawaldevesh agrawaldevesh commented Jul 25, 2020

What changes were proposed in this pull request?

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Description of the problem

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):

  • We schedule the job with tasks on executors 0, 1, 2.
  • We wait 300 ms and decommission executor 0.
  • If the task is not yet done on executor 0, it will now fail because
    the block manager won't be able to save the block. This condition is
    easy to trigger on a loaded machine where the github checks run.
  • The task with retry on a different executor (1 or 2) and its shuffle
    blocks will land there.
  • No actual block migration happens here because the decommissioned
    executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

Why are the changes needed?

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

Does this PR introduce any user-facing change?

No, unit test only change.

How was this patch tested?

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch 2 times, most recently from 7659d22 to 558702e Compare July 25, 2020 00:28
@agrawaldevesh agrawaldevesh changed the title Make the block manager decommissioning test be less flaky Fix flakyness of BlockManagerDecommissionIntegrationSuite Jul 25, 2020
@agrawaldevesh
Copy link
Author

cc: @holdenk @attilapiros for review please.

@agrawaldevesh agrawaldevesh marked this pull request as draft July 25, 2020 02:07
@holdenk
Copy link
Contributor

holdenk commented Jul 25, 2020 via email

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126517 has finished for PR 29226 at commit 7659d22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126518 has finished for PR 29226 at commit 558702e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh
Copy link
Author

Sorry it didn't work. I will make another go at it.

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126526 has finished for PR 29226 at commit 288c34e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126530 has finished for PR 29226 at commit 4c8bafb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126531 has finished for PR 29226 at commit 199ea03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126536 has finished for PR 29226 at commit b2c2fad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh
Copy link
Author

The remaining failure is in SparkR and is probably environment related.

@agrawaldevesh agrawaldevesh marked this pull request as ready for review July 25, 2020 16:38
@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from b2c2fad to ba12869 Compare July 25, 2020 17:20
@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126550 has finished for PR 29226 at commit ba12869.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from ba12869 to a939c67 Compare July 25, 2020 20:05
@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126554 has finished for PR 29226 at commit a939c67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR might resolve the issue at hand but during the problem analysis it stops at a level and fails to reveal the root cause. I would like to encourage you to look a bit deeper: there might be an easier fix or a more straightforward one.

@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from a939c67 to 2cea5f4 Compare July 26, 2020 21:16
Comment on lines 40 to 49
test(s"verify that an already running task which is going to cache data succeeds " +
s"on a decommissioned executor after task start") {
runDecomTest(true, false, true, 1)
}

test(s"verify that an already running task which is going to cache data succeeds " +
s"on a decommissioned executor after iterator start") {
runDecomTest(true, false, true, 2)
}

Copy link
Author

@agrawaldevesh agrawaldevesh Jul 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk .. this is what I meant: I have added two tests to test the decom after "Task Start".

The first test is a negative test. It defines task start as when the driver says that the task is "launched" and as we discussed, it is expected to fail because decommissioning will be triggered before BlockManager.doPut is called and thus fail the task without starting the iterator.

The second test is what you want mentioned in your "intention": We want the decommissioning after the "iterator start" (ie user code started) to trigger a migration: The block manager will write the block anyway and it will be migrated. Do you think this second test indeed captures the intention of the test ?

The first negative test is just to demonstrate that it is a case we don't handle yet. Since we agree that it is not a case worth fixing for production, I will eventually delete it.

So now we cover the full gamut of interesting decom points in task: When it is just launched, When user code is activated, After it has ended.

@SparkQA
Copy link

SparkQA commented Jul 28, 2020

Test build #126678 has finished for PR 29226 at commit 1864347.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 28, 2020

Test build #126684 has finished for PR 29226 at commit 070d30c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 28, 2020

Test build #126687 has finished for PR 29226 at commit bea7776.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 28, 2020

Test build #126681 has finished for PR 29226 at commit b98495c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh
Copy link
Author

agrawaldevesh#1

I think this isn't needed anymore.

sc.addSparkListener(new SparkListener {

def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
accum.value.asScala.headOption
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is going to work as intended, accumulators send updates back at the end of the task, unless something has changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't believe that is the case, can we add an assertion in here that none of the tasks have finished yet?

@holdenk
Copy link
Contributor

holdenk commented Jul 28, 2020

I think I understand your intention with accumulators now, I don't think that's going to work.

@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from bea7776 to ed8224b Compare July 29, 2020 01:23
An interesting failure happens when migrateDuring = true (and persist or
shuffle is true):

- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
  the block manager won't be able to save the block. This condition is
  easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
  blocks will land there.
- No actual block migration happens here because the decommissioned
  executor technically failed before it could even produce a block.

So this change makes two fixes to remove the above race condition.
- When migrateDuring = true, wait for a task to complete and write the
  block, and then decommission that executor.
- When migrateDuring = false, it is still possible (because of delay
  scheduling) for two tasks to be run on the same executor serially and
  one executor to go idle. In which case, we must make sure to
  decommission an executor that actually had a task run on it.
Now that we wait for an actual task to succeed, we don't need to wait
for events prior to that: broadcast of job-info finished and task
started. The waiting for the task end/success subsumes that. Simplifying
the test even further.
@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from ed8224b to d709002 Compare July 29, 2020 01:25
@agrawaldevesh agrawaldevesh force-pushed the block-manager-decom-flaky branch from d709002 to 1b469fb Compare July 29, 2020 01:29
@agrawaldevesh
Copy link
Author

I think I understand your intention with accumulators now, I don't think that's going to work.

Thanks for pointing this out @holdenk. As a spark n00b I have certainly learnt quite a bit by fixing this test. I tried another approach and also added the asserts you asked for.

I learnt that while spark does not update the accumulator until the task completes, it does graciously trigger a listener event when the accumulators are received from an executor via the heartbeat mechanism. I am using that to know for sure that the task has started for "real".

Please take a look.

@SparkQA
Copy link

SparkQA commented Jul 29, 2020

Test build #126743 has finished for PR 29226 at commit ed8224b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 29, 2020

Test build #126744 has finished for PR 29226 at commit 1b469fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Copy link
Contributor

holdenk commented Jul 29, 2020

Yeah if you wait long enough for a heartbeat and check the event that should do the trick, but to avoid a flaky test your going to need a good length of time. I'll take another look tomorrow afternoon (have meetings all morning).

Copy link
Contributor

@holdenk holdenk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. If no other comments I'll merge this tomorrow.

@asfgit asfgit closed this in 6032c5b Jul 30, 2020
@HyukjinKwon
Copy link
Member

HyukjinKwon commented Aug 7, 2020

For some reasons, the test seems still flaky:

BlockManagerDecommissionIntegrationSuite:
- verify that an already running task which is going to cache data succeeds on a decommissioned executor after task start *** FAILED *** (13 seconds, 145 milliseconds)
  The code passed to eventually never returned normally. Attempted 1829 times over 6.001362179 seconds. Last failure message: getCandidateExecutorToDecom.isDefined was false. (BlockManagerDecommissionIntegrationSuite.scala:179)
  org.scalatest.exceptions.TestFailedDueToTimeoutException:
  at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:185)
  at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:192)
  at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:402)
  at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:401)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.eventually(BlockManagerDecommissionIntegrationSuite.scala:34)
  at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:312)
  at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:311)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.eventually(BlockManagerDecommissionIntegrationSuite.scala:34)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.runDecomTest(BlockManagerDecommissionIntegrationSuite.scala:179)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.$anonfun$new$1(BlockManagerDecommissionIntegrationSuite.scala:45)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
  at org.scalatest.Suite.run(Suite.scala:1112)
  at org.scalatest.Suite.run$(Suite.scala:1094)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
  Cause: org.scalatest.exceptions.TestFailedException: getCandidateExecutorToDecom.isDefined was false
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.$anonfun$runDecomTest$6(BlockManagerDecommissionIntegrationSuite.scala:180)
  at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:150)
  at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:162)
  at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:192)
  at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:402)
  at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:401)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.eventually(BlockManagerDecommissionIntegrationSuite.scala:34)
  at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:312)
  at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:311)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.eventually(BlockManagerDecommissionIntegrationSuite.scala:34)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.runDecomTest(BlockManagerDecommissionIntegrationSuite.scala:179)
  at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.$anonfun$new$1(BlockManagerDecommissionIntegrationSuite.scala:45)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
  at org.scalatest.Suite.run(Suite.scala:1112)
  at org.scalatest.Suite.run$(Suite.scala:1094)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

dongjoon-hyun pushed a commit that referenced this pull request Aug 10, 2020
…onIntegrationSuite to reduce flakyness

### What changes were proposed in this pull request?

As reported by HyukjinKwon, BlockManagerDecommissionIntegrationSuite test is apparently still flaky (even after #29226): #29226 (comment).

The new flakyness is because the executors are not launching in the 6 seconds time out I had given them when run under github checks.

Bumped up the timeouts.

### Why are the changes needed?

To make this test not flaky so that it can give us high signal if decommissioning regresses.

### Does this PR introduce _any_ user-facing change?

No, unit test only check.

### How was this patch tested?

No new tests. Just github and jenkins.

Closes #29388 from agrawaldevesh/more_bm_harden.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Co-authored-by: “attilapiros” <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Cleanup the merge

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Co-authored-by: “attilapiros” <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Fix up the merge

CR feedback, move adjustExecutors to a common utility function

Exclude some non-public APIs

Remove junk

More CR feedback

Fix adjustExecutors backport

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

Cleanup and drop watcher changes from the backport
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…onIntegrationSuite to reduce flakyness

### What changes were proposed in this pull request?

As reported by HyukjinKwon, BlockManagerDecommissionIntegrationSuite test is apparently still flaky (even after apache#29226): apache#29226 (comment).

The new flakyness is because the executors are not launching in the 6 seconds time out I had given them when run under github checks.

Bumped up the timeouts.

### Why are the changes needed?

To make this test not flaky so that it can give us high signal if decommissioning regresses.

### Does this PR introduce _any_ user-facing change?

No, unit test only check.

### How was this patch tested?

No new tests. Just github and jenkins.

Closes apache#29388 from agrawaldevesh/more_bm_harden.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants