Skip to content

Commit 0f43db6

Browse files
committed
Push down more Parquet filters
1 parent faf73dc commit 0f43db6

File tree

2 files changed

+121
-41
lines changed

2 files changed

+121
-41
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,13 @@ private[parquet] class ParquetFilters(
534534
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
535535
} yield FilterApi.or(lhsFilter, rhsFilter)
536536

537+
case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
538+
createFilterHelper(nameToParquetField,
539+
sources.And(sources.Not(lhs), sources.Not(rhs)), canPartialPushDownConjuncts = true)
540+
541+
case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts =>
542+
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true)
543+
537544
case sources.Not(pred) =>
538545
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
539546
.map(FilterApi.not)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 114 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -770,10 +770,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
770770
sources.GreaterThan("c", 1.5D)))
771771
}
772772

773-
// Testing when `canRemoveOneSideInAnd == true`
773+
// Testing when `canPartialPushDownConjuncts == true`
774774
// case sources.And(lhs, rhs) =>
775775
// ...
776-
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
776+
// case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
777777
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
778778
parquetFilters.createFilter(
779779
parquetSchema,
@@ -782,10 +782,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
782782
sources.StringContains("b", "prefix")))
783783
}
784784

785-
// Testing when `canRemoveOneSideInAnd == true`
785+
// Testing when `canPartialPushDownConjuncts == true`
786786
// case sources.And(lhs, rhs) =>
787787
// ...
788-
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
788+
// case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
789789
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
790790
parquetFilters.createFilter(
791791
parquetSchema,
@@ -819,11 +819,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
819819
sources.LessThan("a", 10)
820820
)))
821821
}
822+
}
823+
824+
test("Converting disjunctions into Parquet filter predicates") {
825+
val schema = StructType(Seq(
826+
StructField("a", IntegerType, nullable = false),
827+
StructField("b", StringType, nullable = true)
828+
))
829+
830+
val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
831+
832+
// Testing
833+
// case sources.Or(lhs, rhs) =>
834+
// ...
835+
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, false)
836+
assertResult(Some(
837+
FilterApi.or(gt(intColumn("a"), 2: Integer), lt(intColumn("a"), 1: Integer)))) {
838+
parquetFilters.createFilter(
839+
parquetSchema,
840+
sources.Or(
841+
sources.GreaterThan("a", 2),
842+
sources.LessThan("a", 1)))
843+
}
822844

823845
// Testing
824846
// case sources.Or(lhs, rhs) =>
825847
// ...
826-
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
848+
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, false)
849+
//
850+
// and
851+
//
852+
// case sources.And(lhs, rhs) =>
853+
// ...
854+
// case _ => None
827855
assertResult(None) {
828856
parquetFilters.createFilter(
829857
parquetSchema,
@@ -834,10 +862,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
834862
sources.GreaterThan("a", 2)))
835863
}
836864

