[SPARK-16333][Core] Enable EventLoggingListener to log less #16714
Changes from all commits: 30cb5d0, e51667c, b0bebcc, 5d6cf56, f146121, ed67116
@@ -64,6 +64,12 @@ private[spark] class EventLoggingListener(
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  // To reduce the size of event logs, we can omit logging all of internal accumulables for metrics.
+  private val omitInternalAccumulables =
+    sparkConf.getBoolean("spark.eventLog.omitInternalAccumulables", false)
+  // To reduce the size of event logs, we can omit logging "Updated Block Statuses" metric.
+  private val omitUpdatedBlockStatuses =
+    sparkConf.getBoolean("spark.eventLog.omitUpdatedBlockStatuses", false)

Contributor (reviewer):
Similarly here. A good way to measure "is it needed?" is to check whether the UI needs the information. If it doesn't, then it's probably something we can live without. If the UI uses it, the option should be documented and should more properly reflect what the user is giving up by disabling it (e.g. "spark.eventLog.simplifiedStorageInfo" or something).

Contributor (author):
I am not sure if updated block statuses are used for the UI. At first, I was wondering if the information was used to reconstruct the Storage page, but I checked the usage of …

   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
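
For context, a minimal sketch of how the two options added by this diff could be enabled, assuming the patch is applied (the `spark.eventLog.omit*` keys are proposed in this PR and are not part of released Spark):

```scala
import org.apache.spark.SparkConf

// The two omit* keys below are the options proposed in this PR (not in released Spark).
val conf = new SparkConf()
  .setAppName("event-log-size-demo")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-event-logs")     // assumed log location
  .set("spark.eventLog.omitInternalAccumulables", "true")    // drop internal accumulables
  .set("spark.eventLog.omitUpdatedBlockStatuses", "true")    // drop "Updated Block Statuses"
```

Both flags default to false, so event logs are unchanged unless a user explicitly opts in.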
@@ -131,7 +137,11 @@ private[spark] class EventLoggingListener(
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = JsonProtocol.sparkEventToJson(event)
+    val eventJson = JsonProtocol.sparkEventToJson(
+      event,
+      omitInternalAccumulables,
+      omitUpdatedBlockStatuses
+    )
     // scalastyle:off println
     writer.foreach(_.println(compact(render(eventJson))))
     // scalastyle:on println
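
logEvent still writes each event as one compact JSON object per line, so these options shrink individual lines rather than dropping whole events. A rough sketch of inspecting such a log offline, assuming an uncompressed local event-log file at a hypothetical path:

```scala
import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// Count events by type in an (assumed) uncompressed event log; one JSON object per line.
val path = "/tmp/spark-events/app-20170126-0001"  // hypothetical event-log path
val counts = Source.fromFile(path).getLines()
  .flatMap(line => (parse(line) \ "Event").extractOpt[String])
  .toSeq
  .groupBy(identity)
  .mapValues(_.size)
counts.toSeq.sortBy(-_._2).foreach { case (event, n) => println(s"$event: $n") }
```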
@@ -62,20 +62,23 @@ private[spark] object JsonProtocol {
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */

-  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+  def sparkEventToJson(
+      event: SparkListenerEvent,
+      omitInternalAccums: Boolean = false,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     event match {

Contributor (reviewer):
stageSubmitted — you didn't seem to use omitInternalAccums/omitUpdatedBlockStatuses in these cases, although you had changed the underlying methods to support these flags. Intended?

Contributor (author):
stageSubmitted/stageCompleted/jobStart should use …

       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted)
+        stageSubmittedToJson(stageSubmitted, omitInternalAccums)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted)
+        stageCompletedToJson(stageCompleted, omitInternalAccums)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart)
+        taskStartToJson(taskStart, omitInternalAccums)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult)
+        taskGettingResultToJson(taskGettingResult, omitInternalAccums)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd)
+        taskEndToJson(taskEnd, omitInternalAccums, omitUpdatedBlockStatuses)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart)
+        jobStartToJson(jobStart, omitInternalAccums)
       case jobEnd: SparkListenerJobEnd =>
         jobEndToJson(jobEnd)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -97,61 +100,80 @@ private[spark] object JsonProtocol {
       case logStart: SparkListenerLogStart =>
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        executorMetricsUpdateToJson(metricsUpdate)
+        executorMetricsUpdateToJson(metricsUpdate, omitInternalAccums)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated) // TODO(ekl) implement this
       case _ => parse(mapper.writeValueAsString(event))
     }
   }

-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
-    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      omitInternalAccums: Boolean = false): JValue = {
+    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo, omitInternalAccums)
     val properties = propertiesToJson(stageSubmitted.properties)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
     ("Stage Info" -> stageInfo) ~
     ("Properties" -> properties)
   }

-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
-    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      omitInternalAccums: Boolean = false): JValue = {
+    val stageInfo = stageInfoToJson(stageCompleted.stageInfo, omitInternalAccums)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
     ("Stage Info" -> stageInfo)
   }

-  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      omitInternalAccums: Boolean = false): JValue = {
     val taskInfo = taskStart.taskInfo
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
     ("Stage ID" -> taskStart.stageId) ~
     ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
   }

-  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+  def taskGettingResultToJson(
+      taskGettingResult: SparkListenerTaskGettingResult,
+      omitInternalAccums: Boolean = false): JValue = {
     val taskInfo = taskGettingResult.taskInfo
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
   }

-  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      omitInternalAccums: Boolean = false,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     val taskEndReason = taskEndReasonToJson(taskEnd.reason)
     val taskInfo = taskEnd.taskInfo
     val taskMetrics = taskEnd.taskMetrics
-    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
+    val taskMetricsJson = if (taskMetrics != null) {
+      taskMetricsToJson(taskMetrics, omitUpdatedBlockStatuses)
+    } else {
+      JNothing
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
     ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoToJson(taskInfo)) ~
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums)) ~
     ("Task Metrics" -> taskMetricsJson)
   }

-  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      omitInternalAccums: Boolean = false
+  ): JValue = {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
     ("Job ID" -> jobStart.jobId) ~
     ("Submission Time" -> jobStart.time) ~
-    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
+    ("Stage Infos" ->
+      jobStart.stageInfos.map(stageInfoToJson(_, omitInternalAccums))) ~ // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)
   }
@@ -231,7 +253,10 @@ private[spark] object JsonProtocol {
     ("Spark Version" -> SPARK_VERSION)
   }

-  def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+  def executorMetricsUpdateToJson(
+      metricsUpdate: SparkListenerExecutorMetricsUpdate,
+      omitInternalAccums: Boolean = false
+  ): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
@@ -240,15 +265,15 @@ private[spark] object JsonProtocol {
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
       ("Stage Attempt ID" -> stageAttemptId) ~
-      ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
+      ("Accumulator Updates" -> JArray(accumulablesToJson(updates, omitInternalAccums)))
     })
   }

   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */

-  def stageInfoToJson(stageInfo: StageInfo): JValue = {
+  def stageInfoToJson(stageInfo: StageInfo, omitInternalAccums: Boolean = false): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
     val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
@@ -265,10 +290,10 @@ private[spark] object JsonProtocol {
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
     ("Accumulables" -> JArray(
-      stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+      accumulablesToJson(stageInfo.accumulables.values, omitInternalAccums)))
   }

-  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+  def taskInfoToJson(taskInfo: TaskInfo, omitInternalAccums: Boolean = false): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
     ("Attempt" -> taskInfo.attemptNumber) ~
@@ -281,7 +306,13 @@ private[spark] object JsonProtocol {
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
     ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
+    ("Accumulables" -> JArray(accumulablesToJson(taskInfo.accumulables, omitInternalAccums)))
   }

+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      omitInternalAccums: Boolean = false): List[JValue] = {
+    accumulables.filter(p => !omitInternalAccums || !p.internal).map(accumulableInfoToJson).toList
+  }

   def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
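
The new accumulablesToJson helper keeps an accumulable only when the flag is off or the accumulable is not internal. A self-contained sketch of that predicate, using a stand-in case class rather than Spark's AccumulableInfo (whose constructor is private to Spark):

```scala
// Stand-in for the relevant part of AccumulableInfo; not the Spark class itself.
case class Accum(name: String, internal: Boolean)

def keep(accums: Seq[Accum], omitInternalAccums: Boolean): Seq[Accum] =
  accums.filter(a => !omitInternalAccums || !a.internal)

val accums = Seq(
  Accum("internal.metrics.executorRunTime", internal = true),
  Accum("my.user.counter", internal = false))

println(keep(accums, omitInternalAccums = false).map(_.name))  // both names kept
println(keep(accums, omitInternalAccums = true).map(_.name))   // only my.user.counter kept
```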
@@ -324,7 +355,9 @@ private[spark] object JsonProtocol {
     }
   }

-  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
+  def taskMetricsToJson(
+      taskMetrics: TaskMetrics,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     val shuffleReadMetrics: JValue =
       ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
       ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
@@ -343,10 +376,14 @@ private[spark] object JsonProtocol {
       ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
       ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
     val updatedBlocks =
-      JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
-        ("Block ID" -> id.toString) ~
-        ("Status" -> blockStatusToJson(status))
-      })
+      if (omitUpdatedBlockStatuses) {

Contributor (author):
@vanzin @ajbozarth #17412 gets rid of updated block statuses from the accumulable but not from task metrics. If you think it's OK not to have an option to get rid of updated block statuses, then I can just get rid of updated block statuses here.

+        JNothing
+      } else {
+        JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
+          ("Block ID" -> id.toString) ~
+          ("Status" -> blockStatusToJson(status))
+        })
+      }
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
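
When the flag is set, "Updated Blocks" becomes JNothing, and json4s drops JNothing fields when rendering, so the key disappears from the logged JSON rather than appearing as null. A small sketch of that behavior, using json4s only (the field names here are illustrative):

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// One illustrative block entry and the "omitted" placeholder.
val blockJson: JValue = ("Block ID" -> "rdd_0_0") ~ ("Status" -> "MEMORY")
val blocksArr: JValue = JArray(List(blockJson))
val noBlocks: JValue = JNothing

val withBlocks: JValue =
  ("Executor Run Time" -> 42L) ~ ("Updated Blocks" -> blocksArr)
val withoutBlocks: JValue =
  ("Executor Run Time" -> 42L) ~ ("Updated Blocks" -> noBlocks)

println(compact(render(withBlocks)))
// {"Executor Run Time":42,"Updated Blocks":[{"Block ID":"rdd_0_0","Status":"MEMORY"}]}
println(compact(render(withoutBlocks)))
// {"Executor Run Time":42}
```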
Contributor (reviewer):
Is this information useful at all in event logs? If there's nothing that uses it, then not writing it is probably better than having yet another option that has to be documented and that people have to explicitly enable.

Contributor (author):
I don't think this information is used to reconstruct the job UI. I am not sure how this information got included in event logs, but I think some people might be using it to get internal metrics for a stage from the history server using its REST API. For example, the CPU time metric is not included in the stage metrics you can get by querying the history server endpoint /applications/[app-id]/stages/[stage-id].

Contributor (reviewer):
Actually, I see CPU time in both stage-level data and task-level data in the REST API... Do you mind checking the code for when this was introduced and whether it was a conscious decision (as in, it covered some use case we're not seeing)? If possible it's always better to avoid adding more config options, especially in this kind of situation. For example, if this data is needed for something, the config would be disabling that functionality, and it would be better to instead figure out how to save it in a way that does not waste so much space. And there's always compression.

Contributor (author):
@vanzin I added CPU time because back then I was pulling stage metrics from the history server and needed CPU time. Here's the PR for the change: #10212. Looking at the code, CPU time should be there, so I think there's something wrong on my end. That's a separate problem though, and I don't think the CPU time metric should increase the size of event logs much. I can't think of a use case for internal accumulables then, so I think it makes sense to delete this. If anyone wants to use internal accumulables for stage metrics, they should be able to catch them after a stage finishes, not from the history server.
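
For reference, the history server serves the stage data discussed above under its documented REST prefix /api/v1, i.e. /api/v1/applications/[app-id]/stages/[stage-id]. A minimal sketch of fetching it, assuming a history server on the default port 18080 and an illustrative application id:

```scala
import scala.io.Source

// Hypothetical application and stage ids; 18080 is the history server's default port.
val appId = "app-20170126120000-0001"
val stageId = 3
val url = s"http://localhost:18080/api/v1/applications/$appId/stages/$stageId"

// The response is JSON with per-attempt stage metrics; per the discussion above,
// CPU time appears in both stage-level and task-level data.
val response = Source.fromURL(url).mkString
println(response)
```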