
Commit d2b5f67

AndreSchumacher authored and pdeyhim committed
SPARK-1487 [SQL] Support record filtering via predicate pushdown in Parquet
Simple filter predicates such as LessThan, GreaterThan, etc., where one side is a literal and the other one a NamedExpression are now pushed down to the underlying ParquetTableScan. Here are some results for a microbenchmark with a simple schema of six fields of different types where most records failed the test: | Uncompressed | Compressed -------------| ------------- | ------------- File size | 10 GB | 2 GB Speedup | 2 | 1.8 Since mileage may vary I added a new option to SparkConf: `org.apache.spark.sql.parquet.filter.pushdown` Default value would be `true` and setting it to `false` disables the pushdown. When most rows are expected to pass the filter or when there are few fields performance can be better when pushdown is disabled. The default should fit situations with a reasonable number of (possibly nested) fields where not too many records on average pass the filter. Because of an issue with Parquet ([see here](https://github.com/Parquet/parquet-mr/issues/371])) currently only predicates on non-nullable attributes are pushed down. If one would know that for a given table no optional fields have missing values one could also allow overriding this. 
Author: Andre Schumacher <[email protected]>

Closes apache#511 from AndreSchumacher/parquet_filter and squashes the following commits:

- 16bfe83 [Andre Schumacher] Removing leftovers from merge during rebase
- 7b304ca [Andre Schumacher] Fixing formatting
- c36d5cb [Andre Schumacher] Scalastyle
- 3da98db [Andre Schumacher] Second round of review feedback
- 7a78265 [Andre Schumacher] Fixing broken formatting in ParquetFilter
- a86553b [Andre Schumacher] First round of code review feedback
- b0f7806 [Andre Schumacher] Optimizing imports in ParquetTestData
- 85fea2d [Andre Schumacher] Adding SparkConf setting to disable filter predicate pushdown
- f0ad3cf [Andre Schumacher] Undoing changes not needed for this PR
- 210e9cb [Andre Schumacher] Adding disjunctive filter predicates
- a93a588 [Andre Schumacher] Adding unit test for filtering
- 6d22666 [Andre Schumacher] Extending ParquetFilters
- 93e8192 [Andre Schumacher] First commit Parquet record filtering
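The rule described above, that only simple attribute-vs-literal comparisons on non-nullable attributes qualify for pushdown, can be illustrated with a small stand-alone sketch. The `Expr` ADT and `createFilter` below are simplified stand-ins, not Spark's Catalyst expressions or the actual `ParquetFilters.createFilter` (which returns Parquet record filters and handles more operators):

```scala
// Simplified model of the pushdown eligibility check (hypothetical types,
// not Spark's actual classes).
sealed trait Expr
case class Attribute(name: String, nullable: Boolean) extends Expr
case class Literal(value: Any) extends Expr
case class LessThan(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Returns Some(filter) only for bare "Attribute cmp Literal" comparisons on
// non-nullable attributes; nullable ones are skipped, mirroring the
// parquet-mr issue 371 restriction mentioned in the commit message.
def createFilter(e: Expr): Option[Expr] = e match {
  case LessThan(a: Attribute, _: Literal) if !a.nullable => Some(e)
  case LessThan(_: Literal, a: Attribute) if !a.nullable => Some(e)
  case _ => None
}

val age  = Attribute("age", nullable = false)  // pushable
val note = Attribute("note", nullable = true)  // not pushable (nullable)
```

A conjunction such as `And(...)` falls through to `None` here; the real implementation also handles disjunctive predicates, as the squashed commit list notes.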
1 parent 50fd07a commit d2b5f67

File tree: 5 files changed, +731 −51 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 27 additions & 4 deletions
```diff
@@ -140,12 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
-        // TODO: Should be pushing down filters as well.
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+        val remainingFilters =
+          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+            filters.filter {
+              // Note: filters cannot be pushed down to Parquet if they contain more complex
+              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
+              // all filters that have been pushed down. Note that a predicate such as
+              // "(A AND B) OR C" can result in "A OR C" being pushed down.
+              filter =>
+                val recordFilter = ParquetFilters.createFilter(filter)
+                if (!recordFilter.isDefined) {
+                  // First case: the pushdown did not result in any record filter.
+                  true
+                } else {
+                  // Second case: a record filter was created; here we are conservative in
+                  // the sense that even if "A" was pushed and we check for "A AND B" we
+                  // still want to keep "A AND B" in the higher-level filter, not just "B".
+                  !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
+                }
+            }
+          } else {
+            filters
+          }
         pruneFilterProject(
           projectList,
-          filters,
-          ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
+          remainingFilters,
+          ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
+      }
+
       case _ => Nil
     }
   }
```
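The filter-splitting logic in this hunk can be sketched in plain Scala. The types and helpers below (`Cmp`, `covers`, `remainingFilters`) are hypothetical stand-ins for Catalyst expressions, `ParquetFilters.createFilter`, and `ParquetFilters.findExpression`, showing only the conservative rule: a predicate is removed from the higher-level Filter operator only when the pushed-down record filter covers the entire predicate.

```scala
// Simplified stand-ins, not Spark's actual classes.
sealed trait Pred
case class Cmp(attr: String, op: String, lit: Int) extends Pred
case class Both(l: Pred, r: Pred) extends Pred  // conjunction

// Stand-in for ParquetFilters.createFilter: only bare comparisons push down.
def tryPush(p: Pred): Option[Pred] = p match {
  case c: Cmp => Some(c)
  case _      => None
}

// Stand-in for ParquetFilters.findExpression: did the pushed filter
// account for the whole original predicate?
def covers(pushed: Pred, original: Pred): Boolean = pushed == original

// Mirrors the diff: when pushdown is enabled, keep every predicate that was
// either not pushed at all or only partially pushed; when disabled, keep all.
def remainingFilters(filters: Seq[Pred], pushdownEnabled: Boolean): Seq[Pred] =
  if (pushdownEnabled)
    filters.filter { f =>
      tryPush(f) match {
        case None         => true                 // nothing pushed: keep it
        case Some(pushed) => !covers(pushed, f)   // partially pushed: keep it
      }
    }
  else filters
```

Note that, as in the diff, the scan still receives the full `filters` list while only `remainingFilters` stays in the upstream Filter, so partially pushed predicates are evaluated conservatively at both levels.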