837-
// Testing
838-
// case sources.Or(lhs, rhs) =>
839-
// ...
840-
// rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
841865
assertResult(None) {
842866
parquetFilters.createFilter(
843867
parquetSchema,
@@ -847,38 +871,94 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
847871
sources.GreaterThan("a", 1),
848872
sources.StringContains("b", "prefix"))))
849873
}
874+
}
875+
876+
test("Converting complements into Parquet filter predicates") {
877+
val schema = StructType(Seq(
878+
StructField("a", IntegerType, nullable = false),
879+
StructField("b", StringType, nullable = true)
880+
))
881+
882+
val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
850883

851884
// Testing
852-
// case sources.Not(pred) =>
853-
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
854-
// .map(FilterApi.not)
885+
// case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
886+
// createFilterHelper(nameToParquetField,
887+
// sources.And(sources.Not(lhs), sources.Not(rhs)), true)
888+
assertResult(Some(FilterApi.not(gt(intColumn("a"), 5: Integer)))) {
889+
parquetFilters.createFilter(
890+
parquetSchema,
891+
sources.Not(
892+
sources.Or(
893+
sources.GreaterThan("a", 5),
894+
sources.StringContains("b", "prefix")
895+
)
896+
)
897+
)
898+
}
899+
900+
assertResult(Some(FilterApi.and(FilterApi.not(gt(intColumn("a"), 5: Integer)),
901+
FilterApi.not(lt(intColumn("a"), 10: Integer))))) {
902+
parquetFilters.createFilter(
903+
parquetSchema,
904+
sources.Not(
905+
sources.Or(
906+
sources.Or(
907+
sources.GreaterThan("a", 5),
908+
sources.StringContains("b", "prefix")
909+
),
910+
sources.LessThan("a", 10)
911+
)
912+
)
913+
)
914+
}
915+
916+
// Testing
917+
// case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
918+
// createFilterHelper(nameToParquetField,
919+
// sources.And(sources.Not(lhs), sources.Not(rhs)), true)
855920
//
856921
// and
857922
//
858-
// Testing when `canRemoveOneSideInAnd == false`
859923
// case sources.And(lhs, rhs) =>
860924
// ...
861-
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
862-
assertResult(None) {
925+
// case _ => None
926+
927+
assertResult(Some(FilterApi.not(lt(intColumn("a"), 10: Integer)))) {
863928
parquetFilters.createFilter(
864929
parquetSchema,
865930
sources.Not(
866-
sources.And(
867-
sources.GreaterThan("a", 1),
868-
sources.StringContains("b", "prefix"))))
931+
sources.Or(
932+
sources.And(
933+
sources.GreaterThan("a", 5),
934+
sources.StringContains("b", "prefix")
935+
),
936+
sources.LessThan("a", 10)
937+
)
938+
)
939+
)
869940
}
870941

871942
// Testing
872943
// case sources.Not(pred) =>
873-
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
944+
// createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
874945
// .map(FilterApi.not)
875946
//
876947
// and
877948
//
878-
// Testing when `canRemoveOneSideInAnd == false`
949+
// Testing when `canPartialPushDownConjuncts == false`
879950
// case sources.And(lhs, rhs) =>
880951
// ...
881-
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
952+
// case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
953+
assertResult(None) {
954+
parquetFilters.createFilter(
955+
parquetSchema,
956+
sources.Not(
957+
sources.And(
958+
sources.GreaterThan("a", 1),
959+
sources.StringContains("b", "prefix"))))
960+
}
961+
882962
assertResult(None) {
883963
parquetFilters.createFilter(
884964
parquetSchema,
@@ -888,16 +968,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
888968
sources.GreaterThan("a", 1))))
889969
}
890970

891-
// Testing
892-
// case sources.Not(pred) =>
893-
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
894-
// .map(FilterApi.not)
895-
//
896-
// and
897-
//
898-
// Testing passing `canRemoveOneSideInAnd = false` into
899-
// case sources.And(lhs, rhs) =>
900-
// val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
901971
assertResult(None) {
902972
parquetFilters.createFilter(
903973
parquetSchema,
@@ -909,16 +979,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
909979
sources.GreaterThan("a", 2))))
910980
}
911981

912-
// Testing
913-
// case sources.Not(pred) =>
914-
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
915-
// .map(FilterApi.not)
916-
//
917-
// and
918-
//
919-
// Testing passing `canRemoveOneSideInAnd = false` into
920-
// case sources.And(lhs, rhs) =>
921-
// val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
922982
assertResult(None) {
923983
parquetFilters.createFilter(
924984
parquetSchema,
@@ -929,6 +989,19 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
929989
sources.GreaterThan("a", 1),
930990
sources.StringContains("b", "prefix")))))
931991
}
992+
993+
// Testing
994+
// case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts =>
995+
// createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true)
996+
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
997+
parquetFilters.createFilter(
998+
parquetSchema,
999+
sources.Not(
1000+
sources.Not(
1001+
sources.And(
1002+
sources.StringContains("b", "prefix"),
1003+
sources.LessThan("a", 10)))))
1004+
}
9321005
}
9331006

9341007
test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {

0 commit comments

Comments
 (0)