[SPARK-27394][WebUI]Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate #24303
Conversation
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
// interval.
if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
  flush(maybeUpdate(_, now))
Hmm... in the bug you mention that job-level data is not being updated. Is that the only case? Because if that's it, then this looks like overkill. You could e.g. update the jobs in the code that handles event.accumUpdates above, or even just flush jobs specifically, instead of everything.
Doing a full flush here seems like overkill and a little expensive when you think about many heartbeats arriving in a short period (even when considering lastFlushTimeNs).
Hmm... in the bug you mention that job-level data is not being updated. Is that the only case?
I also noticed that executor active tasks could sometimes be wrong. That's why I decided to flush everything, to make sure we don't miss any places. It's also hard to maintain if we need to manually flush in every place.
Ideally, we should flush periodically so that it doesn't depend on receiving a Spark event. But then I would need to add a new event type and post it to the listener bus, which seems like overkill.
when you think about many heartbeats arriving in a short period
There will be at least 100ms between flushes. As long as we process heartbeats quickly, most of them won't trigger the flush.
If the goal is to use the heartbeats as a trigger for flushing, how about using some ratio of the heartbeat period instead of liveUpdatePeriodNs to control whether to flush everything?
Really large apps can get a little backed up when processing heartbeats from lots and lots of busy executors, and this would make it a little worse.
The update only happens in the live UI, which should be fine in general. For really large apps, would it help to set LIVE_ENTITY_UPDATE_PERIOD to a larger value? Using a ratio of the heartbeat period seems a bit complex.
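For reference, LIVE_ENTITY_UPDATE_PERIOD is backed by the `spark.ui.liveUpdate.period` key, so a larger value could be set through the application's SparkConf. A minimal sketch (the 500ms value is just an illustration, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Sketch: raise the live-entity write period so the in-memory status store is
// written less often for very busy applications (500ms is an arbitrary example).
val conf = new SparkConf()
  .setAppName("live-ui-update-period-example")
  .set("spark.ui.liveUpdate.period", "500ms")
```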
only happens in live UI
The "don't write to the store all the time" thing was added specifically to speed up live UIs, because copying + writing the data (even to the memory store) becomes really expensive when you have event storms (think thousands of tasks starting and stopping in a very short period).
setting LIVE_ENTITY_UPDATE_PERIOD to a larger value
We should avoid requiring configuration tweaks for things not to break, when possible.
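For context, the rate limiting that the "don't write to the store all the time" change introduced looks roughly like the sketch below. This is a simplified illustration, not the exact Spark source: the names mirror AppStatusListener, but the types are stand-ins.

```scala
// Simplified sketch of how AppStatusListener rate-limits writes: an entity is
// only re-written to the status store when liveUpdatePeriodNs has elapsed since
// its last write, so event storms don't pay the copy-and-write cost per event.
abstract class LiveEntitySketch {
  var lastWriteTime: Long = -1L
  def doUpdate(now: Long): Unit // stand-in for writing the entity to the store
}

class ListenerSketch(live: Boolean, liveUpdatePeriodNs: Long) {
  def update(entity: LiveEntitySketch, now: Long): Unit = {
    entity.doUpdate(now)
    entity.lastWriteTime = now
  }

  // Only write if enough time has passed since the entity's last write.
  def maybeUpdate(entity: LiveEntitySketch, now: Long): Unit = {
    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
      update(entity, now)
    }
  }
}
```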
Test build #104308 has finished for PR 24303 at commit
Test build #104309 has finished for PR 24303 at commit
}
}
// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
nit: more than?
// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
// interval.
if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
I'm also worried about the case when flush() takes a few milliseconds to finish, and you end up always updating all live entities for each ExecutorMetricsUpdate event.
Is it possible to introduce a new config that specifies the live update period for ExecutorMetricsUpdate only? The default value can be the same as liveUpdatePeriodNs, while users can change it to a bigger value if flush() becomes an issue when processing the event.
@vanzin I added a new separate config for this. It's weird to use a ratio of the heartbeat period, since the heartbeat is an implementation detail and we may use a different approach in the future.
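The shape of the change is roughly the following sketch: on an executor heartbeat, flush all live entities, but rate-limited by a dedicated period that is separate from the regular per-entity update period. The class and names below are illustrative stand-ins, not the actual AppStatusListener code (the real diff is quoted later in this thread):

```scala
// Illustrative sketch: flush everything on executor heartbeats, but no more
// often than once per flushPeriodNs, regardless of how fast heartbeats arrive.
class HeartbeatFlushSketch(flushPeriodNs: Long) {
  private var lastFlushTimeNs: Long = System.nanoTime()

  // `flushAll` stands in for AppStatusListener flushing every LiveEntity.
  def onExecutorHeartbeat(flushAll: () => Unit): Unit = {
    val now = System.nanoTime()
    if (now - lastFlushTimeNs > flushPeriodNs) {
      flushAll()
      lastFlushTimeNs = now
    }
  }
}
```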
Test build #104328 has finished for PR 24303 at commit
Running this test, the thread audit detects two possible thread leaks (Executor task launch worker for task 0, Executor task launch worker for task 1). It makes me wonder whether they are just red herrings that get killed later, after the test suite is stopped (in a separate thread, like by TaskReaper), or whether we should take care of them:
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.ui.UISeleniumSuite, thread names: Keep-Alive-Timer, Executor task launch worker for task 0, Executor task launch worker for task 1 =====
val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
  // Make the task never finish so there won't be any task start/end events after the first 2
  // tasks start.
  Thread.sleep(300000)
Nit: what about a sleep of less than 5 minutes here, something comparable with the eventually timeout, like:
Thread.sleep(20.seconds.toMillis)
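(For that form to compile, the duration DSL needs to be in scope, assuming the suite doesn't already import it:)

```scala
import scala.concurrent.duration._ // brings 20.seconds into scope
```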
I turned on SPARK_JOB_INTERRUPT_ON_CANCEL, so there's no need to change the sleep time.
I have checked and the thread leaks are gone.
Good catch. Forgot to set a flag...
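For readers following along, the flag in question enables interrupt-on-cancel, so the sleeping "Executor task launch worker" threads get interrupted when the job is cancelled instead of leaking past the end of the suite. One way to enable it from a test is the job group API; this is a sketch assuming `sc` is the suite's SparkContext, not necessarily the exact code used in this PR, and the group name is arbitrary:

```scala
// Run the never-finishing job in a job group with task interruption enabled,
// then cancel the group at the end of the test so the sleeping tasks die.
sc.setJobGroup("ui-staleness-test", "job that never finishes", interruptOnCancel = true)
val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
  Thread.sleep(300000)
}
// ... assertions against the live UI go here ...
sc.cancelJobGroup("ui-staleness-test")
```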
Test build #104324 has finished for PR 24303 at commit
Some minor comments.
val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit")
  .internal()
  .doc(
    """A time limit before we force to flush all live entities. When the last flush doesn't past
Grammar: "doesn't past this limit"?
I think this would be easier to explain if you named the config "minFlushPeriod" or something. e.g. "Minimum time elapsed before stale UI data is flushed."
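Something along the lines of that suggestion could look like the sketch below. This is only an illustration of the proposed rename and wording, not necessarily what was finally merged; the default value shown is arbitrary, and the imports assume the entry sits next to the other UI config entries inside Spark itself:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

// Sketch of the suggested shape: a name and doc string that describe a minimum
// flush period rather than a "staleness limit". The "1s" default is illustrative.
val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD =
  ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
    .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness " +
      "when incoming task events are not frequent.")
    .timeConf(TimeUnit.NANOSECONDS)
    .createWithDefaultString("1s")
```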
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

/**
 * A time limit before we force to flush all live entities. When the last flush doesn't past
Same grammar issue. Can't really parse what you wrote.
.createWithDefaultString("100ms")

val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit")
  .internal()
Why internal? Spark doesn't set it itself. If anyone is going to change it, it will be users.
Test build #104329 has finished for PR 24303 at commit
Test build #104332 has finished for PR 24303 at commit
Looks good pending tests.
Test build #104408 has finished for PR 24303 at commit
LGTM
There is a typo in the description: "last more that" => "last more than"; otherwise LGTM.
Merging to master / 2.4.
Doesn't merge cleanly to 2.4, so gave up. Open a new PR blah blah blah...
…rkListenerExecutorMetricsUpdate This PR updates `AppStatusListener` to flush `LiveEntity` if necessary when receiving `SparkListenerExecutorMetricsUpdate`. This will ensure the staleness of Spark UI doesn't last more than the executor heartbeat interval. The new unit test. Closes apache#24303 from zsxwing/SPARK-27394. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
…rkListenerExecutorMetricsUpdate (backport 2.4) ## What changes were proposed in this pull request? This PR backports #24303 to 2.4. ## How was this patch tested? Jenkins Closes #24328 from zsxwing/SPARK-27394-2.4. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
This PR updates `AppStatusListener` to flush `LiveEntity` if necessary when receiving `SparkListenerExecutorMetricsUpdate`. This will ensure the staleness of Spark UI doesn't last more than the executor heartbeat interval.
How was this patch tested?
The new unit test.