Skip to content

Commit 7a5647a

Browse files
wangyumcloud-fan
authored andcommitted
[SPARK-33385][SQL] Support bucket pruning for IsNaN
### What changes were proposed in this pull request? This pr add support bucket pruning on `IsNaN` predicate. ### Why are the changes needed? Improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #30291 from wangyum/SPARK-33385. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 69799c5 commit 7a5647a

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
2525
import org.apache.spark.sql.catalyst.planning.ScanOperation
2626
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2727
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
28+
import org.apache.spark.sql.types.{DoubleType, FloatType}
2829
import org.apache.spark.util.collection.BitSet
2930

3031
/**
@@ -93,6 +94,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
9394
getBucketSetFromIterable(a, hset)
9495
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
9596
getBucketSetFromValue(a, null)
97+
case expressions.IsNaN(a: Attribute)
98+
if a.name == bucketColumnName && a.dataType == FloatType =>
99+
getBucketSetFromValue(a, Float.NaN)
100+
case expressions.IsNaN(a: Attribute)
101+
if a.name == bucketColumnName && a.dataType == DoubleType =>
102+
getBucketSetFromValue(a, Double.NaN)
96103
case expressions.And(left, right) =>
97104
getExpressionBuckets(left, bucketColumnName, numBuckets) &
98105
getExpressionBuckets(right, bucketColumnName, numBuckets)

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
113113
// 2) Verify the final result is the same as the expected one
114114
private def checkPrunedAnswers(
115115
bucketSpec: BucketSpec,
116-
bucketValues: Seq[Integer],
116+
bucketValues: Seq[Any],
117117
filterCondition: Column,
118118
originalDataFrame: DataFrame): Unit = {
119119
// This test verifies parts of the plan. Disable whole stage codegen.
@@ -245,6 +245,25 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
245245
}
246246
}
247247

248+
test("bucket pruning support IsNaN") {
249+
withTable("bucketed_table") {
250+
val numBuckets = NumBucketsForPruningNullDf
251+
val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
252+
val naNDF = nullDF.selectExpr("i", "cast(if(isnull(j), 'NaN', j) as double) as j", "k")
253+
// json does not support predicate push-down, and thus json is used here
254+
naNDF.write
255+
.format("json")
256+
.bucketBy(numBuckets, "j")
257+
.saveAsTable("bucketed_table")
258+
259+
checkPrunedAnswers(
260+
bucketSpec,
261+
bucketValues = Double.NaN :: Nil,
262+
filterCondition = $"j".isNaN,
263+
naNDF)
264+
}
265+
}
266+
248267
test("read partitioning bucketed tables having composite filters") {
249268
withTable("bucketed_table") {
250269
val numBuckets = NumBucketsForPruningDF

0 commit comments

Comments
 (0)