[SPARK-16333][Core] Enable EventLoggingListener to log less #16714
Conversation
ok to test

Test build #72043 has finished for PR 16714 at commit

Test build #72048 has finished for PR 16714 at commit

Not sure why the second test build failed at the PySpark unit tests. I only changed the comments.

Test build #72058 has finished for PR 16714 at commit

@vanzin can you check this out, please?

Can one of the admins verify this patch?
vanzin left a comment:
See comments. Mostly want to know why we'd even bother with recording this information.
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
// To reduce the size of event logs, we can omit logging all internal accumulables for metrics.
private val omitInternalAccumulables =
Is this information useful at all in event logs? If there's nothing that uses it, then not writing it is probably better than having yet another option that has to be documented and that people have to explicitly enable.
I don't think this information is used to reconstruct the job UI. I am not sure how it got included in event logs, but some people might be using it to get internal metrics for a stage from the History Server via its REST API. For example, the CPU time metric is not included in the stage metrics you get by querying the History Server endpoint /applications/[app-id]/stages/[stage-id].
Actually I see CPU time in both stage-level data and task-level data in the REST API...
Do you mind checking the code for when this was introduced and whether it was a conscious decision (as in, it covered some use case we're not seeing)?
If possible it's always better to avoid adding more config options, especially in this kind of situation. For example, if this data is needed for something, the config would be disabling that functionality, and it would be better to instead figure out how to save it in a way that does not waste so much space. And there's always compression.
@vanzin I added CPU time because back then I was pulling stage metrics from the History Server and needed CPU time. Here's the PR for the change: #10212. Looking at the code, CPU time should be there, so I think there's something wrong on my end. That's a separate problem, though, and I don't think the CPU time metric should increase the size of event logs much. I can't think of a use case for internal accumulables then, so I think it makes sense to delete this. If anyone wants to use internal accumulables for stage metrics, they should be able to catch them after a stage finishes rather than pull them from the History Server.
  sparkConf.getBoolean("spark.eventLog.omitInternalAccumulables", false)
// To reduce the size of event logs, we can omit logging the "Updated Block Statuses" metric.
private val omitUpdatedBlockStatuses =
  sparkConf.getBoolean("spark.eventLog.omitUpdatedBlockStatuses", false)
Similarly here.
A good way to measure "is it needed?" is to check whether the UI needs the information. If it doesn't, then it's probably something we can live without.
If the UI uses it, the option should be documented and more properly reflect what the user is giving up by disabling it (e.g. "spark.eventLog.simplifiedStorageInfo" or something).
I am not sure whether updated block statuses are used for the UI. At first I wondered whether the information was used to reconstruct the Storage page, but I checked the usages of TaskMetrics.updatedBlockStatuses and it doesn't seem to be used anywhere except when the task metrics are converted to a JSON object. Actually, I am not sure the Storage tab is working at all, unless I am missing something. I don't think /applications/[app-id]/storage/rdd returns any meaningful information.
Would some of the other recent contributors to this area (e.g. @zsxwing or @JoshRosen) be able to comment on any use for these internal accumulables / block status updates, and whether they can be removed from the event log? I couldn't see anything that went wrong, and my file went from 22 GB to 91 MB, so it makes a big difference.
def stageCompletedToJson(
    stageCompleted: SparkListenerStageCompleted,
    omitInternalAccums: Boolean = false): JValue = {
  val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
Were you intending to pass omitInternalAccums into this stageInfoToJson call?
Yes, thank you for catching it. I think it got omitted while I was merging stuff. Will fix this.
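For reference, a minimal sketch of the intended fix, assuming stageInfoToJson takes the same omitInternalAccums flag as the other methods changed in this PR (a sketch of the discussed change, not the final patch):

```scala
def stageCompletedToJson(
    stageCompleted: SparkListenerStageCompleted,
    omitInternalAccums: Boolean = false): JValue = {
  // Thread the flag into the nested StageInfo so internal accumulables
  // are dropped there as well, not only at the event level.
  val stageInfo = stageInfoToJson(stageCompleted.stageInfo, omitInternalAccums)
  ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
  ("Stage Info" -> stageInfo)
}
```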
    event: SparkListenerEvent,
    omitInternalAccums: Boolean = false,
    omitUpdatedBlockStatuses: Boolean = false): JValue = {
  event match {
stageSubmitted / stageCompleted / jobStart / jobEnd: you didn't seem to use omitInternalAccums/omitUpdatedBlockStatuses in these cases, although you changed the underlying methods to support these flags. Intended?
stageSubmitted/stageCompleted/jobStart should use omitInternalAccums, but jobEnd should not; jobEnd's interface hasn't changed. omitUpdatedBlockStatuses is intended to be used only for taskEnd, because that's when updated block statuses are reported. Thanks for catching this; I will add omitInternalAccums to stageSubmitted and jobStart.
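Putting the two comments above together, the dispatch would look roughly like this (a sketch, assuming the per-event *ToJson methods have been given the corresponding parameters; taskEndToJson taking both flags is an assumption based on the description, not the exact patch):

```scala
def sparkEventToJson(
    event: SparkListenerEvent,
    omitInternalAccums: Boolean = false,
    omitUpdatedBlockStatuses: Boolean = false): JValue = {
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
      stageSubmittedToJson(stageSubmitted, omitInternalAccums)
    case stageCompleted: SparkListenerStageCompleted =>
      stageCompletedToJson(stageCompleted, omitInternalAccums)
    case jobStart: SparkListenerJobStart =>
      jobStartToJson(jobStart, omitInternalAccums)
    case jobEnd: SparkListenerJobEnd =>
      // jobEnd carries no accumulator data; its interface is unchanged.
      jobEndToJson(jobEnd)
    case taskEnd: SparkListenerTaskEnd =>
      // Updated block statuses are only reported on task end.
      taskEndToJson(taskEnd, omitInternalAccums, omitUpdatedBlockStatuses)
    // ... remaining events unchanged ...
  }
}
```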
Is this still an issue, or did #17412 fix it?

I would still like not to have all internal accumulators in the event logs (not just

After having played with some of this code for other reasons, at least some of the internal accumulators are needed to rebuild the SQL UI. As for logging updated block statuses, I still don't like the idea of an option. It's either needed or it's not.

Didn't #17412 already get rid of block statuses, though?

I think so. Just replying to the question.
  ("Block ID" -> id.toString) ~
  ("Status" -> blockStatusToJson(status))
})
if (omitUpdatedBlockStatuses) {
@vanzin @ajbozarth #17412 gets rid of updated block statuses in the accumulables but not in task metrics. If you think it's OK not to have an option for this, then I can just get rid of updated block statuses here unconditionally.
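A sketch of the conditional version under discussion, assuming it lives in taskMetricsToJson and that the taskMetrics.updatedBlockStatuses sequence feeds the snippet shown in the diff above (JNothing is used so json4s drops the field entirely when rendering):

```scala
val updatedBlocks: JValue = if (omitUpdatedBlockStatuses) {
  // json4s omits JNothing fields when rendering, so the verbose
  // "Updated Blocks" entry simply disappears from the event log.
  JNothing
} else {
  JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
    ("Block ID" -> id.toString) ~
    ("Status" -> blockStatusToJson(status))
  })
}
```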
@vanzin @ajbozarth if you think that an option to skip logging internal accumulators (in my case I don't use the SQL UI) and completely getting rid of updated block statuses are not needed, I can close this PR.

Unless there's still an issue with file size, I think I'm good without this, but I'll defer to @vanzin.

Options put the burden on the user to figure things out (do I need to set this or not?). If you want to investigate whether you can trim more data (e.g. internal metrics that just mirror data already in TaskMetrics), then there would be value. But adding an option doesn't really help users, since aside from you, probably nobody will ever even look at those.

OK. Not including the updated blocks in task metrics reduced the size of our event logs, but I am closing this PR as the current implementation doesn't seem to be the right approach. Thanks for the input.
What changes were proposed in this pull request?
Starting from Spark 2.0, task metrics are kept in the form of accumulators. This works, but it also bloats event logs because the metrics are logged twice (once under "Accumulators" and once under "Task Metrics"). For applications with lots of tasks, the event logs can reach tens of GB, and it is not feasible for the Spark History Server to parse them and reconstruct the job UI.

This PR adds an option for EventLoggingListener not to log the internal accumulators that back task metrics. It also adds an option not to log the "Updated Block Statuses" metric, which is quite verbose and might not be needed in many cases.

After updating to Spark 2.0, the size of the event log of an application with over 50k tasks jumped from ~1 GB to over 40 GB. With this patch, event log sizes are similar to what they were with Spark 1.5.2.
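With this patch applied, the two proposed options (both defaulting to false, per the diff above) would be enabled like any other Spark configuration, e.g. in spark-defaults.conf:

```
spark.eventLog.omitInternalAccumulables   true
spark.eventLog.omitUpdatedBlockStatuses   true
```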
How was this patch tested?
Unit tests. Also run in production.