
Commit 0130f07

sameResult
1 parent 00f2f31 commit 0130f07

5 files changed (+102 lines, -14 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala

Lines changed: 5 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.sources.v2.reader._

 /**
@@ -46,4 +47,8 @@ case class BatchScanExec(
   override lazy val inputRDD: RDD[InternalRow] = {
     new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
   }
+
+  override def doCanonicalize(): BatchScanExec = {
+    this.copy(output = output.map(QueryPlan.normalizeExprId(_, output)))
+  }
 }
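
The doCanonicalize override is what lets two BatchScanExec nodes that differ only in the expression IDs of their output attributes be recognized as producing the same result. A minimal standalone sketch of the idea, assuming hypothetical Attr and Scan stand-ins rather than Spark's real Attribute/BatchScanExec types:

// Not Spark code: a toy illustration of expression-ID normalization.
// Attr and Scan are hypothetical stand-ins for Attribute and BatchScanExec.
object NormalizeExprIdSketch {
  final case class Attr(name: String, exprId: Long)
  final case class Scan(output: Seq[Attr])

  // Replace each attribute's exprId with its ordinal position in the output,
  // mirroring the effect QueryPlan.normalizeExprId has on a canonicalized plan.
  def canonicalize(scan: Scan): Scan =
    Scan(scan.output.zipWithIndex.map { case (a, i) => a.copy(exprId = i.toLong) })

  def main(args: Array[String]): Unit = {
    val scan1 = Scan(Seq(Attr("a", exprId = 10L), Attr("b", exprId = 11L)))
    val scan2 = Scan(Seq(Attr("a", exprId = 20L), Attr("b", exprId = 21L)))
    assert(scan1 != scan2)                             // raw plans differ only in exprIds
    assert(canonicalize(scan1) == canonicalize(scan2)) // canonical forms compare equal
  }
}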

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala

Lines changed: 17 additions & 1 deletion
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,7 +35,8 @@ case class OrcScan(
     dataSchema: StructType,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
-    options: CaseInsensitiveStringMap)
+    options: CaseInsensitiveStringMap,
+    pushedFilters: Array[Filter])
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
   override def isSplitable(path: Path): Boolean = true

@@ -46,4 +48,18 @@ case class OrcScan(
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
       dataSchema, readDataSchema, readPartitionSchema)
   }
+
+  override def equals(obj: Any): Boolean = obj match {
+    case o: OrcScan =>
+      fileIndex == o.fileIndex && dataSchema == o.dataSchema &&
+        readDataSchema == o.readDataSchema && readPartitionSchema == o.readPartitionSchema &&
+        options == o.options && equivalentFilters(pushedFilters, o.pushedFilters)
+    case _ => false
+  }
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  private def equivalentFilters(a: Array[Filter], b: Array[Filter]): Boolean = {
+    a.sortBy(_.hashCode()).sameElements(b.sortBy(_.hashCode()))
+  }
 }
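
Array equality in Scala is reference equality, so the equality a case class would otherwise derive for the new Array[Filter] field would never treat two scans with identical pushed filters as equal; the equals override instead compares the pushed filters order-insensitively via equivalentFilters. A minimal standalone sketch of that comparison, assuming a hypothetical Filter case class in place of org.apache.spark.sql.sources.Filter:

// Not Spark code: a toy illustration of order-insensitive filter comparison.
object EquivalentFiltersSketch {
  // Hypothetical stand-in for org.apache.spark.sql.sources.Filter.
  final case class Filter(attribute: String, predicate: String)

  // Same logic as OrcScan.equivalentFilters: sort both sides by hashCode
  // before comparing element by element, so ordering does not matter.
  def equivalentFilters(a: Array[Filter], b: Array[Filter]): Boolean =
    a.sortBy(_.hashCode()).sameElements(b.sortBy(_.hashCode()))

  def main(args: Array[String]): Unit = {
    val f1 = Array(Filter("c", "> 1"), Filter("d", "< 9"))
    val f2 = Array(Filter("d", "< 9"), Filter("c", "> 1")) // same filters, different order
    val f3 = Array(Filter("d", "< 8"), Filter("c", "> 1")) // a genuinely different filter
    assert(equivalentFilters(f1, f2))   // order does not matter
    assert(!equivalentFilters(f1, f3))  // different predicates are not equivalent
  }
}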

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ case class OrcScanBuilder(

   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema,
-      readDataSchema(), readPartitionSchema(), options)
+      readDataSchema(), readPartitionSchema(), options, pushedFilters())
   }

   private var _pushedFilters: Array[Filter] = Array.empty

sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala

Lines changed: 61 additions & 0 deletions
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.IntegerType

@@ -47,6 +49,65 @@ class SameResultSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("FileScan: different orders of data filters and partition filters") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      Seq("orc", "json", "csv").foreach { format =>
+        withTempPath { path =>
+          val tmpDir = path.getCanonicalPath
+          spark.range(10)
+            .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
+            .write
+            .partitionBy("a", "b")
+            .format(format)
+            .option("header", true)
+            .save(tmpDir)
+          val df = spark.read.format(format).option("header", true).load(tmpDir)
+          // partition filters: a > 1 AND b < 9
+          // data filters: c > 1 AND d < 9
+          val plan1 = df.where("a > 1 AND b < 9 AND c > 1 AND d < 9").queryExecution.sparkPlan
+          val plan2 = df.where("b < 9 AND a > 1 AND d < 9 AND c > 1").queryExecution.sparkPlan
+          assert(plan1.sameResult(plan2))
+          val scan1 = getBatchScanExec(plan1)
+          val scan2 = getBatchScanExec(plan2)
+          assert(scan1.sameResult(scan2))
+          val plan3 = df.where("b < 9 AND a > 1 AND d < 8 AND c > 1").queryExecution.sparkPlan
+          assert(!plan1.sameResult(plan3))
+          // The [[FileScan]]s should have different results if they support filter pushdown.
+          if (format == "orc") {
+            val scan3 = getBatchScanExec(plan3)
+            assert(!scan1.sameResult(scan3))
+          }
+        }
+      }
+    }
+  }
+
+  test("TextScan") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withTempPath { path =>
+        val tmpDir = path.getCanonicalPath
+        spark.range(10)
+          .selectExpr("id as a", "id + 1 as b", "cast(id as string) value")
+          .write
+          .partitionBy("a", "b")
+          .text(tmpDir)
+        val df = spark.read.text(tmpDir)
+        // partition filters: a > 1 AND b < 9
+        // data filter: value == '3'
+        val plan1 = df.where("a > 1 AND b < 9 AND value == '3'").queryExecution.sparkPlan
+        val plan2 = df.where("value == '3' AND a > 1 AND b < 9").queryExecution.sparkPlan
+        assert(plan1.sameResult(plan2))
+        val scan1 = getBatchScanExec(plan1)
+        val scan2 = getBatchScanExec(plan2)
+        assert(scan1.sameResult(scan2))
+      }
+    }
+  }
+
+  private def getBatchScanExec(plan: SparkPlan): BatchScanExec = {
+    plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec]
+  }
+
   private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
     df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
       .asInstanceOf[FileSourceScanExec]
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 18 additions & 12 deletions
@@ -414,19 +414,25 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   }

   test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
-    withTempPath { path =>
-      val tempDir = path.getCanonicalPath
-      spark.range(100)
-        .selectExpr("id", "id as b")
-        .write
-        .partitionBy("id")
-        .parquet(tempDir)
-      val df = spark.read.parquet(tempDir)
-      def getPlan(df: DataFrame): SparkPlan = {
-        df.queryExecution.executedPlan
+    Seq("orc", "").foreach { useV1ReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1ReaderList) {
+        withTempPath { path =>
+          val tempDir = path.getCanonicalPath
+          spark.range(100)
+            .selectExpr("id", "id as b")
+            .write
+            .partitionBy("id")
+            .orc(tempDir)
+          val df = spark.read.orc(tempDir)
+
+          def getPlan(df: DataFrame): SparkPlan = {
+            df.queryExecution.executedPlan
+          }
+
+          assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+          assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+        }
       }
-      assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
-      assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
     }
   }
