@@ -64,6 +64,12 @@ private[spark] class EventLoggingListener(
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
// To reduce the size of event logs, we can omit logging all internal accumulables for metrics.
private val omitInternalAccumulables =
Contributor

Is this information useful at all in event logs? If there's nothing that uses it, then not writing it is probably better than having yet another option that has to be documented and that people have to explicitly enable.

Contributor Author

I don't think this information is used to reconstruct the job UI. I am not sure how this information got included in event logs, but I think some people might be using it to get internal metrics for a stage from the history server using its REST API. For example, the CPU time metric is not included in the stage metrics you can get by querying the history server endpoint /applications/[app-id]/stages/[stage-id].
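For illustration, a minimal sketch of pulling those stage metrics over the REST API (the history server URL and IDs below are placeholders, and the endpoint is assumed to live under the usual /api/v1 prefix):

    import scala.io.Source

    // Placeholders; substitute values from your own history server.
    val historyServer = "http://localhost:18080"
    val appId = "app-20170101000000-0000"
    val stageId = 3

    // Fetch the stage-level metrics as raw JSON and print them.
    val url = s"$historyServer/api/v1/applications/$appId/stages/$stageId"
    println(Source.fromURL(url).mkString)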

Contributor

Actually I see CPU time in both stage-level data and task-level data in the REST API...

Do you mind checking the code for when this was introduced and whether it was a conscious decision (as in, it covered some use case we're not seeing)?

If possible, it's always better to avoid adding more config options, especially in this kind of situation. For example, if this data is needed for something, the config would be disabling that functionality, and it would be better to instead figure out how to save it in a way that does not waste so much space. And there's always compression.
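For reference, a minimal sketch of enabling that compression; spark.eventLog.compress and spark.io.compression.codec are existing Spark configs, and lz4 is just one codec choice:

    import org.apache.spark.SparkConf

    // Compress the event log instead of dropping data from it.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "lz4")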

Contributor Author

@vanzin I added CPU time because back then I was pulling stage metrics from the history server and needed CPU time. Here's the PR for the change: #10212. Looking at the code, CPU time should be there, so I think there's something wrong on my end. That's a separate problem though, and I don't think the CPU time metric should increase the size of event logs much. I can't think of a use case for internal accumulables then, so I think it makes sense to delete this. If anyone wants to use internal accumulables for stage metrics, they should be able to catch them after a stage finishes rather than getting them from the history server.
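For example, a minimal sketch of a listener that picks up internal accumulables when a stage finishes (the listener class and its registration below are illustrative, not part of this patch):

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Prints internal accumulables (the per-stage metric accumulators) as each stage completes.
    class StageAccumulablesListener extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val info = stageCompleted.stageInfo
        info.accumulables.values.filter(_.internal).foreach { acc =>
          println(s"stage ${info.stageId}: ${acc.name.getOrElse("<unnamed>")} = ${acc.value.getOrElse("")}")
        }
      }
    }

    // Register on a live SparkContext instead of reading the data back from the history server.
    def register(sc: SparkContext): Unit = sc.addSparkListener(new StageAccumulablesListener)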

sparkConf.getBoolean("spark.eventLog.omitInternalAccumulables", false)
// To reduce the size of event logs, we can omit logging "Updated Block Statuses" metric.
private val omitUpdatedBlockStatuses =
sparkConf.getBoolean("spark.eventLog.omitUpdatedBlockStatuses", false)
Contributor

Similarly here.

A good way to measure "is it needed?" is to check whether the UI needs the information. If it doesn't, then it's probably something we can live without.

If the UI uses it, the option should be documented and should more properly reflect what the user is giving up by disabling it (e.g. "spark.eventLog.simplifiedStorageInfo" or something).

Contributor Author

I am not sure if updated block statuses are used for the UI. At first, I was wondering if the information was used to reconstruct the Storage page, but I checked the usage of TaskMetrics.updatedBlockStatuses and it doesn't seem to be used anywhere except when the task metrics are converted to a JSON object. Actually, I am not sure if the Storage tab is working at all, unless I am missing something. I don't think /applications/[app-id]/storage/rdd returns any meaningful information.

private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
private val compressionCodec =
if (shouldCompress) {
@@ -131,7 +137,11 @@ private[spark] class EventLoggingListener(

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = JsonProtocol.sparkEventToJson(event)
val eventJson = JsonProtocol.sparkEventToJson(
event,
omitInternalAccumulables,
omitUpdatedBlockStatuses
)
// scalastyle:off println
writer.foreach(_.println(compact(render(eventJson))))
// scalastyle:on println
101 changes: 69 additions & 32 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -62,20 +62,23 @@ private[spark] object JsonProtocol {
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */

def sparkEventToJson(event: SparkListenerEvent): JValue = {
def sparkEventToJson(
event: SparkListenerEvent,
omitInternalAccums: Boolean = false,
omitUpdatedBlockStatuses: Boolean = false): JValue = {
event match {
Contributor

stageSubmitted
stageCompleted
jobStart
jobEnd

You didn't seem to use omitInternalAccums/omitUpdatedBlockStatuses in these cases, although you had changed the underlying methods to support these flags. Intended?

Contributor Author

stageSubmitted/stageCompleted/jobStart should use omitInternalAccums, but not jobEnd; jobEnd's interface hasn't changed. omitUpdatedBlockStatuses is intended to be used only for taskEnd, because that's when updated block statuses are reported. Thanks for catching this, I will add omitInternalAccums to stageSubmitted and jobStart.

case stageSubmitted: SparkListenerStageSubmitted =>
stageSubmittedToJson(stageSubmitted)
stageSubmittedToJson(stageSubmitted, omitInternalAccums)
case stageCompleted: SparkListenerStageCompleted =>
stageCompletedToJson(stageCompleted)
stageCompletedToJson(stageCompleted, omitInternalAccums)
case taskStart: SparkListenerTaskStart =>
taskStartToJson(taskStart)
taskStartToJson(taskStart, omitInternalAccums)
case taskGettingResult: SparkListenerTaskGettingResult =>
taskGettingResultToJson(taskGettingResult)
taskGettingResultToJson(taskGettingResult, omitInternalAccums)
case taskEnd: SparkListenerTaskEnd =>
taskEndToJson(taskEnd)
taskEndToJson(taskEnd, omitInternalAccums, omitUpdatedBlockStatuses)
case jobStart: SparkListenerJobStart =>
jobStartToJson(jobStart)
jobStartToJson(jobStart, omitInternalAccums)
case jobEnd: SparkListenerJobEnd =>
jobEndToJson(jobEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -97,61 +100,80 @@ private[spark] object JsonProtocol {
case logStart: SparkListenerLogStart =>
logStartToJson(logStart)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
executorMetricsUpdateToJson(metricsUpdate)
executorMetricsUpdateToJson(metricsUpdate, omitInternalAccums)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
case _ => parse(mapper.writeValueAsString(event))
}
}

def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
def stageSubmittedToJson(
stageSubmitted: SparkListenerStageSubmitted,
omitInternalAccums: Boolean = false): JValue = {
val stageInfo = stageInfoToJson(stageSubmitted.stageInfo, omitInternalAccums)
val properties = propertiesToJson(stageSubmitted.properties)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
("Stage Info" -> stageInfo) ~
("Properties" -> properties)
}

def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
def stageCompletedToJson(
stageCompleted: SparkListenerStageCompleted,
omitInternalAccums: Boolean = false): JValue = {
val stageInfo = stageInfoToJson(stageCompleted.stageInfo, omitInternalAccums)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
("Stage Info" -> stageInfo)
}

def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
def taskStartToJson(
taskStart: SparkListenerTaskStart,
omitInternalAccums: Boolean = false): JValue = {
val taskInfo = taskStart.taskInfo
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
("Stage ID" -> taskStart.stageId) ~
("Stage Attempt ID" -> taskStart.stageAttemptId) ~
("Task Info" -> taskInfoToJson(taskInfo))
("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
}

def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
def taskGettingResultToJson(
taskGettingResult: SparkListenerTaskGettingResult,
omitInternalAccums: Boolean = false): JValue = {
val taskInfo = taskGettingResult.taskInfo
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
("Task Info" -> taskInfoToJson(taskInfo))
("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums))
}

def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
def taskEndToJson(
taskEnd: SparkListenerTaskEnd,
omitInternalAccums: Boolean = false,
omitUpdatedBlockStatuses: Boolean = false): JValue = {
val taskEndReason = taskEndReasonToJson(taskEnd.reason)
val taskInfo = taskEnd.taskInfo
val taskMetrics = taskEnd.taskMetrics
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
val taskMetricsJson = if (taskMetrics != null) {
taskMetricsToJson(taskMetrics, omitUpdatedBlockStatuses)
} else {
JNothing
}
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
("Stage ID" -> taskEnd.stageId) ~
("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
("Task Type" -> taskEnd.taskType) ~
("Task End Reason" -> taskEndReason) ~
("Task Info" -> taskInfoToJson(taskInfo)) ~
("Task Info" -> taskInfoToJson(taskInfo, omitInternalAccums)) ~
("Task Metrics" -> taskMetricsJson)
}

def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
def jobStartToJson(
jobStart: SparkListenerJobStart,
omitInternalAccums: Boolean = false): JValue = {
val properties = propertiesToJson(jobStart.properties)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
("Job ID" -> jobStart.jobId) ~
("Submission Time" -> jobStart.time) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
("Stage Infos" ->
jobStart.stageInfos.map(stageInfoToJson(_, omitInternalAccums))) ~ // Added in Spark 1.2.0
("Stage IDs" -> jobStart.stageIds) ~
("Properties" -> properties)
}
@@ -231,7 +253,10 @@ private[spark] object JsonProtocol {
("Spark Version" -> SPARK_VERSION)
}

def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
def executorMetricsUpdateToJson(
metricsUpdate: SparkListenerExecutorMetricsUpdate,
omitInternalAccums: Boolean = false): JValue = {
val execId = metricsUpdate.execId
val accumUpdates = metricsUpdate.accumUpdates
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
Expand All @@ -240,15 +265,15 @@ private[spark] object JsonProtocol {
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
("Accumulator Updates" -> JArray(accumulablesToJson(updates, omitInternalAccums)))
})
}

/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */

def stageInfoToJson(stageInfo: StageInfo): JValue = {
def stageInfoToJson(stageInfo: StageInfo, omitInternalAccums: Boolean = false): JValue = {
val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
@@ -265,10 +290,10 @@ private[spark] object JsonProtocol {
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> JArray(
stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
accumulablesToJson(stageInfo.accumulables.values, omitInternalAccums)))
}

def taskInfoToJson(taskInfo: TaskInfo): JValue = {
def taskInfoToJson(taskInfo: TaskInfo, omitInternalAccums: Boolean = false): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attemptNumber) ~
@@ -281,7 +306,13 @@ private[spark] object JsonProtocol {
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
("Killed" -> taskInfo.killed) ~
("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
("Accumulables" -> JArray(accumulablesToJson(taskInfo.accumulables, omitInternalAccums)))
}

def accumulablesToJson(
accumulables: Iterable[AccumulableInfo],
omitInternalAccums: Boolean = false): List[JValue] = {
accumulables.filter(p => !omitInternalAccums || !p.internal).map(accumulableInfoToJson).toList
}

def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -324,7 +355,9 @@ private[spark] object JsonProtocol {
}
}

def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
def taskMetricsToJson(
taskMetrics: TaskMetrics,
omitUpdatedBlockStatuses: Boolean = false): JValue = {
val shuffleReadMetrics: JValue =
("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
@@ -343,10 +376,14 @@ private[spark] object JsonProtocol {
("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
val updatedBlocks =
JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
if (omitUpdatedBlockStatuses) {
Contributor Author

@vanzin @ajbozarth #17412 gets rid of updated block statuses from the accumulables, but not from task metrics. If you think it's ok not to have an option to get rid of updated block statuses, then I can just get rid of them here.

JNothing
} else {
JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
}
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
47 changes: 42 additions & 5 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -432,6 +432,40 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

test("Omit internal accumulables") {
val accmsWithInternal = Seq(makeAccumulableInfo(0), makeAccumulableInfo(1, internal = true))
val accmsWithoutInternal = Seq(makeAccumulableInfo(0))
assertJsonStringEquals(
compact(render(
JsonProtocol.accumulablesToJson(accmsWithInternal, omitInternalAccums = true))
),
compact(render(
JsonProtocol.accumulablesToJson(accmsWithoutInternal, omitInternalAccums = true))
),
"Accumulables"
)
}

test("Omit updated block statuses") {
val withBlockStatuses =
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, false, false)
val withoutBlockStatuses =
makeTaskMetrics(
300L, 400L, 500L, 600L, 700, 800, false, false, setUpdatedBlockStatuses = false
)
assertEquals(
JsonProtocol.taskMetricsFromJson(
JsonProtocol.taskMetricsToJson(
withBlockStatuses, omitUpdatedBlockStatuses = true
)
),
JsonProtocol.taskMetricsFromJson(
JsonProtocol.taskMetricsToJson(
withoutBlockStatuses
)
)
)
}
}


@@ -828,7 +862,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
f: Int,
hasHadoopInput: Boolean,
hasOutput: Boolean,
hasRecords: Boolean = true) = {
hasRecords: Boolean = true,
setUpdatedBlockStatuses: Boolean = true) = {
val t = TaskMetrics.empty
// Set CPU times same as wall times for testing purpose
t.setExecutorDeserializeTime(a)
@@ -863,10 +898,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
sw.incWriteTime(b + c + d)
sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
}
// Make at most 6 blocks
t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
(RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
}.toSeq)
if (setUpdatedBlockStatuses) {
// Make at most 6 blocks
t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
(RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
}.toSeq)
}
t
}

16 changes: 16 additions & 0 deletions docs/configuration.md
@@ -659,6 +659,22 @@ Apart from these, the following properties are also available, and may be useful
finished.
</td>
</tr>
<tr>
<td><code>spark.eventLog.omitInternalAccumulables</code></td>
<td>false</td>
<td>
Whether to omit logging internal accumulables for Spark task metrics, which can reduce the
size of event logs. Spark task metrics will still be recorded under the "Task Metrics" field.
</td>
</tr>
<tr>
<td><code>spark.eventLog.omitUpdatedBlockStatuses</code></td>
<td>false</td>
<td>
Whether to omit the "Updated Block Statuses" metric from the Spark event log, which can
reduce the size of event logs.
</td>
</tr>
<tr>
<td><code>spark.ui.enabled</code></td>
<td>true</td>
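For reference, a minimal sketch of enabling both new options from application code (the option names are the ones introduced by this patch; everything else is an assumption):

    import org.apache.spark.SparkConf

    // Opt out of both pieces of per-task data that this patch makes optional.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.omitInternalAccumulables", "true")
      .set("spark.eventLog.omitUpdatedBlockStatuses", "true")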