3 files changed (+12, −4) in sql/core/src/main/scala/org/apache/spark/sql/parquet

@@ -31,6 +31,10 @@ import org.apache.spark.sql.execution.SparkSqlSerializer
 
 object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
+  // set this to false if pushdown should be disabled
+  // Note: prefix is "spark.hadoop." so that it will be copied from SparkConf
+  // to Hadoop configuration
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = "org.apache.spark.sql.parquet.filter.pushdown"
 
   def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
     def createEqualityFilter(name: String, literal: Literal) = literal.dataType match {
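The new flag defaults to true (see the guard added in ParquetTableScan below). A minimal sketch of how a user could disable pushdown, following the Note above; the app name is a placeholder:

import org.apache.spark.SparkConf

// Pushdown is on by default; setting the new key to "false" makes
// ParquetTableScan skip serializing the pruning predicate.
val conf = new SparkConf()
  .setAppName("parquet-pushdown-demo") // hypothetical app name
  .set("org.apache.spark.sql.parquet.filter.pushdown", "false")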
@@ -71,10 +71,13 @@ case class ParquetTableScan(
       ParquetTypesConverter.convertFromAttributes(output).toString)
 
     // Store record filtering predicate in `Configuration`
-    // Note: the input format ignores all predicates that cannot be expressed
+    // Note 1: the input format ignores all predicates that cannot be expressed
     // as simple column predicate filters in Parquet. Here we just record
     // the whole pruning predicate.
-    if (columnPruningPred.isDefined) {
+    // Note 2: you can disable filter predicate pushdown by setting
+    // "org.apache.spark.sql.parquet.filter.pushdown" to false inside SparkConf.
+    if (columnPruningPred.isDefined &&
+        sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
       ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf)
     }
 
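For context, a hedged sketch of what serializeFilterExpressions could look like given the PARQUET_FILTER_DATA key from the first hunk; the body is an assumption built around the SparkSqlSerializer import shown there, not the PR's actual implementation:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.SparkSqlSerializer

// Assumed shape: serialize the pruning predicate and store it
// base64-encoded under PARQUET_FILTER_DATA in the Hadoop Configuration,
// so the Parquet input format can recover it on the executor side.
def serializeFilterExpressions(filter: Expression, conf: Configuration): Unit = {
  val serialized: Array[Byte] = SparkSqlSerializer.serialize(filter)
  conf.set(ParquetFilters.PARQUET_FILTER_DATA,
    java.util.Base64.getEncoder.encodeToString(serialized))
}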
@@ -130,14 +130,15 @@ private[sql] object ParquetTestData {
     writer.close()
   }
 
-  def writeFilterFile() = {
+  def writeFilterFile(records: Int = 200) = {
+    // for microbenchmark use: records = 300000000
     testFilterDir.delete
     val path: Path = new Path(new Path(testFilterDir.toURI), new Path("part-r-0.parquet"))
     val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema)
     val writeSupport = new TestGroupWriteSupport(schema)
     val writer = new ParquetWriter[Group](path, writeSupport)
 
-    for (i <- 0 to 200) {
+    for (i <- 0 to records) {
       val record = new SimpleGroup(schema)
       if (i % 4 == 0) {
         record.add(0, true)
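Usage of the updated test helper, per the microbenchmark comment in the hunk above (note that `0 to records` is inclusive, so records + 1 rows are written):

ParquetTestData.writeFilterFile()                    // default: 200, the previous behavior
ParquetTestData.writeFilterFile(records = 300000000) // microbenchmark-sized file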