
Commit 860849d

viirya authored and cloud-fan committed
[SPARK-31365][SQL] Enable nested predicate pushdown per data sources
### What changes were proposed in this pull request?

This patch proposes to replace `NESTED_PREDICATE_PUSHDOWN_ENABLED` with `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST`, which can configure which v1 data sources are enabled with nested predicate pushdown.

### Why are the changes needed?

We added a nested predicate pushdown feature that is configured by `NESTED_PREDICATE_PUSHDOWN_ENABLED`. However, this config is an all-or-nothing config and applies to all data sources. In order to not introduce an API-breaking change after enabling nested predicate pushdown, we'd like to set nested predicate pushdown per data source. Please also refer to the comments in #27728 (comment).

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added/modified unit tests.

Closes #28366 from viirya/SPARK-31365.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4952f1a)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent fcd566e commit 860849d

11 files changed (+186, -100 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 11 deletions
```diff
@@ -2063,16 +2063,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and/or names containing `dots` to data sources. Currently, Parquet implements " +
+        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
+        "other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .stringConf
+      .createWithDefault("parquet,orc")
 
   val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
     buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -3077,8 +3078,6 @@ class SQLConf extends Serializable with Logging {
 
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
 
-  def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)
-
   def serializerNestedSchemaPruningEnabled: Boolean =
     getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)
```

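The new config is marked `.internal()`, but it is still a runtime SQL conf and can be set per session. A minimal sketch of how it could be used (the session setup, path, and nested column below are illustrative, not from this commit):

```scala
import org.apache.spark.sql.SparkSession

object NestedPushdownConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-pushdown-config-sketch")
      .getOrCreate()

    // Default is "parquet,orc". Listing only "parquet" limits nested predicate
    // pushdown to Parquet; an empty string turns it off for all v1 file sources.
    spark.conf.set(
      "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "parquet")

    // Illustrative query over a hypothetical dataset with a struct column `person`:
    // whether `person.age > 21` reaches the reader shows up as PushedFilters in the plan.
    spark.read.parquet("/tmp/people")   // hypothetical path
      .filter("person.age > 21")        // hypothetical nested column
      .explain(true)

    spark.stop()
  }
}
```
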
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -326,7 +326,10 @@ case class FileSourceScanExec(
   }
 
   @transient
-  private lazy val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private lazy val pushedDownFilters = {
+    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
+    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+  }
 
   override lazy val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 62 additions & 32 deletions
```diff
@@ -448,60 +448,62 @@ object DataSourceStrategy {
     }
   }
 
-  private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match {
-    case expressions.EqualTo(PushableColumn(name), Literal(v, t)) =>
+  private def translateLeafNodeFilter(
+      predicate: Expression,
+      pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
+    case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
       Some(sources.EqualTo(name, convertToScala(v, t)))
-    case expressions.EqualTo(Literal(v, t), PushableColumn(name)) =>
+    case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
       Some(sources.EqualTo(name, convertToScala(v, t)))
 
-    case expressions.EqualNullSafe(PushableColumn(name), Literal(v, t)) =>
+    case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
       Some(sources.EqualNullSafe(name, convertToScala(v, t)))
-    case expressions.EqualNullSafe(Literal(v, t), PushableColumn(name)) =>
+    case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
       Some(sources.EqualNullSafe(name, convertToScala(v, t)))
 
-    case expressions.GreaterThan(PushableColumn(name), Literal(v, t)) =>
+    case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
       Some(sources.GreaterThan(name, convertToScala(v, t)))
-    case expressions.GreaterThan(Literal(v, t), PushableColumn(name)) =>
+    case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
       Some(sources.LessThan(name, convertToScala(v, t)))
 
-    case expressions.LessThan(PushableColumn(name), Literal(v, t)) =>
+    case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
      Some(sources.LessThan(name, convertToScala(v, t)))
-    case expressions.LessThan(Literal(v, t), PushableColumn(name)) =>
+    case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
       Some(sources.GreaterThan(name, convertToScala(v, t)))
 
-    case expressions.GreaterThanOrEqual(PushableColumn(name), Literal(v, t)) =>
+    case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
       Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))
-    case expressions.GreaterThanOrEqual(Literal(v, t), PushableColumn(name)) =>
+    case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
       Some(sources.LessThanOrEqual(name, convertToScala(v, t)))
 
-    case expressions.LessThanOrEqual(PushableColumn(name), Literal(v, t)) =>
+    case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
       Some(sources.LessThanOrEqual(name, convertToScala(v, t)))
-    case expressions.LessThanOrEqual(Literal(v, t), PushableColumn(name)) =>
+    case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
       Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))
 
-    case expressions.InSet(e @ PushableColumn(name), set) =>
+    case expressions.InSet(e @ pushableColumn(name), set) =>
       val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
       Some(sources.In(name, set.toArray.map(toScala)))
 
     // Because we only convert In to InSet in Optimizer when there are more than certain
     // items. So it is possible we still get an In expression here that needs to be pushed
     // down.
-    case expressions.In(e @ PushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
+    case expressions.In(e @ pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
       val hSet = list.map(_.eval(EmptyRow))
       val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
       Some(sources.In(name, hSet.toArray.map(toScala)))
 
-    case expressions.IsNull(PushableColumn(name)) =>
+    case expressions.IsNull(pushableColumn(name)) =>
       Some(sources.IsNull(name))
-    case expressions.IsNotNull(PushableColumn(name)) =>
+    case expressions.IsNotNull(pushableColumn(name)) =>
       Some(sources.IsNotNull(name))
-    case expressions.StartsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringStartsWith(name, v.toString))
 
-    case expressions.EndsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringEndsWith(name, v.toString))
 
-    case expressions.Contains(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringContains(name, v.toString))
 
     case expressions.Literal(true, BooleanType) =>
@@ -518,8 +520,9 @@ object DataSourceStrategy {
    *
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
-  protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
-    translateFilterWithMapping(predicate, None)
+  protected[sql] def translateFilter(
+      predicate: Expression, supportNestedPredicatePushdown: Boolean): Option[Filter] = {
+    translateFilterWithMapping(predicate, None, supportNestedPredicatePushdown)
   }
 
   /**
@@ -529,11 +532,13 @@
    * @param translatedFilterToExpr An optional map from leaf node filter expressions to its
    *                               translated [[Filter]]. The map is used for rebuilding
    *                               [[Expression]] from [[Filter]].
+   * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled.
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
   protected[sql] def translateFilterWithMapping(
       predicate: Expression,
-      translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]])
+      translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
+      nestedPredicatePushdownEnabled: Boolean)
     : Option[Filter] = {
     predicate match {
       case expressions.And(left, right) =>
@@ -547,21 +552,26 @@
         // Pushing one leg of AND down is only safe to do at the top level.
         // You can see ParquetFilters' createFilter for more details.
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.And(leftFilter, rightFilter)
 
       case expressions.Or(left, right) =>
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)
 
       case expressions.Not(child) =>
-        translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)
+        translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          .map(sources.Not)
 
       case other =>
-        val filter = translateLeafNodeFilter(other)
+        val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
         if (filter.isDefined && translatedFilterToExpr.isDefined) {
          translatedFilterToExpr.get(filter.get) = predicate
        }
@@ -608,8 +618,9 @@
 
     // A map from original Catalyst expressions to corresponding translated data source filters.
     // If a predicate is not in this map, it means it cannot be pushed down.
+    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
     val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
-      translateFilter(p).map(f => p -> f)
+      translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
     }.toMap
 
     val pushedFilters: Seq[Filter] = translatedMap.values.toSeq
@@ -650,9 +661,10 @@
 /**
  * Find the column name of an expression that can be pushed down.
  */
-object PushableColumn {
+abstract class PushableColumnBase {
+  val nestedPredicatePushdownEnabled: Boolean
+
   def unapply(e: Expression): Option[String] = {
-    val nestedPredicatePushdownEnabled = SQLConf.get.nestedPredicatePushdownEnabled
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     def helper(e: Expression): Option[Seq[String]] = e match {
       case a: Attribute =>
@@ -668,3 +680,21 @@
     helper(e).map(_.quoted)
   }
 }
+
+object PushableColumn {
+  def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase = {
+    if (nestedPredicatePushdownEnabled) {
+      PushableColumnAndNestedColumn
+    } else {
+      PushableColumnWithoutNestedColumn
+    }
+  }
+}
+
+object PushableColumnAndNestedColumn extends PushableColumnBase {
+  override val nestedPredicatePushdownEnabled = true
+}
+
+object PushableColumnWithoutNestedColumn extends PushableColumnBase {
+  override val nestedPredicatePushdownEnabled = false
+}
```

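The refactor above replaces the single `PushableColumn` object with a small family of extractors selected by a factory, and the chosen extractor is passed into `translateLeafNodeFilter` and used directly in the match arms. A self-contained sketch of that pattern (not Spark's Catalyst types; column "paths" are modeled as plain dot-separated strings, and all names here are hypothetical):

```scala
object NestedPushdownExtractorSketch {
  // Stand-in for PushableColumnBase: an extractor that accepts or rejects
  // nested (dotted) column references depending on a flag.
  abstract class ColumnMatcher {
    val allowNested: Boolean
    def unapply(path: String): Option[String] =
      if (allowNested || !path.contains(".")) Some(path) else None
  }
  object WithNested extends ColumnMatcher { val allowNested = true }
  object WithoutNested extends ColumnMatcher { val allowNested = false }
  object ColumnMatcher {
    // Stand-in for PushableColumn.apply(nestedPredicatePushdownEnabled)
    def apply(allowNested: Boolean): ColumnMatcher =
      if (allowNested) WithNested else WithoutNested
  }

  // The extractor is passed in as a value; because a method parameter is a stable
  // identifier, it can appear directly in the pattern, as `pushableColumn(name)` does above.
  def translate(path: String, matcher: ColumnMatcher): Option[String] = path match {
    case matcher(name) => Some(s"pushed($name)") // analogous to building a sources.Filter
    case _             => None                   // predicate stays in Spark
  }

  def main(args: Array[String]): Unit = {
    println(translate("a.b", ColumnMatcher(allowNested = true)))  // Some(pushed(a.b))
    println(translate("a.b", ColumnMatcher(allowNested = false))) // None
    println(translate("a", ColumnMatcher(allowNested = false)))   // Some(pushed(a))
  }
}
```
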
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -17,13 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -68,6 +71,19 @@ object DataSourceUtils {
   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))
 
+  /**
+   * Returns if the given relation's V1 datasource provider supports nested predicate pushdown.
+   */
+  private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean =
+    relation match {
+      case hs: HadoopFsRelation =>
+        val supportedDatasources =
+          Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST)
+            .toLowerCase(Locale.ROOT))
+        supportedDatasources.contains(hs.toString)
+      case _ => false
+    }
+
   def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
     if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
       return Some(false)
```

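The new helper gates the feature on membership in the configured source list. A standalone sketch of that check, under the assumption that `Utils.stringToSeq` splits on commas, trims entries, and drops empties (plain strings stand in for `SQLConf` and `BaseRelation` here, and the real code also accepts fully qualified class names, which this sketch ignores):

```scala
import java.util.Locale

object SupportedSourcesSketch {
  // `confValue` plays the role of NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST,
  // `sourceName` the role of the relation's file format name.
  def supportsNestedPushdown(sourceName: String, confValue: String): Boolean = {
    val supported = confValue
      .toLowerCase(Locale.ROOT)
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .toSeq
    supported.contains(sourceName.toLowerCase(Locale.ROOT))
  }

  def main(args: Array[String]): Unit = {
    println(supportsNestedPushdown("parquet", "parquet,orc")) // true  (default value)
    println(supportsNestedPushdown("csv", "parquet,orc"))     // false (not listed)
    println(supportsNestedPushdown("parquet", ""))            // false (empty list disables)
  }
}
```
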
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging {
       // Partition keys are not available in the statistics of the files.
       val dataFilters =
         normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
-      logInfo(s"Pushed Filters: " +
-        s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}")
+      val supportNestedPredicatePushdown =
+        DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
+      val pushedFilters = dataFilters
+        .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+      logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -180,8 +180,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
       // fail if any filter cannot be converted. correctness depends on removing all matching data.
       val filters = splitConjunctivePredicates(deleteExpr).map {
-        filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
-          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
+        filter => DataSourceStrategy.translateFilter(deleteExpr,
+          supportNestedPredicatePushdown = true).getOrElse(
+            throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
@@ -205,7 +206,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       // correctness depends on removing all matching data.
       val filters = DataSourceStrategy.normalizeExprs(condition.toSeq, output)
         .flatMap(splitConjunctivePredicates(_).map {
-          f => DataSourceStrategy.translateFilter(f).getOrElse(
+          f => DataSourceStrategy.translateFilter(f, true).getOrElse(
             throw new AnalysisException(s"Exec update failed:" +
               s" cannot translate expression to source filter: $f"))
         }).toArray
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -48,7 +48,8 @@ object PushDownUtils extends PredicateHelper {
 
     for (filterExpr <- filters) {
       val translated =
-        DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
+        DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr),
+          nestedPredicatePushdownEnabled = true)
       if (translated.isEmpty) {
         untranslatableExprs += filterExpr
       } else {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -59,7 +59,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
 
     val wrappedScan = scan match {
       case v1: V1Scan =>
-        val translated = filters.flatMap(DataSourceStrategy.translateFilter)
+        val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
         V1ScanWrapper(v1, translated, pushedFilters)
       case _ => scan
     }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -48,12 +48,12 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
   private def addCase(
       benchmark: Benchmark,
       inputPath: String,
-      enableNestedPD: Boolean,
+      enableNestedPD: String,
       name: String,
       withFilter: DataFrame => DataFrame): Unit = {
     val loadDF = spark.read.parquet(inputPath)
     benchmark.addCase(name) { _ =>
-      withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, enableNestedPD.toString)) {
+      withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, enableNestedPD)) {
         withFilter(loadDF).noop()
       }
     }
@@ -67,13 +67,13 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
     addCase(
       benchmark,
       outputPath,
-      enableNestedPD = false,
+      enableNestedPD = "",
       "Without nested predicate Pushdown",
       withFilter)
     addCase(
       benchmark,
       outputPath,
-      enableNestedPD = true,
+      enableNestedPD = "parquet",
       "With nested predicate Pushdown",
       withFilter)
     benchmark.run()
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala

Lines changed: 16 additions & 4 deletions
```diff
@@ -289,14 +289,26 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
   test("SPARK-31027 test `PushableColumn.unapply` that finds the column name of " +
     "an expression that can be pushed down") {
     attrInts.foreach { case (attrInt, colName) =>
-      assert(PushableColumn.unapply(attrInt) === Some(colName))
+      assert(PushableColumnAndNestedColumn.unapply(attrInt) === Some(colName))
+
+      if (colName.contains(".")) {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === None)
+      } else {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === Some(colName))
+      }
     }
     attrStrs.foreach { case (attrStr, colName) =>
-      assert(PushableColumn.unapply(attrStr) === Some(colName))
+      assert(PushableColumnAndNestedColumn.unapply(attrStr) === Some(colName))
+
+      if (colName.contains(".")) {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === None)
+      } else {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === Some(colName))
+      }
     }
 
     // `Abs(col)` can not be pushed down, so it returns `None`
-    assert(PushableColumn.unapply(Abs('col.int)) === None)
+    assert(PushableColumnAndNestedColumn.unapply(Abs('col.int)) === None)
   }
 
   /**
@@ -305,7 +317,7 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
    */
   def testTranslateFilter(catalystFilter: Expression, result: Option[sources.Filter]): Unit = {
     assertResult(result) {
-      DataSourceStrategy.translateFilter(catalystFilter)
+      DataSourceStrategy.translateFilter(catalystFilter, true)
     }
   }
 }
```
