diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 21ab9c78e53d9..7662c8a5ff60c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -534,6 +534,13 @@ private[parquet] class ParquetFilters( createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false) } yield FilterApi.or(lhsFilter, rhsFilter) + case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts => + createFilterHelper(nameToParquetField, + sources.And(sources.Not(lhs), sources.Not(rhs)), canPartialPushDownConjuncts = true) + + case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts => + createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true) + case sources.Not(pred) => createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false) .map(FilterApi.not) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 01e41b3c5df36..5aed758588c10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -770,10 +770,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("c", 1.5D))) } - // Testing when `canRemoveOneSideInAnd == true` + // Testing when `canPartialPushDownConjuncts == true` // case sources.And(lhs, rhs) => // ... - // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + // case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter) assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, @@ -782,10 +782,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.StringContains("b", "prefix"))) } - // Testing when `canRemoveOneSideInAnd == true` + // Testing when `canPartialPushDownConjuncts == true` // case sources.And(lhs, rhs) => // ... - // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + // case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter) assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, @@ -819,11 +819,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.LessThan("a", 10) ))) } + } + + test("Converting disjunctions into Parquet filter predicates") { + val schema = StructType(Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = true) + )) + + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, false) + assertResult(Some( + FilterApi.or(gt(intColumn("a"), 2: Integer), lt(intColumn("a"), 1: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.GreaterThan("a", 2), + sources.LessThan("a", 1))) + } // Testing // case sources.Or(lhs, rhs) => // ... - // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, false) + // + // and + // + // case sources.And(lhs, rhs) => + // ... + // case _ => None assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -834,10 +862,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 2))) } - // Testing - // case sources.Or(lhs, rhs) => - // ... - // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -847,38 +871,94 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 1), sources.StringContains("b", "prefix")))) } + } + + test("Converting complements into Parquet filter predicates") { + val schema = StructType(Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = true) + )) + + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) // Testing - // case sources.Not(pred) => - // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) - // .map(FilterApi.not) + // case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts => + // createFilterHelper(nameToParquetField, + // sources.And(sources.Not(lhs), sources.Not(rhs)), true) + assertResult(Some(FilterApi.not(gt(intColumn("a"), 5: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.Or( + sources.GreaterThan("a", 5), + sources.StringContains("b", "prefix") + ) + ) + ) + } + + assertResult(Some(FilterApi.and(FilterApi.not(gt(intColumn("a"), 5: Integer)), + FilterApi.not(lt(intColumn("a"), 10: Integer))))) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.Or( + sources.Or( + sources.GreaterThan("a", 5), + sources.StringContains("b", "prefix") + ), + sources.LessThan("a", 10) + ) + ) + ) + } + + // Testing + // case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts => + // createFilterHelper(nameToParquetField, + // sources.And(sources.Not(lhs), sources.Not(rhs)), true) // // and // - // Testing when `canRemoveOneSideInAnd == false` // case sources.And(lhs, rhs) => // ... - // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) - assertResult(None) { + // case _ => None + + assertResult(Some(FilterApi.not(lt(intColumn("a"), 10: Integer)))) { parquetFilters.createFilter( parquetSchema, sources.Not( - sources.And( - sources.GreaterThan("a", 1), - sources.StringContains("b", "prefix")))) + sources.Or( + sources.And( + sources.GreaterThan("a", 5), + sources.StringContains("b", "prefix") + ), + sources.LessThan("a", 10) + ) + ) + ) } // Testing // case sources.Not(pred) => - // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false) // .map(FilterApi.not) // // and // - // Testing when `canRemoveOneSideInAnd == false` + // Testing when `canPartialPushDownConjuncts == false` // case sources.And(lhs, rhs) => // ... - // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + // case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -888,16 +968,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 1)))) } - // Testing - // case sources.Not(pred) => - // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) - // .map(FilterApi.not) - // - // and - // - // Testing passing `canRemoveOneSideInAnd = false` into - // case sources.And(lhs, rhs) => - // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -909,16 +979,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 2)))) } - // Testing - // case sources.Not(pred) => - // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) - // .map(FilterApi.not) - // - // and - // - // Testing passing `canRemoveOneSideInAnd = false` into - // case sources.And(lhs, rhs) => - // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -929,6 +989,19 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 1), sources.StringContains("b", "prefix"))))) } + + // Testing + // case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts => + // createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.Not( + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10))))) + } } test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {