Conversation

@jisookim0513
Contributor

@jisookim0513 commented Jan 26, 2017

What changes were proposed in this pull request?

Starting with Spark 2.0, task metrics are reported as accumulators. This is good, but it also causes excessively large event logs, because the metrics are logged twice (once under "Accumulators" and once under "Task Metrics"). For applications with many tasks, event logs can reach tens of GB, which makes it infeasible for the Spark History Server to parse them and reconstruct the job UI.

This PR adds an option for EventLoggingListener not to log internal accumulators that carry task metrics. It also adds an option not to log the "Updated Block Statuses" metric, which is quite verbose and may not be needed in some cases.

After updating to Spark 2.0, the event log of one of our applications with over 50k tasks jumped from ~1 GB to over 40 GB. With this patch, event log sizes returned to roughly what they were with Spark 1.5.2.
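
For illustration, a minimal sketch of how the options proposed here would be set; the config keys below are the ones introduced by this PR and exist nowhere else:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  // Proposed in this PR: skip internal accumulables when serializing events.
  .set("spark.eventLog.omitInternalAccumulables", "true")
  // Proposed in this PR: skip the verbose "Updated Block Statuses" task metric.
  .set("spark.eventLog.omitUpdatedBlockStatuses", "true")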

How was this patch tested?

Unit tests. It was also run in production.

@vanzin
Contributor

vanzin commented Jan 26, 2017

ok to test

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72043 has finished for PR 16714 at commit b0bebcc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2017

Test build #72048 has finished for PR 16714 at commit 5d6cf56.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jisookim0513
Contributor Author

Not sure why the second test build failed the PySpark unit tests; I only changed the comments.

@SparkQA

SparkQA commented Jan 27, 2017

Test build #72058 has finished for PR 16714 at commit f146121.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@drcrallen
Contributor

@vanzin can you check this out please?

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

@vanzin left a comment

See comments. Mostly I want to know why we'd even bother recording this information.

private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
// To reduce the size of event logs, we can omit logging all internal accumulables for metrics.
private val omitInternalAccumulables =
Contributor

Is this information useful at all in event logs? If there's nothing that uses it, then not writing it is probably better than having yet another option that has to be documented and that people have to explicitly enable.

Contributor Author

I don't think this information is used to reconstruct the job UI. I am not sure how it got included in event logs in the first place, but some people might be using it to get internal metrics for a stage from the history server via its REST API. For example, the CPU time metric is not included in the stage metrics you get by querying the history server endpoint /applications/[app-id]/stages/[stage-id].
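
For reference, a hypothetical sketch of pulling stage metrics over that REST API; host, port, and the IDs are placeholders, and /api/v1 is the standard prefix for Spark's monitoring API:

import scala.io.Source

val appId = "app-20170126120000-0001"  // placeholder application ID
val stageId = 3                        // placeholder stage ID
val url = s"http://history-server:18080/api/v1/applications/$appId/stages/$stageId"
val json = Source.fromURL(url).mkString  // raw JSON; parse with any JSON library
println(json)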

Contributor

Actually, I see CPU time in both stage-level and task-level data in the REST API...

Do you mind checking the code for when this was introduced and whether it was a conscious decision (as in, it covered some use case we're not seeing)?

If possible, it's always better to avoid adding more config options, especially in this kind of situation. For example, if this data is needed for something, the config would be disabling that functionality, and it would be better to instead figure out how to save it in a way that does not waste so much space. And there's always compression.

Contributor Author

@vanzin I added CPU time because back then I was pulling stage metrics from the history server and needed it. Here's the PR for the change: #10212. Looking at the code, CPU time should be there, so the problem is probably on my end. That's a separate issue, though, and I don't think the CPU time metric increases the size of event logs much. I can't think of a use case for internal accumulables then, so I think it makes sense to delete this. Anyone who wants internal accumulables for stage metrics should be able to capture them when a stage finishes, rather than from the History Server.
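
As an aside, a minimal sketch of capturing internal accumulables at stage completion with a listener, rather than recovering them from event logs later; SparkListener and StageInfo.accumulables are Spark's public API, and the println is illustrative:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Illustrative listener: read internal accumulables directly when each stage
// completes, instead of parsing them back out of the event logs.
class InternalAccumulablesListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    info.accumulables.values.filter(_.internal).foreach { acc =>
      println(s"stage ${info.stageId}: ${acc.name.getOrElse("<unnamed>")} = ${acc.value.getOrElse("")}")
    }
  }
}
// Register with: sc.addSparkListener(new InternalAccumulablesListener)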

sparkConf.getBoolean("spark.eventLog.omitInternalAccumulables", false)
// To reduce the size of event logs, we can omit logging the "Updated Block Statuses" metric.
private val omitUpdatedBlockStatuses =
sparkConf.getBoolean("spark.eventLog.omitUpdatedBlockStatuses", false)
Contributor

Similarly here.

A good way to measure "is it needed?" is to check whether the UI needs the information. If it doesn't, then it's probably something we can live without.

If the UI uses it, the option should be documented and its name should more accurately reflect what the user is giving up by disabling it (e.g. "spark.eventLog.simplifiedStorageInfo" or something).

Contributor Author

I am not sure whether updated block statuses are used for the UI. At first I wondered whether the information was used to reconstruct the Storage page, but I checked the usages of TaskMetrics.updatedBlockStatuses and it doesn't seem to be used anywhere except when task metrics are converted to a JSON object. Actually, I am not sure the Storage tab is working at all, unless I am missing something; I don't think /applications/[app-id]/storage/rdd returns any meaningful information.

@jasonmoore2k
Contributor

Would some of the other recent contributors to this area (e.g. @zsxwing or @JoshRosen) be able to comment on any use for these internal accumulables / block status updates, and on whether they can be removed from the event log? I couldn't see anything that breaks, and my event log went from 22 GB to 91 MB, so it makes a big difference.

def stageCompletedToJson(
    stageCompleted: SparkListenerStageCompleted,
    omitInternalAccums: Boolean = false): JValue = {
  val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
Contributor

Were you intending to pass omitInternalAccums into this stageInfoToJson call?

Contributor Author

Yes, thank you for catching that. I think it got dropped while I was merging. Will fix.
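
A sketch of the fix under discussion, assuming the two-argument stageInfoToJson overload added elsewhere in this PR; the JSON DSL lines mirror JsonProtocol's existing structure, in whose scope stageInfoToJson and Utils are available:

import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._

def stageCompletedToJson(
    stageCompleted: SparkListenerStageCompleted,
    omitInternalAccums: Boolean = false): JValue = {
  // Forward the flag so stageInfoToJson can filter internal accumulables too.
  val stageInfo = stageInfoToJson(stageCompleted.stageInfo, omitInternalAccums)
  ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
  ("Stage Info" -> stageInfo)
}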

    event: SparkListenerEvent,
    omitInternalAccums: Boolean = false,
    omitUpdatedBlockStatuses: Boolean = false): JValue = {
  event match {
Contributor

stageSubmitted
stageCompleted
jobStart
jobEnd

You didn't seem to use omitInternalAccums/omitUpdatedBlockStatuses in these cases, although you had changed the underlying methods to support these flags. Intended?

Contributor Author

stageSubmitted/stageCompleted/jobStart should use omitInternalAccums, but jobEnd should not; jobEnd's interface hasn't changed. omitUpdatedBlockStatuses is intended to be used only for taskEnd, because that's when updated block statuses are reported. Thanks for catching this; I will add omitInternalAccums to stageSubmitted and jobStart.
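
A sketch of how the flags would be threaded through the match, per the reply above; the per-event helper names follow JsonProtocol, and the flag-taking overloads are this PR's additions:

// Assumes the org.apache.spark.scheduler event classes are in scope.
event match {
  // Internal accumulables can appear in stage/job start and stage completion
  // events, so those arms take the flag; jobEnd's interface is unchanged.
  case stageSubmitted: SparkListenerStageSubmitted =>
    stageSubmittedToJson(stageSubmitted, omitInternalAccums)
  case stageCompleted: SparkListenerStageCompleted =>
    stageCompletedToJson(stageCompleted, omitInternalAccums)
  case jobStart: SparkListenerJobStart =>
    jobStartToJson(jobStart, omitInternalAccums)
  case jobEnd: SparkListenerJobEnd =>
    jobEndToJson(jobEnd)
  // Updated block statuses are only reported at task end.
  case taskEnd: SparkListenerTaskEnd =>
    taskEndToJson(taskEnd, omitInternalAccums, omitUpdatedBlockStatuses)
  // ... remaining event types serialized as before ...
}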

@ajbozarth
Member

Is this still an issue or did #17412 fix this?

@jisookim0513
Contributor Author

jisookim0513 commented Apr 27, 2017

I would still like to keep all internal accumulators (not just updatedBlockStatuses) out of the event logs, as well as the updated block statuses metric. @vanzin, would you be OK with eliminating all internal accumulators and having an option to skip logging the updated block statuses metric?

@vanzin
Contributor

vanzin commented Apr 27, 2017

Having played with some of this code for other reasons, I can say that at least some of the internal accumulators are needed to rebuild the SQL UI.

As for logging updated block statuses, I still don't like the idea of an option. It's either needed or it's not.

@ajbozarth
Member

Didn't #17412 already get rid of block statuses though?

@vanzin
Contributor

vanzin commented Apr 27, 2017

I think so. Just replying to the question.

("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
if (omitUpdatedBlockStatuses) {
Contributor Author

@vanzin @ajbozarth #17412 gets rid of updated block statuses from the accumulables, but not from task metrics. If you think it's OK to drop updated block statuses outright rather than behind an option, I can just remove them here.
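
A minimal demonstration of one way to drop the field outright, assuming JsonProtocol keeps using json4s: a field mapped to JNothing is omitted when the JSON is rendered.

import org.json4s.JsonAST.JNothing
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// A JNothing-valued field is dropped during rendering, so mapping
// "Updated Blocks" to JNothing removes it from the serialized task metrics.
val metricsJson = ("Executor Run Time" -> 42L) ~ ("Updated Blocks" -> JNothing)
println(compact(render(metricsJson)))  // prints {"Executor Run Time":42}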

@jisookim0513
Contributor Author

@vanzin @ajbozarth if you think neither an option to skip logging internal accumulators (in my case I don't use the SQL UI) nor completely removing updated block statuses is needed, I can close this PR.

@ajbozarth
Member

Unless there's still an issue with file size, I think I'm good without this, but I'll defer to @vanzin.

@vanzin
Contributor

vanzin commented Apr 27, 2017

Options put the burden on the user to figure things out (do I need to set this or not?). If you want to investigate whether you can trim more data (e.g. internal metrics that just mirror stuff already in TaskMetrics) then there would be value. But adding an option doesn't really help users, since aside from you, probably nobody will ever even look at those.

@jisookim0513
Contributor Author

OK, not including the updated blocks in task metrics did reduce the size of our event logs, but I am closing this PR since the current implementation doesn't seem to be the right approach. Thanks for the input.
