From 3c0f0746245e9f6cd266c12dba0fae6da1d29289 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Mon, 10 Dec 2018 23:04:19 +0800
Subject: [PATCH 1/2] Bug fix for file scan metrics

---
 .../sql/execution/DataSourceScanExec.scala | 28 +++++++++++++------
 .../execution/metric/SQLMetricsSuite.scala | 15 ++++++++++
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 36ed016773b67..8c1f65f7f2294 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -185,19 +185,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var fileListingTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    fileListingTime = timeTakenMs
     ret
   }
 
@@ -308,6 +303,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update driver metrics here so they take effect in both the codegen and normal execution paths.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -333,7 +330,7 @@ case class FileSourceScanExec(
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
+      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -524,6 +521,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this method is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("fileListingTime").add(fileListingTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 085a445488480..c550bf20b92b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -570,4 +570,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       }
     }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }

From 368c7b50cbbb18b4dd6b11d6d4791448a057ffc3 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 11 Dec 2018 21:09:00 +0800
Subject: [PATCH 2/2] change back metrics name

---
 .../spark/sql/execution/DataSourceScanExec.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8c1f65f7f2294..5433c30afd6bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -185,14 +185,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
-  private var fileListingTime = 0L
+  private var metadataTime = 0L
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-    fileListingTime = timeTakenMs
+    metadataTime = timeTakenMs
     ret
   }
 
@@ -330,7 +330,7 @@ case class FileSourceScanExec(
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -527,11 +527,11 @@ case class FileSourceScanExec(
    */
   private def updateDriverMetrics() = {
     metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
-    metrics("fileListingTime").add(fileListingTime)
+    metrics("metadataTime").add(metadataTime)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
   }
 
   override def doCanonicalize(): FileSourceScanExec = {