
Conversation

holdenk
Contributor

@holdenk holdenk commented Nov 8, 2019

This PR is based on an existing/previous PR - #19045

What changes were proposed in this pull request?

This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely with K8s, and also with spot instances on AWS / preemptible instances, all of which give us notice that our host is going away. For now we simply stop scheduling jobs; in the future we could perform some kind of migration of data during scale-down, or at least stop accepting new blocks to cache.

There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing
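
For a rough sense of the mechanism, here is a minimal, self-contained sketch of the flow described above (DecommissionSketch, DriverRef, and canLaunchTask are simplified stand-ins for illustration, not necessarily the exact identifiers in this change):

object DecommissionSketch {
  // Message the executor sends to the driver once it gets a decommission notice.
  case class DecommissionExecutor(executorId: String)

  // Stand-in for an RPC endpoint reference to the driver.
  trait DriverRef {
    def askSync[T](message: Any): T
  }

  class ExecutorBackendSketch(executorId: String, driver: DriverRef) {
    @volatile private var decommissioned = false

    // Called when the cloud provider/scheduler signals that the host is going away soon.
    def decommissionSelf(): Unit = {
      decommissioned = true
      // Tell the driver so it stops scheduling new tasks on this executor.
      driver.askSync[Boolean](DecommissionExecutor(executorId))
    }

    // Once decommissioned, refuse new task launches; running tasks finish normally.
    def canLaunchTask: Boolean = !decommissioned
  }
}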

Why are the changes needed?

With the growing move to preemptible multi-tenancy, serverless environments, and spot instances, better handling of node scale-down is required.

Does this PR introduce any user-facing change?

There is no API change; however, an additional configuration flag is added to enable/disable this behaviour.
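
For example, enabling it would look something like the following (the configuration key here is a placeholder for illustration; check the diff for the real name):

import org.apache.spark.SparkConf

// Illustrative only: the flag name below is a placeholder, not necessarily the key added here.
val conf = new SparkConf()
  .setAppName("decommission-demo")
  .set("spark.worker.decommission.enabled", "true")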

How was this patch tested?

New integration tests in the Spark K8s integration testing. Extension of AppClientSuite to test decommissioning separately from K8s.

…he cloud provider/scheduler lets them know they aren't going to be removed immediately but instead will be removed soon. This concept fits nicely with K8s, and also with spot instances on AWS / preemptible instances, all of which give us notice that our host is going away. For now we simply stop scheduling jobs & caching blocks; in the future we could perform some kind of migration of data during scale-down.
@SparkQA

SparkQA commented Nov 8, 2019

Test build #113474 has finished for PR 26440 at commit 55b6f9e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tgravescs tgravescs left a comment

Just took a quick skim through; I know this is WIP, so feel free to ignore comments if you just haven't implemented parts yet.

@holdenk
Contributor Author

holdenk commented Nov 9, 2019

Looks like the JDK11 build is borked right now. I'll merge in master on Monday.

@SparkQA

SparkQA commented Nov 9, 2019

Test build #113477 has finished for PR 26440 at commit bfa06ce.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

The GitHub Action failure is due to a Maven outage.

[ERROR] Failed to execute goal on project spark-tools_2.12: Could not resolve dependencies for project org.apache.spark:spark-tools_2.12:jar:3.0.0-SNAPSHOT: Failed to collect dependencies at org.clapper:classutil_2.12:jar:1.5.1: Failed to read artifact descriptor for org.clapper:classutil_2.12:jar:1.5.1: 
Could not transfer artifact org.clapper:classutil_2.12:pom:1.5.1 from/to central (https://repo.maven.apache.org/maven2): Connection timed out (Read failed) -> [Help 1]

@dongjoon-hyun
Member

Hi, @holdenk. Could you fix the WorkerDecommissionSuite UT failure?

I'm also facing the UT failure locally on my mac.

[info] WorkerDecommissionSuite:
[info] - verify task with no decommissioning works as expected (4 seconds, 318 milliseconds)
[info] - verify a task with all workers decommissioned succeeds *** FAILED *** (4 seconds, 194 milliseconds)

After fixing all UTs, let's trigger JDK11 testing, too.

@dongjoon-hyun dongjoon-hyun requested a review from dbtsai November 10, 2019 02:06
@holdenk
Contributor Author

holdenk commented Nov 10, 2019

Sure, I’ll work on that this Monday.

@holdenk
Contributor Author

holdenk commented Nov 11, 2019

OK, digging into it: it looks like while I was updating the PR from 2017 I accidentally broke the message-receive code path, but I've only been doing integration testing on K8s with the new PR, hence why the UT is broken. This might take another day to resolve because the code path is a little convoluted and life is busy.

@SparkQA

SparkQA commented Nov 12, 2019

Test build #113637 has started for PR 26440 at commit a63b68f.

@holdenk holdenk force-pushed the SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4 branch from c1a0735 to 317c76b on November 12, 2019 23:09
@SparkQA

SparkQA commented Nov 12, 2019

Test build #113654 has finished for PR 26440 at commit c1a0735.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113655 has finished for PR 26440 at commit 317c76b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113658 has finished for PR 26440 at commit 86c0ff6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113660 has finished for PR 26440 at commit bdd7df3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk holdenk changed the title [WIP][SPARK-20628][CORE] Start to improve Spark decommissioning & preemption support [WIP][SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support Nov 13, 2019
decommissioned = true
// Tell the master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
  driver.get.askSync[Boolean](DecommissionExecutor(executorId))
}
Contributor

@SaurabhChawla100 SaurabhChawla100 Feb 10, 2020

Instead of decommissioning a single executor, can we decommission the entire node?
e.g. driver.get.askSync[Boolean](AddNodeToDecommission(hostname, terminationTime, NodeLossReason))

Contributor Author

Same as the previous comment: in standalone mode only, sure, but in YARN/K8s we could see individual executors decommission.

@holdenk
Contributor Author

holdenk commented Feb 10, 2020

So @tooptoop4, in its present state it could help: you'd call decommission instead of stop. But we'd probably want to see the last step before fully considering https://issues.apache.org/jira/browse/SPARK-30610 solved; right now it won't schedule any new jobs, but it won't exit & shut down automatically either (you can use a timer and sort of approximate it, but it's not perfect).
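
(For reference, the timer approximation I mean is roughly the following; this is only a sketch, not part of this PR, and the 120-second grace period is an arbitrary placeholder.)

import java.util.concurrent.{Executors, TimeUnit}

// After receiving the decommission notice, give in-flight tasks a grace period, then exit.
// A real version would first check that no tasks are still running.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.schedule(new Runnable {
  override def run(): Unit = System.exit(0)
}, 120, TimeUnit.SECONDS)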

@holdenk
Contributor Author

holdenk commented Feb 10, 2020

@itskals, so I'm not 100% sure what you want us to do in handleFailedTask for fetch failures. I'm open to changing behavior there, but given how long this PR has been going on, I'm partial to pushing any non-bug fixes out to follow-ups (since there are already plans for follow-ups).

@holdenk
Contributor Author

holdenk commented Feb 10, 2020

It passes tests now; I'm going to do a read-through this week. If no one has any outstanding concerns, though, I'm planning on merging this on Friday (to master). We can continue the discussion about 3.0 on dev@.

@SparkQA

SparkQA commented Feb 14, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/23137/

@SparkQA

SparkQA commented Feb 14, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/23137/

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118380 has finished for PR 26440 at commit af55030.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in d273a2b Feb 14, 2020
@dongjoon-hyun
Member

dongjoon-hyun commented Feb 15, 2020

Hi, @holdenk. This PR seems to add a flaky test on the master branch. When I triggered it twice, one run passed and the other run failed.

KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage
- Launcher client dependencies
- Test basic decommissioning *** FAILED ***
  The code passed to eventually never returned normally. Attempted 126 times over 2.0003060982000003 minutes.

Last failure message: "++ id -u ..." did not contain "decommissioning executor" The application did not complete.. (KubernetesSuite.scala:383)
- Run SparkR on simple dataframe.R example

@dongjoon-hyun
Member

Do you have any idea about the root cause of flakiness?

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
@Ngone51
Member

Ngone51 commented May 8, 2020

Right so just send SIGPWR to the worker. Are you saying in standalone mode you have one worker with multiple executors and you want to decommission a specific executor? Regardless this isn’t really the place, how about you go back to the PR introducing SIGPWR handler

I am talking about decommissioning all executors in standalone. IIUC, all executors will shut down at once (because of WorkerWatcher) when a worker shuts down (e.g. receives SIGPWR) in standalone. So we obviously have no time to do decommissioning (cache copy, adding into executorsPendingDecommission) for executors, or it's meaningless to do so.

But I think it would work if SIGPWR is manually controlled.

@holdenk

@holdenk
Contributor Author

holdenk commented May 8, 2020

I'm having difficulty understanding your concern @Ngone51

So in standalone mode it's up to the user to write their decommissioning shell script and register it with the cloud provider, or whatever mechanism is being used to notify the executors of decommissioning (or, if it's maintenance, to send the signal manually). Not all of the executors will shut down just because one executor receives a SIGPWR. WorkerWatcher just checks that the RPC connection still works, and we don't shut down the RPC mechanism during decommissioning.
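
For example, the user-side hook could be as small as something like this (a sketch using sun.misc.Signal; decommissionSelf is a placeholder for whatever decommission entry point the worker/executor exposes, and SIGPWR is Linux-only):

import sun.misc.{Signal, SignalHandler}

// Sketch: make SIGPWR start decommissioning instead of killing the JVM outright.
// decommissionSelf is a placeholder for the real decommission entry point.
def installDecommissionHandler(decommissionSelf: () => Unit): Unit = {
  Signal.handle(new Signal("PWR"), new SignalHandler {
    override def handle(sig: Signal): Unit = decommissionSelf()
  })
}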

Can you point out what in my understanding doesn't match your understanding?

@Ngone51
Member

Ngone51 commented May 9, 2020

Thanks @holdenk. It was me trying to understand the whole story. I can imagine how decommissioning works in standalone now, if the signal is somehow controlled manually.

(At the beginning, I was thinking that if SIGPWR comes from a hardware failure, then no one could escape it and decommissioning would be impossible.)

@gatorsmile
Member

@holdenk Did we get an LGTM from another committer before merging this PR?

@holdenk
Contributor Author

holdenk commented May 22, 2020

Some other committers reviewed it early on, and I brought it to the dev@ list and left it open after stating my intent to merge, in case any committer who had been involved with reviewing had any blocking issues they wanted to raise.

@HyukjinKwon
Member

I left a comment here but removed it. I will comment on the JIRA to discuss in a single place and avoid having many branches of the same discussion; see SPARK-20624.

@HyukjinKwon
Member

For this PR specifically, I think it should have been explicitly approved, as it affects all other components and it's pretty big. I think this is when we needed to call for more reviews and explicit approvals.

For #28370 specifically, the review feedback was not fully addressed; it was just merged.

Maybe we should avoid merging in this way.

@holdenk
Contributor Author

holdenk commented Jun 19, 2020

So #28370 was merged with all committer comments addressed. There were some minor concerns from @Ngone51, but they were all suitable for follow-up work. Is there a comment in there I missed that you believe needed to be addressed, @HyukjinKwon?

@HyukjinKwon
Member

I haven't looked into the code closely yet - I will try to read and follow more closely. I just noticed that the discussions in these PRs are virtually all from you.

My point is that:

It looks to me like we're rushing these PRs when actually we should be the most conservative.

@holdenk
Contributor Author

holdenk commented Jun 19, 2020

I haven't looked into the code closely yet - I will try to read and follow more closely. I just noticed that the discussions in these PRs are virtually all from you.

My point is that:

That was my "LGTM, I'm going to merge this" comment, so yeah, that's sort of what I expect. If there were another engaged committer who had expressed interest here, of course I'd wait a bit for them to sign off as well.

  • It looks like we needed to have a SPIP.

I'm not sure I agree, but if you do, feel free to bring it up on the dev@ list and I can refactor the design doc into an SPIP-formatted one.

It looks to me like we're rushing these PRs when actually we should be the most conservative.

There are no plans to cut a release from master anytime soon, this isn't being backported to branch-3, we've had multiple eyes on the design doc from various committers, and it's disabled by default. The PR was open for multiple weeks (I've seen committers merge commits larger than this with the PR open for less than a day). I don't agree with you here, and if you still think I've been too hasty, let's have the discussion on dev@ or private@ as appropriate.

(edit: formatting)

@holdenk
Contributor Author

holdenk commented Jun 19, 2020

Also, the first PR I made here was back on Aug 24, 2017. There has been plenty of time.

holdenk added a commit to holdenk/spark that referenced this pull request Jun 25, 2020
holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020