[SPARK-40096] Fix finalize shuffle stage slow due to connection creation slow #37533
Conversation
Can one of the admins verify this patch?
Hi @otterc, could you help review this PR? Thanks.
Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by … Why can't you reduce the …?
Also, your solution is adding shuffle service nodes to an excluded list, which isn't what the description says. Could you please explain, with examples/logs, what problems you are facing and the solution that is being proposed?
Yes, DAGScheduler will finalize each shuffle map stage in one shuffle-merge-finalizer thread, and it locks clientPool.locks[clientIndex] when creating the connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) will wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY. Now we will only call removeShufflePushMergerLocation when shuffle fetch fails; this PR also adds those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.
Hi @otterc, could you help me review this PR? Or should I provide more information on this issue?
So the issue is that the wait-period timer doesn't take into account the time for connection creation, which is a bug. However, in this PR you are adding another major change of excluding merger nodes based on this. I don't think we should combine that with the fix for this bug, because if we decide to do that then we need to pay more careful consideration to how it interacts with the exclusion of executor nodes, etc.
shuffleMergeFinalizeScheduler.schedule(new Runnable {
  override def run(): Unit = {
    stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
      case (shuffleServiceLoc, index)
        if finalizeBlackNodes.getIfPresent(shuffleServiceLoc.host) == null =>
        // Sends async request to shuffle service to finalize shuffle merge on that host
        // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
        // TODO: during shuffleMergeFinalizeWaitSec
        shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
          shuffleServiceLoc.port, shuffleId, shuffleMergeId,
          new MergeFinalizerListener {
            override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
              assert(shuffleId == statuses.shuffleId)
              eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
                convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
              results(index).set(true)
            }

            override def onShuffleMergeFailure(e: Throwable): Unit = {
              logWarning(s"Exception encountered when trying to finalize shuffle " +
                s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
              // Do not fail the future as this would cause dag scheduler to prematurely
              // give up on waiting for merge results from the remaining shuffle services
              // if one fails
              if (e.isInstanceOf[IOException]) {
                logInfo(s"Failed to connect external shuffle service on " +
                  s"${shuffleServiceLoc.host} and add it to blacklist")
                blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
                finalizeBlackNodes.put(shuffleServiceLoc.host, shuffleServiceLoc.host)
              }
              results(index).set(false)
            }
          })
      case (_, index) => results(index).set(true)
    }
  }
}, 0, TimeUnit.SECONDS)
What happens when shuffleMergeFinalizeScheduler has just 1 thread?
If shuffleMergeFinalizeScheduler has just 1 thread, the finalizeShuffleMerge method and the task that sends RPCs to the merger locations will run one after the other, so we won't get any merged results and stages will behave as if push-based shuffle was disabled.
A small UT to demonstrate this:
test("test schedule executor with only one thread") {
import java.util.concurrent.TimeoutException
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.SettableFuture
import org.apache.spark.util.ThreadUtils
logInfo("UT start")
val pool =
ThreadUtils.newDaemonThreadPoolScheduledExecutor("test_thread_pool", 1)
val workerNum = 2
pool.schedule(new Runnable() {
override def run(): Unit = {
logInfo("run finalizeShuffleMerge method")
val results = (0 until workerNum).map(_ => SettableFuture.create[Boolean]())
pool.schedule(new Runnable() {
override def run(): Unit = {
logInfo("run finalizeShuffleMerge method")
(0 until workerNum).map(index => {
logInfo(s"begin send finalize RPC to ESS $index")
Thread.sleep(2000)
logInfo(s"end send finalize RPC to ESS $index")
try{
results(index).set(true)
} catch {
case ex: Throwable =>
logError(s"Fail to set result status")
}
})
}
}, 0, TimeUnit.SECONDS)
try {
Futures.allAsList(results: _*).get(5, TimeUnit.SECONDS)
} catch {
case _: TimeoutException =>
logError(s"Timed out exception from main thread")
}
}
}, 0, TimeUnit.SECONDS)
Thread.sleep(30000L)
logInfo("UT end")
}22/08/24 15:52:39.584 ScalaTest-run-running-DAGSchedulerSuite INFO DAGSchedulerSuite: UT start
22/08/24 15:52:39.595 test_thread_pool-0 INFO DAGSchedulerSuite: run finalizeShuffleMerge method
22/08/24 15:52:44.614 test_thread_pool-0 ERROR DAGSchedulerSuite: Timed out exception from main thread
22/08/24 15:52:44.614 test_thread_pool-0 INFO DAGSchedulerSuite: run finalizeShuffleMerge method
22/08/24 15:52:44.615 test_thread_pool-0 INFO DAGSchedulerSuite: begin send finalize RPC to ESS 0
22/08/24 15:52:46.621 test_thread_pool-0 INFO DAGSchedulerSuite: end send finalize RPC to ESS 0
22/08/24 15:52:46.623 test_thread_pool-0 INFO DAGSchedulerSuite: begin send finalize RPC to ESS 1
22/08/24 15:52:48.625 test_thread_pool-0 INFO DAGSchedulerSuite: end send finalize RPC to ESS 1
22/08/24 15:53:09.597 ScalaTest-run-running-DAGSchedulerSuite INFO DAGSchedulerSuite: UT end
Hi @otterc, thanks for your review. I have removed the logic that added merger nodes to a blacklist when an IOException was thrown. Now each stage needs to wait for …
    shuffleMergeFinalizeScheduler.schedule(new Runnable {
Considering just 1 thread in the shuffleMergeFinalizeScheduler, that thread will submit another task (the task that sends the RPCs) to the pool here, but then it will wait on line 2312 for the results. It will time out because the other task is still pending. This is a problem; I don't think we can use the same threadpool to run the finalize method and send the RPCs.
Thoughts on pushing the finalization send into the threadpool instead?
val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
  case (shuffleServiceLoc, index) =>
    shuffleMergeFinalizeScheduler.schedule(new Runnable() {
      override def run(): Unit = {
        // existing code within the "case"
      }
    }, 0, TimeUnit.SECONDS)
}.toList
And if there is a TimeoutException, we cancel all the scheduledFutures (scheduledFutures.map(_.cancel(true)))
We should also bump up the default number of threads in this threadpool.
This will make sure that we wait at most shuffleMergeResultsTimeoutSec seconds for finalization to complete. In almost all cases, shuffleClient.finalizeShuffleMerge will be really quick, so the overhead is fairly low; but in the rare cases where it is not, we will only block that specific sending thread (while all other threads send the finalize message to the other merger hosts), and we will always complete within the timeout (and release blocked threads).
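For illustration only, here is a minimal, self-contained sketch of this schedule-then-wait-then-cancel pattern; the helper name finalizeWithTimeout and its parameters are placeholders, not the actual DAGScheduler code:
import java.util.concurrent.{ScheduledExecutorService, TimeUnit, TimeoutException}
import com.google.common.util.concurrent.{Futures, SettableFuture}

def finalizeWithTimeout(
    pool: ScheduledExecutorService,   // e.g. shuffleMergeFinalizeScheduler
    mergerHosts: Seq[String],         // e.g. hosts from stage.shuffleDep.getMergerLocs
    timeoutSec: Long): Unit = {       // e.g. shuffleMergeResultsTimeoutSec
  val results = mergerHosts.map(_ => SettableFuture.create[Boolean]())
  // One scheduled task per merger, so a slow connection only blocks its own thread.
  val scheduledFutures = mergerHosts.zipWithIndex.map { case (host, index) =>
    pool.schedule(new Runnable {
      override def run(): Unit = {
        // The real code would call shuffleClient.finalizeShuffleMerge(host, ...) here.
        results(index).set(true)
      }
    }, 0, TimeUnit.SECONDS)
  }
  try {
    // Wait at most timeoutSec for all mergers to respond.
    Futures.allAsList(results: _*).get(timeoutSec, TimeUnit.SECONDS)
  } catch {
    case _: TimeoutException =>
      // Release any senders still blocked on slow connection creation.
      scheduledFutures.foreach(_.cancel(true))
  }
}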
Thoughts?
+CC @venkata91
Hi @mridulm, I don't think we should cancel all the scheduledFutures if there is a TimeoutException. The merger ESS needs this finalize RPC to finish its merge process, so we should try our best to send all the RPCs even after shuffleMergeResultsTimeoutSec. Am I right?
When either all mergers report success or there is a timeout (whichever is earlier), we fire ShuffleMergeFinalized - at which point, the shuffle is marked finalized - and all subsequent merge statuses from ESS are ignored.
At this point, sending FinalizeShuffleMerge to mergers does not impact the application once ShuffleMergeFinalized is done.
So to answer your question: once there is a timeout, it does not help to send FinalizeShuffleMerge.
The current code in master assumes that sending FinalizeShuffleMerge is really cheap (which would typically be the case if there are no n/w issues connecting to mergers!), and so does it up front, before starting to wait for mergers to complete.
For a long-running Spark application, if no finalize partition RPC is received, the ESS will not close the open data files, meta files, and index files until the application completes. Too many open files can be a potential risk.
Or should we add a parameter to control whether to continue sending the finalize RPC to the remaining nodes?
Agreed, and that is why the driver makes a best-effort attempt to send the message.
If we are unable to do so in reasonable time due to network issues (n/w partition, etc.), failures, and so on, there could be files which are not yet closed.
On the other hand, keeping this pending state around in the driver for extended periods of time will cause its own issues.
Note that, in general, there will always be some dropped messages (for example, when the failure handler is being invoked), so some variant of what you described can occur even if we make this specific case robust. What we have to evaluate is how much of an impact it has on the shuffle service itself, in the context of whether it will negatively impact NM stability.
In our environment, this has been in prod for a while now, and we have not seen this specific issue. Having said that, for different application and network characteristics it could possibly be an issue: more information will help understand it.
One option could be to evaluate whether we move failed sends and cancelled tasks to a different threadpool and retry a "few" times to send the message and mitigate the issue (thoughts on adding a cancel-merge instead of finalize-merge for that shuffleid-shuffleattemptid?). This will have no performance/functional impact on the driver/application, but can mitigate ESS load in terms of open files.
Thoughts @wankunde, @otterc?
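A purely illustrative sketch of that retry option; retryFinalize, maxRetries, and sendFinalize are hypothetical names, not part of this PR or Spark's API:
import java.util.concurrent.Executors

object FinalizeRetrySketch {
  // Separate, small pool so retries never block the driver's finalize wait.
  private val retryPool = Executors.newFixedThreadPool(1)

  def retryFinalize(host: String, maxRetries: Int)(sendFinalize: String => Boolean): Unit = {
    retryPool.execute(new Runnable {
      override def run(): Unit = {
        // Best effort: a few attempts so the ESS can close its merged shuffle files.
        var attempt = 0
        var sent = false
        while (attempt < maxRetries && !sent) {
          sent = try sendFinalize(host) catch { case _: Exception => false }
          attempt += 1
        }
      }
    })
  }
}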
ESS does not connect back to the executors or driver - connections/requests are initiated by executors and driver to ESS.
Hi @mridulm @otterc, does this work?
  // if finalize RPC is not received due to network issues.
  private val shuffleSendFinalizeRPCExecutor: ExecutorService =
    ThreadUtils.newDaemonFixedThreadPool(
      shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc")
Fix indentation
| ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads") | ||
| .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers" + | ||
| " location and then get MergeStatus. The thread won't stop" + | ||
| " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" + | ||
| " if the finalize rpc is not received.") |
| ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads") | |
| .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers" + | |
| " location and then get MergeStatus. The thread won't stop" + | |
| " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" + | |
| " if the finalize rpc is not received.") | |
| ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads") | |
| .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers." + | |
| " External shuffle servers initiate merge finalization on receiving this request.") |
Also, mark it as internal
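For reference, the entry in package.scala could then look roughly like the sketch below; the val name, version, and default value here are illustrative assumptions, not necessarily what the PR uses:
  private[spark] val SHUFFLE_PUSH_SEND_FINALIZE_RPC_THREADS =
    ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
      .internal()
      .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers." +
        " External shuffle servers initiate merge finalization on receiving this request.")
      .version("3.4.0")
      .intConf
      .createWithDefault(8)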
This is not addressed @wankunde
I'm sorry for the late reply; I have updated the code.
mridulm left a comment:
Mostly looks good to me.
  }

  test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
    "with registerMergeResults is false") {
Can we merge this test with the previous one?
Essentially, something like:
Seq(true, false).foreach { registerMergeResults =>
  test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
    s"with registerMergeResults is $registerMergeResults") {
    ....
    myScheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults)
    ...
  }
}
Thanks @mridulm, I have merged the UTs.
otterc left a comment:
Overall looks good to me. Just had a nit.
| " location and then get MergeStatus. The thread won't stop" + | ||
| " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" + |
Nit: Could you please rephrase this? What do you mean by "The thread won't stop PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT"?
I'm sorry; I have updated the doc to "The thread will run for up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT".
…d length

### What changes were proposed in this pull request?
This PR is a followup of #37533 that works around the test failure by explicitly checking the element and length in the test.

### Why are the changes needed?
For an unknown reason the test added in #37533 is flaky even though both `ArrayBuffer` and `List` are `Seq` and the test should pass to the best of my knowledge. See https://github.com/apache/spark/actions/runs/3109851954/jobs/5040465291

```
[info] - SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely with registerMergeResults is false *** FAILED *** (90 milliseconds)
[info]   ArrayBuffer("hostB") did not equal List("hostB") (DAGSchedulerSuite.scala:4498)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.scheduler.DAGSchedulerSuite.$anonfun$new$286(DAGSchedulerSuite.scala:4498)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:207)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:66)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:66)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
```

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
CI in this PR should verify that.

Closes #37989 from HyukjinKwon/SPARK-40096-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
  }

  Seq(true, false).foreach { registerMergeResults =>
    test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
The test is flaky. It fails periodically in different PRs. For example,
https://github.com/MaxGekk/spark/actions/runs/3420903796/jobs/5696336947
### What changes were proposed in this pull request?
This PR is a followup of #37533 that fixes the flaky test case.

### Why are the changes needed?
The test case is flaky and will fail due to some unexpected errors. #37989
https://github.com/apache/spark/actions/runs/3145115911/jobs/5112006948
https://github.com/apache/spark/actions/runs/3146198025/jobs/5114387367

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
CI in this PR should verify that.

Closes #38091 from wankunde/SPARK-40096-2.

Authored-by: Kun Wan <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
What changes were proposed in this pull request?

This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two separate thread pools, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.

Now we only call removeShufflePushMergerLocation when shuffle fetch fails; this PR also prevents these merger nodes from being selected as merger locations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.

Why are the changes needed?

DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and locks clientPool.locks[clientIndex] while creating the connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) will wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY. Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add UT
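For readers unfamiliar with the change, here is a minimal sketch of the two-pool design described above; the object name, pool sizes, and the simplified send call are assumptions for illustration, not the actual DAGScheduler implementation:
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import com.google.common.util.concurrent.{Futures, SettableFuture}

object FinalizeSketch {
  // Pool 1: one thread per stage schedules finalization and waits for merge results
  // (the "shuffle-merge-finalizer" role).
  private val finalizeScheduler = Executors.newScheduledThreadPool(8)
  // Pool 2: dedicated threads that only send the finalize RPCs
  // (the "send-shuffle-merge-finalize-rpc" role), so slow connection creation
  // can only block a sender thread, never the waiting thread.
  private val sendRpcExecutor = Executors.newFixedThreadPool(8)

  def finalizeShuffleMerge(mergerHosts: Seq[String], timeoutSec: Long): Unit = {
    finalizeScheduler.schedule(new Runnable {
      override def run(): Unit = {
        val results = mergerHosts.map(_ => SettableFuture.create[Boolean]())
        mergerHosts.zipWithIndex.foreach { case (host, index) =>
          sendRpcExecutor.execute(new Runnable {
            override def run(): Unit = {
              // The real code calls shuffleClient.finalizeShuffleMerge(host, ...) here.
              results(index).set(true)
            }
          })
        }
        try {
          // Stop waiting after the merge-results timeout, success or failure.
          Futures.allAsList(results: _*).get(timeoutSec, TimeUnit.SECONDS)
        } catch {
          case _: TimeoutException => // proceed; late merge statuses are ignored
        }
      }
    }, 0, TimeUnit.SECONDS)
  }
}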