diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index af9bdefc967ef..2b61fd7ed3a69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -64,6 +64,12 @@ private[spark] class EventLoggingListener(
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  // To reduce the size of event logs, we can omit logging internal accumulables for task metrics.
+  private val omitInternalAccumulables =
+    sparkConf.getBoolean("spark.eventLog.omitInternalAccumulables", false)
+  // To reduce the size of event logs, we can omit logging the "Updated Block Statuses" metric.
+  private val omitUpdatedBlockStatuses =
+    sparkConf.getBoolean("spark.eventLog.omitUpdatedBlockStatuses", false)
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
@@ -131,7 +137,11 @@ private[spark] class EventLoggingListener(

   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = JsonProtocol.sparkEventToJson(event)
+    val eventJson = JsonProtocol.sparkEventToJson(
+      event,
+      omitInternalAccumulables,
+      omitUpdatedBlockStatuses
+    )
     // scalastyle:off println
     writer.foreach(_.println(compact(render(eventJson))))
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 4b4d2d10cbf8d..3f1bc2cef37c5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -62,20 +62,23 @@ private[spark] object JsonProtocol {
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */

-  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+  def sparkEventToJson(
+      event: SparkListenerEvent,
+      omitInternalAccums: Boolean = false,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted)
+        stageSubmittedToJson(stageSubmitted, omitInternalAccums)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted)
+        stageCompletedToJson(stageCompleted, omitInternalAccums)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart)
+        taskStartToJson(taskStart, omitInternalAccums)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult)
+        taskGettingResultToJson(taskGettingResult, omitInternalAccums)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd)
+        taskEndToJson(taskEnd, omitInternalAccums, omitUpdatedBlockStatuses)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart)
+        jobStartToJson(jobStart, omitInternalAccums)
       case jobEnd: SparkListenerJobEnd =>
         jobEndToJson(jobEnd)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -97,61 +100,80 @@ private[spark] object JsonProtocol {
       case logStart: SparkListenerLogStart =>
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        executorMetricsUpdateToJson(metricsUpdate)
+        executorMetricsUpdateToJson(metricsUpdate, omitInternalAccums)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated)  // TODO(ekl) implement this
       case _ => parse(mapper.writeValueAsString(event))
     }
   }

-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
-    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      omitInternalAccums: Boolean = false): JValue = {
+    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo, omitInternalAccums)
     val properties = propertiesToJson(stageSubmitted.properties)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
     ("Stage Info" -> stageInfo) ~
     ("Properties" -> properties)
   }

-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
-    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      omitInternalAccums: Boolean = false): JValue = {
+    val stageInfo = stageInfoToJson(stageCompleted.stageInfo, omitInternalAccums)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
     ("Stage Info" -> stageInfo)
   }

-  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      omitInternalAccums: Boolean = false): JValue = {
     val taskInfo = taskStart.taskInfo
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
     ("Stage ID" -> taskStart.stageId) ~
     ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
   }

-  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+  def taskGettingResultToJson(
+      taskGettingResult: SparkListenerTaskGettingResult,
+      omitInternalAccums: Boolean = false): JValue = {
     val taskInfo = taskGettingResult.taskInfo
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
   }

-  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      omitInternalAccums: Boolean = false,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     val taskEndReason = taskEndReasonToJson(taskEnd.reason)
     val taskInfo = taskEnd.taskInfo
     val taskMetrics = taskEnd.taskMetrics
-    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
+    val taskMetricsJson = if (taskMetrics != null) {
+      taskMetricsToJson(taskMetrics, omitUpdatedBlockStatuses)
+    } else {
+      JNothing
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
     ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoToJson(taskInfo)) ~
+    ("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums)) ~
     ("Task Metrics" -> taskMetricsJson)
   }

-  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      omitInternalAccums: Boolean = false
+  ): JValue = {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
     ("Job ID" -> jobStart.jobId) ~
     ("Submission Time" -> jobStart.time) ~
-    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
+    ("Stage Infos" ->
+      jobStart.stageInfos.map(stageInfoToJson(_, omitInternalAccums))) ~  // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)
   }
@@ -231,7 +253,10 @@ private[spark] object JsonProtocol {
     ("Spark Version" -> SPARK_VERSION)
   }

-  def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+  def executorMetricsUpdateToJson(
+      metricsUpdate: SparkListenerExecutorMetricsUpdate,
+      omitInternalAccums: Boolean = false
+  ): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
@@ -240,7 +265,7 @@ private[spark] object JsonProtocol {
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
       ("Stage Attempt ID" -> stageAttemptId) ~
-      ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
+      ("Accumulator Updates" -> JArray(accumulablesToJson(updates, omitInternalAccums)))
     })
   }
@@ -248,7 +273,7 @@ private[spark] object JsonProtocol {
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */

-  def stageInfoToJson(stageInfo: StageInfo): JValue = {
+  def stageInfoToJson(stageInfo: StageInfo, omitInternalAccums: Boolean = false): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
     val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
@@ -265,10 +290,10 @@ private[spark] object JsonProtocol {
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
     ("Accumulables" -> JArray(
-      stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+      accumulablesToJson(stageInfo.accumulables.values, omitInternalAccums)))
   }

-  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+  def taskInfoToJson(taskInfo: TaskInfo, omitInternalAccums: Boolean = false): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
     ("Attempt" -> taskInfo.attemptNumber) ~
@@ -281,7 +306,13 @@ private[spark] object JsonProtocol {
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
     ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
+    ("Accumulables" -> JArray(accumulablesToJson(taskInfo.accumulables, omitInternalAccums)))
+  }
+
+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      omitInternalAccums: Boolean = false): List[JValue] = {
+    accumulables.filter(p => !omitInternalAccums || !p.internal).map(accumulableInfoToJson).toList
   }

   def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -324,7 +355,9 @@ private[spark] object JsonProtocol {
     }
   }

-  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
+  def taskMetricsToJson(
+      taskMetrics: TaskMetrics,
+      omitUpdatedBlockStatuses: Boolean = false): JValue = {
     val shuffleReadMetrics: JValue =
       ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
       ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
@@ -343,10 +376,14 @@ private[spark] object JsonProtocol {
       ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
       ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
     val updatedBlocks =
-      JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
-        ("Block ID" -> id.toString) ~
-        ("Status" -> blockStatusToJson(status))
-      })
+      if (omitUpdatedBlockStatuses) {
+        JNothing
+      } else {
+        JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
+          ("Block ID" -> id.toString) ~
+          ("Status" -> blockStatusToJson(status))
+        })
+      }
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 9f76c74bce89e..358f21e727919 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -432,6 +432,40 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }

+  test("Omit internal accumulables") {
+    val accumsWithInternal = Seq(makeAccumulableInfo(0), makeAccumulableInfo(1, internal = true))
+    val accumsWithoutInternal = Seq(makeAccumulableInfo(0))
+    assertJsonStringEquals(
+      compact(render(
+        JsonProtocol.accumulablesToJson(accumsWithInternal, omitInternalAccums = true))
+      ),
+      compact(render(
+        JsonProtocol.accumulablesToJson(accumsWithoutInternal, omitInternalAccums = true))
+      ),
+      "Accumulables"
+    )
+  }
+
+  test("Omit updated block statuses") {
+    val withBlockStatuses =
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, false, false)
+    val withoutBlockStatuses =
+      makeTaskMetrics(
+        300L, 400L, 500L, 600L, 700, 800, false, false, setUpdatedBlockStatuses = false
+      )
+    assertEquals(
+      JsonProtocol.taskMetricsFromJson(
+        JsonProtocol.taskMetricsToJson(
+          withBlockStatuses, omitUpdatedBlockStatuses = true
+        )
+      ),
+      JsonProtocol.taskMetricsFromJson(
+        JsonProtocol.taskMetricsToJson(
+          withoutBlockStatuses
+        )
+      )
+    )
+  }
 }
@@ -828,7 +862,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       f: Int,
       hasHadoopInput: Boolean,
       hasOutput: Boolean,
-      hasRecords: Boolean = true) = {
+      hasRecords: Boolean = true,
+      setUpdatedBlockStatuses: Boolean = true) = {
     val t = TaskMetrics.empty
     // Set CPU times same as wall times for testing purpose
     t.setExecutorDeserializeTime(a)
@@ -863,10 +898,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
       sw.incWriteTime(b + c + d)
       sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
     }
-    // Make at most 6 blocks
-    t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
-      (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
-    }.toSeq)
+    if (setUpdatedBlockStatuses) {
+      // Make at most 6 blocks
+      t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
+        (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
+      }.toSeq)
+    }
     t
   }
diff --git a/docs/configuration.md b/docs/configuration.md
index a6b1f15fdabfc..1843762196e39 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -659,6 +659,22 @@ Apart from these, the following properties are also available, and may be useful
     finished.
   </td>
 </tr>
+<tr>
+  <td><code>spark.eventLog.omitInternalAccumulables</code></td>
+  <td>false</td>
+  <td>
+    Whether to omit logging internal accumulables for Spark task metrics, useful for reducing the
+    size of event logs. Spark task metrics are still recorded under the "Task Metrics" field.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.eventLog.omitUpdatedBlockStatuses</code></td>
+  <td>false</td>
+  <td>
+    Whether to omit the "Updated Block Statuses" metric from the Spark event log, useful for
+    reducing the size of event logs.
+  </td>
+</tr>
 <tr>
   <td><code>spark.ui.enabled</code></td>
   <td>true</td>
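
Usage note (not part of the diff): a minimal sketch of how the two new flags introduced by this
patch could be enabled from application code, assuming the usual SparkConf-based setup. Both keys
default to false, so existing event logs are unchanged unless they are set explicitly.

    // Hypothetical example: enable event logging and opt in to both size reductions.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.omitInternalAccumulables", "true") // drop internal accumulables
      .set("spark.eventLog.omitUpdatedBlockStatuses", "true") // drop "Updated Block Statuses"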