
Commit d57f44c

ParquetFilters SQLConf.get
1 parent b9b3160 commit d57f44c

4 files changed: +18 −13 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
@@ -381,12 +381,12 @@ object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
     buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
       .doc("The maximum number of values to filter push-down optimization for IN predicate. " +
-        "Large threshold will not provide much better performance. " +
+        "Large threshold won't necessarily provide much better performance. " +
+        "The experiment argued that 300 is the limit threshold. " +
         "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
       .internal()
       .intConf
-      .checkValue(threshold => threshold > 0 && threshold <= 300,
-        "The threshold must be greater than 0 and less than 300.")
+      .checkValue(threshold => threshold > 0, "The threshold must be greater than 0.")
       .createWithDefault(10)

   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
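
In practice, the relaxed checkValue means the IN pushdown threshold can now be raised past the old hard cap of 300; only non-positive values are rejected. A minimal sketch of setting it (the session builder options below are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("in-filter-threshold-demo")
  // Previously rejected by checkValue (> 300); accepted after this change.
  .config("spark.sql.parquet.pushdown.inFilterThreshold", 500L)
  .getOrCreate()

// A non-positive value still fails validation when set:
// spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", 0L)  // IllegalArgumentException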

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 1 addition & 3 deletions
@@ -341,8 +341,6 @@ class ParquetFileFormat
     val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
-    val pushDownDate = sqlConf.parquetFilterPushDownDate
-    val inThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -353,7 +351,7 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(new ParquetFilters(pushDownDate, inThreshold).createFilter(requiredSchema, _))
+          .flatMap(new ParquetFilters().createFilter(requiredSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None
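
With the two vals gone, the per-file reader closure no longer captures the pushdown settings on the driver side; the freshly constructed ParquetFilters reads them itself via SQLConf.get. For callers nothing changes: pushdown is still driven by the same configuration keys. An illustrative read path (the path and column name are placeholders, not from this commit):

// Hypothetical usage; "/tmp/events" and column "id" are placeholders.
spark.conf.set("spark.sql.parquet.filterPushdown", true)
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", 20L)

val df = spark.read.parquet("/tmp/events")
// An IN list with at most 20 distinct values is rewritten into a Parquet
// filter (an OR of equality predicates) and pushed down to the reader.
df.where("id in (1, 2, 3)").count()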

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 8 additions & 2 deletions
@@ -25,13 +25,19 @@ import org.apache.parquet.io.api.Binary

 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._

 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: Int) {
+private[parquet] class ParquetFilters {
+
+  val sqlConf: SQLConf = SQLConf.get
+
+  val pushDownDate = sqlConf.parquetFilterPushDownDate
+  val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -271,7 +277,7 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold:
       createFilter(schema, pred).map(FilterApi.not)

     case sources.In(name, values)
-      if canMakeFilterOn(name) && values.distinct.length <= inFilterThreshold =>
+      if canMakeFilterOn(name) && values.distinct.length <= pushDownInFilterThreshold =>
       values.distinct.flatMap { v =>
         makeEq.lift(nameToType(name)).map(_(name, v))
       }.reduceLeftOption(FilterApi.or)
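
SQLConf.get resolves to the SQLConf of the active session on the calling thread (falling back to a default conf when none is active), so the pushdown flags are snapshotted when a ParquetFilters instance is constructed rather than supplied by the caller. A hedged sketch of the effect, assuming it runs from code inside the parquet package (the class is private[parquet]):

import org.apache.spark.sql.internal.SQLConf

// Whatever the active session's conf says at construction time is what the instance sees.
SQLConf.get.setConfString(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key, "50")
val filters = new ParquetFilters()
assert(filters.pushDownInFilterThreshold == 50)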

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 6 additions & 5 deletions
@@ -55,8 +55,7 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

-  private lazy val parquetFilters =
-    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownInFilterThreshold)
+  private lazy val parquetFilters = new ParquetFilters()

   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -702,8 +701,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

     import testImplicits._
     withTempPath { path =>
-      (0 to 1024).toDF("a").coalesce(1)
-        .write.option("parquet.block.size", 512)
+      (0 to 1024).toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null
+        .coalesce(1).write.option("parquet.block.size", 512)
         .parquet(path.getAbsolutePath)
       val df = spark.read.parquet(path.getAbsolutePath)
       Seq(true, false).foreach { pushEnabled =>
@@ -712,7 +711,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         Seq(1, 5, 10, 11, 1000).foreach { count =>
           assert(df.where(s"a in(${Range(0, count).mkString(",")})").count() === count)
         }
-        assert(df.where(s"a in(null)").count() === 0)
+        assert(df.where("a in(null)").count() === 0)
+        assert(df.where("a = null").count() === 0)
+        assert(df.where("a is null").count() === 1)
       }
     }
   }
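
The extra assertions pin down SQL null semantics for the test data (one row was converted to null): comparisons against NULL such as `a = null` and `a in (null)` evaluate to NULL and match no rows, while `a is null` matches exactly that one row. The same semantics in isolation (a minimal sketch, assuming a SparkSession named spark is in scope):

import spark.implicits._

val df = Seq(Some(1), Some(2), None).toDF("a")
df.where("a = null").count()     // 0: '=' against NULL is never true
df.where("a in (null)").count()  // 0: IN over a NULL literal is never true
df.where("a is null").count()    // 1: only IS NULL matches the null row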
