Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,13 @@ private[parquet] class ParquetFilters(
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
createFilterHelper(nameToParquetField,
sources.And(sources.Not(lhs), sources.Not(rhs)), canPartialPushDownConjuncts = true)

case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, is this `sources.Not(sources.Not(pred))` case actually reachable?

createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true)

case sources.Not(pred) =>
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("c", 1.5D)))
}

// Testing when `canRemoveOneSideInAnd == true`
// Testing when `canPartialPushDownConjuncts == true`
// case sources.And(lhs, rhs) =>
// ...
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
// case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -782,10 +782,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.StringContains("b", "prefix")))
}

// Testing when `canRemoveOneSideInAnd == true`
// Testing when `canPartialPushDownConjuncts == true`
// case sources.And(lhs, rhs) =>
// ...
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
// case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
Expand Down Expand Up @@ -819,11 +819,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.LessThan("a", 10)
)))
}
}

test("Converting disjunctions into Parquet filter predicates") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false),
StructField("b", StringType, nullable = true)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)

// Testing
// case sources.Or(lhs, rhs) =>
// ...
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, false)
assertResult(Some(
FilterApi.or(gt(intColumn("a"), 2: Integer), lt(intColumn("a"), 1: Integer)))) {
parquetFilters.createFilter(
parquetSchema,
sources.Or(
sources.GreaterThan("a", 2),
sources.LessThan("a", 1)))
}

// Testing
// case sources.Or(lhs, rhs) =>
// ...
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, false)
//
// and
//
// case sources.And(lhs, rhs) =>
// ...
// case _ => None
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -834,10 +862,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 2)))
}

// Testing
// case sources.Or(lhs, rhs) =>
// ...
// rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -847,38 +871,94 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
}
}

test("Converting complements into Parquet filter predicates") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false),
StructField("b", StringType, nullable = true)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
// case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
// createFilterHelper(nameToParquetField,
// sources.And(sources.Not(lhs), sources.Not(rhs)), true)
assertResult(Some(FilterApi.not(gt(intColumn("a"), 5: Integer)))) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.Or(
sources.GreaterThan("a", 5),
sources.StringContains("b", "prefix")
)
)
)
}

assertResult(Some(FilterApi.and(FilterApi.not(gt(intColumn("a"), 5: Integer)),
FilterApi.not(lt(intColumn("a"), 10: Integer))))) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.Or(
sources.Or(
sources.GreaterThan("a", 5),
sources.StringContains("b", "prefix")
),
sources.LessThan("a", 10)
)
)
)
}

// Testing
// case sources.Not(sources.Or(lhs, rhs)) if canPartialPushDownConjuncts =>
// createFilterHelper(nameToParquetField,
// sources.And(sources.Not(lhs), sources.Not(rhs)), true)
//
// and
//
// Testing when `canRemoveOneSideInAnd == false`
// case sources.And(lhs, rhs) =>
// ...
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
assertResult(None) {
// case _ => None

assertResult(Some(FilterApi.not(lt(intColumn("a"), 10: Integer)))) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
sources.Or(
sources.And(
sources.GreaterThan("a", 5),
sources.StringContains("b", "prefix")
),
sources.LessThan("a", 10)
)
)
)
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
// .map(FilterApi.not)
//
// and
//
// Testing when `canRemoveOneSideInAnd == false`
// Testing when `canPartialPushDownConjuncts == false`
// case sources.And(lhs, rhs) =>
// ...
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
// case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
}

assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -888,16 +968,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 1))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing passing `canRemoveOneSideInAnd = false` into
// case sources.And(lhs, rhs) =>
// val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -909,16 +979,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 2))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing passing `canRemoveOneSideInAnd = false` into
// case sources.And(lhs, rhs) =>
// val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -929,6 +989,19 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix")))))
}

// Testing
// case sources.Not(sources.Not(pred)) if canPartialPushDownConjuncts =>
// createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = true)
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.Not(
sources.And(
sources.StringContains("b", "prefix"),
sources.LessThan("a", 10)))))
}
}

test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
Expand Down