
Conversation

@wankunde (Contributor) commented Aug 16, 2022

What changes were proposed in this pull request?

This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two separate threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.

Currently, removeShufflePushMergerLocation is only called when a shuffle fetch fails. This PR also prevents merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.

Why are the changes needed?

The DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and holds clientPool.locks[clientIndex] while creating a connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) must wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY.
Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long.
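
To make the multiplication concrete, here is a minimal, self-contained sketch of the contention (illustrative only: the shared lock stands in for clientPool.locks[clientIndex], and the names are simplified stand-ins, not Spark's actual TransportClientFactory internals):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FinalizeContentionSketch {
  // Stands in for the per-index connection pool lock shared by all finalizer threads.
  private val poolLock = new Object

  // An unreachable merger makes each attempt hold the pool lock for the whole
  // connection-creation timeout, so attempts from all finalizer threads serialize.
  private def createClient(host: String, creationTimeoutMs: Long): Unit =
    poolLock.synchronized { Thread.sleep(creationTimeoutMs) }

  def main(args: Array[String]): Unit = {
    val stages = 3                          // concurrent shuffle map stages finalizing
    val lostMergers = Seq("hostA", "hostB") // unreachable merger nodes
    val creationTimeoutMs = 1000L           // connection-creation timeout stand-in
    val finalizers = Executors.newFixedThreadPool(stages) // shuffle-merge-finalizer-*
    val start = System.nanoTime()
    (1 to stages).foreach { _ =>
      finalizers.execute(() => lostMergers.foreach(createClient(_, creationTimeoutMs)))
    }
    finalizers.shutdown()
    finalizers.awaitTermination(1, TimeUnit.MINUTES)
    // Elapsed ~= creationTimeoutMs * lostMergers.size * stages (about 6s here),
    // because the shared lock serializes every connection attempt across all stages.
    println(s"elapsed ms: ${(System.nanoTime() - start) / 1000000}")
  }
}
```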

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add UT

@github-actions github-actions bot added the CORE label Aug 16, 2022
@AmplabJenkins commented

Can one of the admins verify this patch?

@wankunde (Contributor, Author) commented

Hi @otterc, could you help review this PR? Thanks.

@otterc (Contributor) commented Aug 18, 2022

@wankunde

Sending finalize RPCs will block the main thread due to creating connections to some unreachable nodes.

Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by the shuffle-merge-finalizer threads.

Why can't you reduce the SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY default for the cluster? One of the reasons this configuration was introduced was that connection creation should have a lower timeout than the connection idle timeout.

@otterc (Contributor) commented Aug 18, 2022

Also, your solution is adding shuffle service nodes to an excluded list, which isn't what the description says. Could you please explain, with examples/logs, what problems you are facing and the solution that is being proposed?

@wankunde (Contributor, Author) commented

@wankunde

Sending finalize RPCs will block the main thread due to creating connections to some unreachable nodes.

Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by the shuffle-merge-finalizer threads.

Why can't you reduce the SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY default for the cluster? One of the reasons this configuration was introduced was that connection creation should have a lower timeout than the connection idle timeout.

Yes, the DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and holds clientPool.locks[clientIndex] while creating a connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) must wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY.
Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long. This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two separate threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.

Currently, removeShufflePushMergerLocation is only called when a shuffle fetch fails. This PR also prevents merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.

"shuffle-merge-finalizer-4" #1842 daemon prio=5 os_prio=0 tid=0x00007f19440d8000 nid=0x2be822 in Object.wait() [0x00007f19ea7f7000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:460)
	at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:679)
	- locked <0x00007f3eb8244598> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:298)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:283)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	- locked <0x00007f1d7b0c0ba8> (a java.lang.Object)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$finalizeShuffleMerge$5(DAGScheduler.scala:2437)

"shuffle-merge-finalizer-3" #1647 daemon prio=5 os_prio=0 tid=0x00007f19440d2800 nid=0x2be52e waiting for monitor entry [0x00007f1688ff2000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:198)
	- waiting to lock <0x00007f1d7b0c0ba8> (a java.lang.Object)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
	at org.apache.spark.scheduler.DAGScheduler$$anon$7.$anonfun$run$2(DAGScheduler.scala:2419)
...

@wankunde (Contributor, Author) commented

Hi @otterc, could you help me review this PR? Or should I provide more information on this issue?

@otterc (Contributor) commented Aug 23, 2022

So the issue is that the wait period timer doesn't take into account the time for connection creation, which is a bug. However, in this PR you are adding another major change: excluding merger nodes based on this. I don't think we should combine that with the fix for this bug, because if we decide to do that then we need to pay more careful consideration to how it interacts with the exclusion of executor nodes, etc.

Comment on lines 2283 to 2320
shuffleMergeFinalizeScheduler.schedule(new Runnable {
  override def run(): Unit = {
    stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
      case (shuffleServiceLoc, index)
          if finalizeBlackNodes.getIfPresent(shuffleServiceLoc.host) == null =>
        // Sends async request to shuffle service to finalize shuffle merge on that host
        // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
        // TODO: during shuffleMergeFinalizeWaitSec
        shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
          shuffleServiceLoc.port, shuffleId, shuffleMergeId,
          new MergeFinalizerListener {
            override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
              assert(shuffleId == statuses.shuffleId)
              eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
                convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
              results(index).set(true)
            }

            override def onShuffleMergeFailure(e: Throwable): Unit = {
              logWarning(s"Exception encountered when trying to finalize shuffle " +
                s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
              // Do not fail the future as this would cause dag scheduler to prematurely
              // give up on waiting for merge results from the remaining shuffle services
              // if one fails
              if (e.isInstanceOf[IOException]) {
                logInfo(s"Failed to connect external shuffle service on " +
                  s"${shuffleServiceLoc.host} and add it to blacklist")
                blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
                finalizeBlackNodes.put(shuffleServiceLoc.host, shuffleServiceLoc.host)
              }
              results(index).set(false)
            }
          })

      case (_, index) => results(index).set(true)
    }
  }
}, 0, TimeUnit.SECONDS)
Contributor:

What happens when shuffleMergeFinalizeScheduler has just 1 thread?

@wankunde (Contributor, Author):

If shuffleMergeFinalizeScheduler has just 1 thread, the finalizeShuffleMerge method and the sending of RPCs to the merger locations will run one after the other, so we won't get any merged results, and stages will behave as if push-based shuffle was disabled.

A small UT:

test("test schedule executor with only one thread") {
    import java.util.concurrent.TimeoutException
    import com.google.common.util.concurrent.Futures
    import com.google.common.util.concurrent.SettableFuture
    import org.apache.spark.util.ThreadUtils

    logInfo("UT start")
    val pool =
      ThreadUtils.newDaemonThreadPoolScheduledExecutor("test_thread_pool", 1)

    val workerNum = 2
    pool.schedule(new Runnable() {
      override def run(): Unit = {
        logInfo("run finalizeShuffleMerge method")
        val results = (0 until workerNum).map(_ => SettableFuture.create[Boolean]())
        pool.schedule(new Runnable() {
          override def run(): Unit = {
            logInfo("run finalizeShuffleMerge method")
            (0 until workerNum).map(index => {
              logInfo(s"begin send finalize RPC to ESS $index")
              Thread.sleep(2000)
              logInfo(s"end send finalize RPC to ESS $index")
              try {
                results(index).set(true)
              } catch {
                case ex: Throwable =>
                  logError("Failed to set result status", ex)
              }
            })
          }
        }, 0, TimeUnit.SECONDS)

        try {
          Futures.allAsList(results: _*).get(5, TimeUnit.SECONDS)
        } catch {
          case _: TimeoutException =>
            logError(s"Timed out exception from main thread")
        }
      }
    }, 0, TimeUnit.SECONDS)

    Thread.sleep(30000L)
    logInfo("UT end")
  }
22/08/24 15:52:39.584 ScalaTest-run-running-DAGSchedulerSuite INFO DAGSchedulerSuite: UT start
22/08/24 15:52:39.595 test_thread_pool-0 INFO DAGSchedulerSuite: run finalizeShuffleMerge method
22/08/24 15:52:44.614 test_thread_pool-0 ERROR DAGSchedulerSuite: Timed out exception from main thread
22/08/24 15:52:44.614 test_thread_pool-0 INFO DAGSchedulerSuite: run finalizeShuffleMerge method
22/08/24 15:52:44.615 test_thread_pool-0 INFO DAGSchedulerSuite: begin send finalize RPC to ESS 0
22/08/24 15:52:46.621 test_thread_pool-0 INFO DAGSchedulerSuite: end send finalize RPC to ESS 0
22/08/24 15:52:46.623 test_thread_pool-0 INFO DAGSchedulerSuite: begin send finalize RPC to ESS 1
22/08/24 15:52:48.625 test_thread_pool-0 INFO DAGSchedulerSuite: end send finalize RPC to ESS 1
22/08/24 15:53:09.597 ScalaTest-run-running-DAGSchedulerSuite INFO DAGSchedulerSuite: UT end

@wankunde (Contributor, Author) commented

So the issue is that the wait period timer doesn't take into account the time for connection creation, which is a bug. However, in this PR you are adding another major change: excluding merger nodes based on this. I don't think we should combine that with the fix for this bug, because if we decide to do that then we need to pay more careful consideration to how it interacts with the exclusion of executor nodes, etc.

Hi @otterc, thanks for your review. I have removed the logic that adds merger nodes to the blacklist when an IOException is thrown. Now each stage needs to wait for min(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT * lostMergerNodesSize, PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT); maybe we can optimize this later.

                convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
              results(index).set(true)
            }
    shuffleMergeFinalizeScheduler.schedule(new Runnable {
@otterc (Contributor) commented Aug 24, 2022

Considering just 1 thread in the shuffleMergeFinalizerScheduler: that thread will submit another task (the task to send the RPCs) to the pool here, but then it will wait on line 2312 for the results. It will time out because the other task is still pending. This is a problem. I don't think we can use the same threadpool to run the finalize method and send the RPCs.

@mridulm (Contributor) commented Aug 25, 2022

Thoughts on pushing the finalization send into the threadpool instead?

val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
  case (shuffleServiceLoc, index) =>
    shuffleMergeFinalizeScheduler.schedule(new Runnable() {
      override def run(): Unit = {
        // existing code within the "case"
      }
    }, 0, TimeUnit.SECONDS)
}.toList

And if there is a TimeoutException, we cancel all the scheduledFutures (scheduledFutures.map(_.cancel(true)))

We should also bump up the default number of threads in this threadpool.

This will make sure that we wait at most shuffleMergeResultsTimeoutSec seconds for finalization to complete. In almost all cases, shuffleClient.finalizeShuffleMerge will be really quick - and so the overhead is fairly low - but for the rare cases where it is not, we will only block that specific sending thread (while all other threads send finalize messages to the other merger hosts), and we will always complete within the timeout (and release blocked threads).

Thoughts ?
+CC @venkata91

@wankunde (Contributor, Author):

Hi @mridulm, I don't think we should cancel all the scheduledFutures if there is a TimeoutException. The merger ESS needs this finalize RPC to finish its merge process, so we should try our best to send all the RPCs even after shuffleMergeResultsTimeoutSec. Am I right?

@mridulm (Contributor) commented Aug 26, 2022

When either all mergers report success or there is a timeout (whichever is earlier), we fire ShuffleMergeFinalized - at which point, the shuffle is marked finalized - and all subsequent merge statuses from ESS are ignored.

At this point, sending FinalizeShuffleMerge to mergers does not impact the application once ShuffleMergeFinalized is done.
So to answer your question - once there is a timeout, it does not help to send FinalizeShuffleMerge

The current code in master assumes that sending FinalizeShuffleMerge is really cheap (which would typically be the case if there are no n/w issues connecting to mergers!) - and so does it as a prefix, before starting to wait for the mergers to complete.

@wankunde (Contributor, Author):

For a long-running Spark application, if no finalize partition RPC is received, the ESS will not close the open data, meta, and index files until the application completes. Too many open files can be a potential risk.

Or should we add a parameter to control whether to continue sending finalize RPCs to the remaining nodes?

@mridulm (Contributor) commented Aug 26, 2022

Agree, and that is why the driver makes a best-effort attempt to send the message.
If we are unable to do so in reasonable time due to network issues (n/w partition, etc.), failures, and so on - there could be files which are not yet closed.
On the other hand, keeping this pending state around in the driver for extended periods of time will cause its own issues.

Note that, in general, there will always be some dropped messages (for example, when the failure handler is being invoked) - so some variant of what you described can occur even if we make this specific case robust. What we have to evaluate is how much of an impact it has on the shuffle service itself, in the context of whether it will negatively impact NM stability.

In our environment, this has been in prod for a while now - and we have not seen this specific issue. Having said that, for a different application with different network characteristics, it could possibly be an issue: more information would help us understand it.

One option could be to evaluate whether we move failed sends and cancelled tasks to a different threadpool and retry a "few" times to send the message, to mitigate the issue (thoughts on adding a cancel-merge instead of a finalize-merge for that shuffleid-shuffleattemptid?). This will have no performance/functional impact on the driver/application, but can mitigate ESS load in terms of open files.
Thoughts @wankunde, @otterc?

@wankunde (Contributor, Author):

Another idea: could we stop sending finalize RPCs after shuffleMergeResultsTimeoutSec, and start a thread in the ESS that periodically checks for and closes the open files of shuffles that have already been finalized?
Do that in another PR?
What do you think? @mridulm @otterc

Contributor:

ESS does not connect back to the executors or driver - connections/requests are initiated by executors and driver to ESS.

@wankunde (Contributor, Author) commented

Hi @otterc, @mridulm, I updated the code. Could you help review the new code?

@wankunde (Contributor, Author) commented

Hi @mridulm, @otterc,
New processing flow:

  • Send all the finalize RPC tasks and wait for the merged statuses in a new thread pool.
  • Wait for shuffleMergeResultsTimeoutSec in the main thread if registerMergeResults = true, then cancel all pending RPC tasks.
  • Wait for shuffleMergeResultsTimeoutSec asynchronously if registerMergeResults = false, then cancel all pending RPC tasks.

Does this work? A rough sketch of the flow follows below.
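
A hedged sketch of this flow (sendFinalizeRpc, mergerHosts, and the pool sizes are placeholders for illustration, not the PR's actual API):

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import com.google.common.util.concurrent.{Futures, SettableFuture}

// Placeholder for the real finalizeShuffleMerge RPC; assumed to report success/failure.
def sendFinalizeRpc(host: String): Boolean = true

val rpcPool = Executors.newFixedThreadPool(8)            // send-shuffle-merge-finalize-rpc
val timer = Executors.newSingleThreadScheduledExecutor() // async timeout watcher

def finalizeShuffleMerge(mergerHosts: Seq[String],
    registerMergeResults: Boolean,
    timeoutSec: Long): Unit = {
  val results = mergerHosts.map(_ => SettableFuture.create[Boolean]())
  // 1. Send every finalize RPC from the dedicated pool; each task reports via its future.
  val tasks = mergerHosts.zip(results).map { case (host, result) =>
    rpcPool.submit(new Runnable {
      override def run(): Unit = result.set(sendFinalizeRpc(host))
    })
  }
  def cancelAll(): Unit = tasks.foreach(_.cancel(true))
  if (registerMergeResults) {
    // 2. Block the caller for up to the merge-results timeout, then cancel stragglers.
    try Futures.allAsList(results: _*).get(timeoutSec, TimeUnit.SECONDS)
    catch { case _: TimeoutException => }
    finally cancelAll()
  } else {
    // 3. Results are not needed: let the RPCs run, and cancel them when the timer fires.
    timer.schedule(new Runnable { override def run(): Unit = cancelAll() },
      timeoutSec, TimeUnit.SECONDS)
  }
}
```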

  // if finalize RPC is not received due to network issues.
  private val shuffleSendFinalizeRPCExecutor: ExecutorService =
    ThreadUtils.newDaemonFixedThreadPool(
      shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc")
Contributor:

Fix indentation

@wankunde wankunde requested review from mridulm and removed request for mridulm September 2, 2022 10:02
@wankunde wankunde requested review from mridulm and otterc and removed request for otterc September 2, 2022 10:16
Comment on lines 2315 to 2319
  ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
    .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers" +
      " location and then get MergeStatus. The thread won't stop" +
      " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" +
      " if the finalize rpc is not received.")
Contributor:

Suggested change:

-  ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
-    .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers" +
-      " location and then get MergeStatus. The thread won't stop" +
-      " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" +
-      " if the finalize rpc is not received.")
+  ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+    .doc("Number of threads used by the driver to send finalize shuffle RPC to mergers." +
+      " External shuffle servers initiate merge finalization on receiving this request.")

Also, mark it as internal

Contributor:

This is not addressed @wankunde

@wankunde (Contributor, Author) commented Sep 9, 2022

I'm sorry for the late reply, I have updated the code.

@mridulm (Contributor) left a comment

Mostly looks good to me.

  }

  test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
    "with registerMergeResults is false") {
Contributor:

Can we merge this test with the previous one?
Essentially, something like:


Seq(true, false).foreach { registerMergeResults =>
  test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
      s"with registerMergeResults is $registerMergeResults") {
    ....

    myScheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults)

    ...
  }
}

@wankunde (Contributor, Author):

Thanks @mridulm, I have merged the UTs.

@mridulm (Contributor) commented Sep 16, 2022

+CC @otterc, @Ngone51 PTAL

@mridulm (Contributor) left a comment

Looks good to me, thanks for working on this @wankunde !
Will leave the PR open in case @otterc or @Ngone51 want to review - before I merge it tomorrow.

Also, will wait for tests to pass :-)

@otterc (Contributor) left a comment

Overall looks good to me. Just had a nit.

Comment on lines 2318 to 2319
" location and then get MergeStatus. The thread won't stop" +
" PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open too many files" +
Contributor:

Nit: could you please rephrase this? What do you mean by "The thread won't stop PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT"?

@wankunde (Contributor, Author):

I'm sorry; I updated the doc to "The thread will run for up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT".

@mridulm mridulm closed this in 500f309 Sep 23, 2022
@mridulm (Contributor) commented Sep 23, 2022

Merged to master.
Thanks for working on this @wankunde !
Thanks for the review @otterc :-)

srowen pushed a commit that referenced this pull request Sep 25, 2022
…d length

### What changes were proposed in this pull request?

This PR is a followup of #37533 that works around the test failure by explicitly checking the element and length in the test.

### Why are the changes needed?

For an unknown reason, the test added in #37533 is flaky even though both `ArrayBuffer` and `List` are `Seq` and the test should pass to the best of my knowledge. See https://github.com/apache/spark/actions/runs/3109851954/jobs/5040465291

```
[info] - SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely with registerMergeResults is false *** FAILED *** (90 milliseconds)
[info]   ArrayBuffer("hostB") did not equal List("hostB") (DAGSchedulerSuite.scala:4498)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.scheduler.DAGSchedulerSuite.$anonfun$new$286(DAGSchedulerSuite.scala:4498)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:207)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:66)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:66)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
```

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify that.

Closes #37989 from HyukjinKwon/SPARK-40096-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
  }

  Seq(true, false).foreach { registerMergeResults =>
    test("SPARK-40096: Send finalize events even if shuffle merger blocks indefinitely " +
Member:

The test is flaky. It fails periodically in different PRs. For example,
https://github.com/MaxGekk/spark/actions/runs/3420903796/jobs/5696336947

asfgit pushed a commit that referenced this pull request Nov 8, 2022
### What changes were proposed in this pull request?

This PR is a followup of #37533 that fixes the flaky test case.

### Why are the changes needed?

The test case is flaky and can fail due to some unexpected errors.

#37989
https://github.com/apache/spark/actions/runs/3145115911/jobs/5112006948
https://github.com/apache/spark/actions/runs/3146198025/jobs/5114387367

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify that.

Closes #38091 from wankunde/SPARK-40096-2.

Authored-by: Kun Wan <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
### What changes were proposed in this pull request?

This PR is a followup of apache#37533 that fixes the flaky test case.

### Why are the changes needed?

The test case is flaky and can fail due to some unexpected errors.

apache#37989
https://github.com/apache/spark/actions/runs/3145115911/jobs/5112006948
https://github.com/apache/spark/actions/runs/3146198025/jobs/5114387367

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify that.

Closes apache#38091 from wankunde/SPARK-40096-2.

Authored-by: Kun Wan <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>