
Conversation

@zsxwing (Member) commented Apr 4, 2019

What changes were proposed in this pull request?

This PR updates AppStatusListener to flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate. This will ensure the staleness of Spark UI doesn't last more than the executor heartbeat interval.
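A minimal sketch of the idea, based on the diff hunks quoted in the review below (`lastFlushTimeNs` and `liveUpdatePeriodNs` are `AppStatusListener` internals; `flushAll` here is a hypothetical stand-in for the real `flush(maybeUpdate(_, now))` call):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Sketch only, not the actual AppStatusListener code.
class FlushOnHeartbeatSketch extends SparkListener {
  private val liveUpdatePeriodNs = 100L * 1000 * 1000 // 100ms, the default live-update period
  private var lastFlushTimeNs = System.nanoTime()

  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    val now = System.nanoTime()
    // ... handle event.accumUpdates as before ...

    // Executor heartbeats arrive periodically, so flushing here bounds how long
    // the live UI can stay stale by roughly the heartbeat interval.
    if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
      flushAll(now) // hypothetical stand-in for flush(maybeUpdate(_, now))
      lastFlushTimeNs = now
    }
  }

  private def flushAll(now: Long): Unit = { /* write pending LiveEntity state to the store */ }
}
```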

How was this patch tested?

The new unit test.

@zsxwing zsxwing changed the title [SPARK-27394]Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate [SPARK-27394][WebUI]Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate Apr 4, 2019
@gatorsmile (Member):

// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
// interval.
if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
flush(maybeUpdate(_, now))
Contributor:

Hmm... in the bug you mention that job-level data is not being updated. Is that the only case? Because if that's it, then this looks like overkill. You could e.g. update the jobs in the code that handles event.accumUpdates above, or even just flush jobs specifically, instead of everything.

Doing a full flush here seems like overkill and a little expensive when you think about many heartbeats arriving in a short period (even when considering lastFlushTimeNs).

Member Author:

> Hmm... in the bug you mention that job-level data is not being updated. Is that the only case?

I also noticed that executor active task counts could sometimes be wrong. That's why I decided to flush everything, to make sure we don't miss anything. It's also hard to maintain if we have to remember to flush manually in every place.

Ideally, we would flush periodically so that it doesn't depend on receiving a Spark event, but that would require adding a new event type and posting it to the listener bus. That's overkill.

> when you think about many heartbeats arriving in a short period

There will be at least 100ms between flushes. As long as we process heartbeats quickly, most of them won't trigger a flush.

Contributor:

If the goal is to use the heartbeats as a trigger for flushing, how about using some ratio of the heartbeat period, instead of liveUpdatePeriodNs, to control whether to flush everything?

Really large apps can get a little backed up when processing heartbeats from lots and lots of busy executors, and this would make that a little worse.

Member:

The update only happens in the live UI, which should be fine in general. For really large apps, would it help to set LIVE_ENTITY_UPDATE_PERIOD to a larger value? Using a ratio of the heartbeat period seems a bit complex.
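For reference, LIVE_ENTITY_UPDATE_PERIOD corresponds to the `spark.ui.liveUpdate.period` setting; a user could relax it like this (values illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative: write live entities to the UI store at most every 1s instead of
// the default 100ms, trading UI freshness for less listener work on large apps.
val conf = new SparkConf()
  .setAppName("large-app")
  .set("spark.ui.liveUpdate.period", "1s")
```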

Contributor:

> only happens in live UI

The "don't write to the store all the time" thing was added specifically to speed up live UIs, because copying + writing the data (even to the memory store) becomes really expensive when you have event storms (think thousands of tasks starting and stopping in a very short period).

> setting LIVE_ENTITY_UPDATE_PERIOD to a larger value

We should avoid requiring configuration tweaks for things not to break, when possible.
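For context, the per-entity throttling being referenced looks roughly like this (a sketch inferred from the `flush(maybeUpdate(_, now))` call in the diff; `LiveEntity`, `lastWriteTime`, `live`, and `update` mirror the listener's internals and are assumptions here):

```scala
// Sketch of AppStatusListener-style write throttling: only copy + write an entity
// to the store if it hasn't been written within the live update period, which keeps
// event storms (thousands of task starts/ends) cheap.
private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
  if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
    update(entity, now) // performs the actual copy + write to the store
  }
}
```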

@SparkQA commented Apr 5, 2019

Test build #104308 has finished for PR 24303 at commit ee53708.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 5, 2019

Test build #104309 has finished for PR 24303 at commit 1f927a2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}
// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
Member:

nit: more than?

// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
// interval.
if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
Contributor:

I'm also worried about the case where flush() takes a few milliseconds to finish, and you end up updating all live entities for every ExecutorMetricsUpdate event.
Is it possible to introduce a new config that specifies the live update period for ExecutorMetricsUpdate only? The default value can be the same as liveUpdatePeriodNs, while users can raise it if flush() becomes an issue when processing the event.

@zsxwing (Member Author) commented Apr 5, 2019

@vanzin I added a new separate config for this. It would be weird to use a ratio of the heartbeat period, since relying on heartbeats is an implementation detail and we may use a different approach in the future.

@SparkQA commented Apr 5, 2019

Test build #104328 has finished for PR 24303 at commit 289e996.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor) left a comment:

Running this test, the thread audit detects two possible thread leaks (Executor task launch worker for task 0, Executor task launch worker for task 1). It makes me wonder whether they are just red herrings that get killed later, after the test suite is stopped (in a separate thread, e.g. by TaskReaper), or whether we should take care of them:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.ui.UISeleniumSuite, thread names: Keep-Alive-Timer, Executor task launch worker for task 0, Executor task launch worker for task 1 =====

val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
// Make the task never finish so there won't be any task start/end events after the first 2
// tasks start.
Thread.sleep(300000)
Contributor:

Nit: what about a sleep of less than 5 minutes here, something comparable with the eventually timeout, like:

        Thread.sleep(20.seconds.toMillis)

Member Author:

I turned on SPARK_JOB_INTERRUPT_ON_CANCEL, so there's no need to change the sleep time.
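For context, SPARK_JOB_INTERRUPT_ON_CANCEL makes job cancellation interrupt running task threads, so the long Thread.sleep doesn't leak worker threads after the test. A sketch of the equivalent from user code, assuming a SparkContext `sc` as in the test (Spark's own tests can set the internal local property directly; `setJobGroup` is the public way to enable it):

```scala
// Enable interrupt-on-cancel so cancelling the async job interrupts the sleeping tasks.
sc.setJobGroup("ui-staleness-test", "long-sleeping job", interruptOnCancel = true)
val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => Thread.sleep(300000) }
// ... eventually { assertions against the live UI } ...
f.cancel() // tasks receive Thread.interrupt() instead of sleeping on as leaked threads
```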

Contributor:

I have checked and the thread leaks are gone.

@zsxwing (Member Author) commented Apr 5, 2019

> Running this test, the thread audit detects two possible thread leaks (Executor task launch worker for task 0, Executor task launch worker for task 1). It makes me wonder whether they are just red herrings that get killed later, after the test suite is stopped (in a separate thread, e.g. by TaskReaper), or whether we should take care of them:

Good catch. Forgot to set a flag...

@SparkQA commented Apr 5, 2019

Test build #104324 has finished for PR 24303 at commit 5a04be9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) left a comment:

Some minor comments.

val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit")
.internal()
.doc(
"""A time limit before we force to flush all live entities. When the last flush doesn't past
Contributor:

Grammar: "doesn't past this limit"?

I think this would be easier to explain if you named the config "minFlushPeriod" or something. e.g. "Minimum time elapsed before stale UI data is flushed."
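Following that suggestion, the renamed config might look like this (a sketch in the style of the quoted ConfigBuilder entries, assuming the surrounding file's imports such as java.util.concurrent.TimeUnit; the exact merged name, wording, and default may differ):

```scala
// Sketch of the suggested rename; replaces the "stalenessLimit" name above.
val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD =
  ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
    .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness " +
      "when incoming task events are not fired frequently.")
    .timeConf(TimeUnit.NANOSECONDS)
    .createWithDefaultString("1s")
```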

private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

/**
* A time limit before we force to flush all live entities. When the last flush doesn't past
Contributor:

Same grammar issue. Can't really parse what you wrote.

.createWithDefaultString("100ms")

val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit")
.internal()
Contributor:

Why internal? Spark doesn't set it itself. If anyone is going to change it, it will be users.

@SparkQA commented Apr 6, 2019

Test build #104329 has finished for PR 24303 at commit 2645f35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 6, 2019

Test build #104332 has finished for PR 24303 at commit 39ea357.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Apr 8, 2019

Looks good pending tests.

@SparkQA commented Apr 9, 2019

Test build #104408 has finished for PR 24303 at commit 1c53071.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor):

LGTM

@attilapiros (Contributor):

There is a typo in the description: "last more that" => "last more than"; otherwise LGTM.

@vanzin (Contributor) commented Apr 9, 2019

Merging to master / 2.4.

@vanzin vanzin closed this in 5ff39cd Apr 9, 2019
@vanzin (Contributor) commented Apr 9, 2019

Doesn't merge cleanly to 2.4, so gave up. Open a new PR blah blah blah...

zsxwing added a commit to zsxwing/spark that referenced this pull request Apr 9, 2019
…rkListenerExecutorMetricsUpdate

This PR updates `AppStatusListener` to flush `LiveEntity` if necessary when receiving `SparkListenerExecutorMetricsUpdate`. This will ensure the staleness of Spark UI doesn't last more than the executor heartbeat interval.

The new unit test.

Closes apache#24303 from zsxwing/SPARK-27394.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
asfgit pushed a commit that referenced this pull request Apr 10, 2019
…rkListenerExecutorMetricsUpdate (backport 2.4)

## What changes were proposed in this pull request?

This PR backports #24303 to 2.4.

## How was this patch tested?

Jenkins

Closes #24328 from zsxwing/SPARK-27394-2.4.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…rkListenerExecutorMetricsUpdate (backport 2.4)

## What changes were proposed in this pull request?

This PR backports apache#24303 to 2.4.

## How was this patch tested?

Jenkins

Closes apache#24328 from zsxwing/SPARK-27394-2.4.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019 (same backport commit message as above).
@zsxwing zsxwing deleted the SPARK-27394 branch July 26, 2019 07:44
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019 (same backport commit message as above).