@@ -185,19 +185,14 @@ case class FileSourceScanExec(
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)

private var metadataTime = 0L

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000

metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTakenMs)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("metadataTime") :: Nil)

metadataTime = timeTakenMs
ret
}

@@ -308,6 +303,8 @@ case class FileSourceScanExec(
}

private lazy val inputRDD: RDD[InternalRow] = {
// Update driver-side metrics here so they take effect for both the whole-stage
// code generation path and the normal execution path.
updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -524,6 +521,19 @@
}
}

/**
 * Send the updated metrics to the driver. By the time this method is called,
 * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
 */
private def updateDriverMetrics(): Unit = {
metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
metrics("metadataTime").add(metadataTime)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("metadataTime") :: Nil)
}

override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
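For reference, the driver-side metric pattern that updateDriverMetrics relies on can be sketched in isolation. The snippet below is a minimal, hypothetical illustration rather than code from this change: it assumes only a SparkContext in scope, creates a metric with SQLMetrics.createMetric, updates it on the driver, and posts it for whichever execution ID is set on the calling thread.

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical helper: create a driver-side metric, update it on the driver,
// and post it to the UI for the execution currently associated with this thread.
def postNumFiles(sparkContext: SparkContext, numFilesSeen: Long): Unit = {
  val numFiles = SQLMetrics.createMetric(sparkContext, "number of files")
  numFiles.add(numFilesSeen)
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, numFiles :: Nil)
}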
@@ -570,4 +570,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
}
}
}

test("SPARK-26327: FileSourceScanExec metrics") {
withTable("testDataForScan") {
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
// The execution plan only has 1 FileScan node.
val df = spark.sql(
"SELECT * FROM testDataForScan WHERE p = 1")
testSparkPlanMetrics(df, 1, Map(
0L -> (("Scan parquet default.testdataforscan", Map(
"number of output rows" -> 3L,
"number of files" -> 2L))))
)
}
}
}
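Outside the test harness, roughly the same check could be made by reading the metrics off the physical plan directly. A minimal sketch, assuming the testDataForScan table created above, a SparkSession named spark, and adaptive query execution disabled so the scan node appears directly in the executed plan:

import org.apache.spark.sql.execution.FileSourceScanExec

// Run the query so inputRDD is built and the driver-side metrics are populated.
val df = spark.sql("SELECT * FROM testDataForScan WHERE p = 1")
df.collect()

// Find the scan node in the executed plan and read its metric values.
val scan = df.queryExecution.executedPlan.collectFirst {
  case s: FileSourceScanExec => s
}.get
assert(scan.metrics("numFiles").value == 2)
assert(scan.metrics("numOutputRows").value == 3)

This is essentially what testSparkPlanMetrics asserts, except that the helper reads the metric values reported for the SQL execution rather than from the plan node itself.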