[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet #27728
Changes from all commits: 385738d, 22a7a00, b775e02, c40f5d0, e9944e5, 44b310e, 4883e68, f732664, 5fd97c0
In `DataSourceStrategy.scala`:

```diff
@@ -652,10 +652,19 @@ object DataSourceStrategy {
  */
 object PushableColumn {
   def unapply(e: Expression): Option[String] = {
-    def helper(e: Expression) = e match {
-      case a: Attribute => Some(a.name)
+    val nestedPredicatePushdownEnabled = SQLConf.get.nestedPredicatePushdownEnabled
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    def helper(e: Expression): Option[Seq[String]] = e match {
+      case a: Attribute =>
+        if (nestedPredicatePushdownEnabled || !a.name.contains(".")) {
+          Some(Seq(a.name))
+        } else {
+          None
+        }
+      case s: GetStructField if nestedPredicatePushdownEnabled =>
+        helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)
       case _ => None
     }
-    helper(e)
+    helper(e).map(_.quoted)
   }
 }
```

Review comment on `if (nestedPredicatePushdownEnabled || !a.name.contains("."))`:

Member: Could you explain what this condition means?

Review comments on `helper(e).map(_.quoted)`:

Member: Sorry for my late review. This sounds like an API-breaking change.

Member: Can we limit this change to some specific data sources? For example, Parquet only?

Member: I am afraid it might break released external third-party connectors that may not be able to handle the quoted column names.

Member: Yes, this was pointed out and discussed at #27728 (comment) and #27728 (comment).

Contributor: I'm sorry, I've been working on my own (prioritized) task and am afraid I can't pick this up soon. Please take it over if anyone has an idea of how to implement this.

Member: @viirya Are you interested in this follow-up?

Member: I can try looking at this this week. If anyone picks it up before me, I'm also OK with that.

Member: Thank you for fixing this 3.0 blocker.
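To make the quoting concrete, here is a minimal, self-contained sketch of the behavior described by `helper(e).map(_.quoted)`: multi-part names are joined with dots, and any part that itself contains a dot is backtick-quoted. `QuotedNameSketch`, `quoteIfNeeded`, and this `quoted` are illustrative stand-ins, not Spark's actual `MultipartIdentifierHelper`; the real rule for when a part needs quoting may differ.

```scala
object QuotedNameSketch {
  // Hypothetical helper: backtick-quote a name part when it contains a dot, so a
  // literal dot inside a column name cannot be mistaken for the nested-field separator.
  private def quoteIfNeeded(part: String): String =
    if (part.contains(".")) s"`$part`" else part

  // Join a multi-part identifier (as produced by `helper`) into a single string key.
  def quoted(parts: Seq[String]): String = parts.map(quoteIfNeeded).mkString(".")

  def main(args: Array[String]): Unit = {
    println(quoted(Seq("a", "b")))  // a.b    (nested field b inside struct a)
    println(quoted(Seq("a.b")))     // `a.b`  (top-level column literally named "a.b")
  }
}
```

This also suggests an answer to the inline question above: when the flag is off, an attribute whose name contains a dot is not pushed at all (`None`), which preserves the pre-existing behavior for dotted column names.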
In `ParquetFilters.scala`:

```diff
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters.asScalaBufferConverter
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.SparkFilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
+import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveComparator, PrimitiveType, Type}
 import org.apache.parquet.schema.OriginalType._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
```
```diff
@@ -49,15 +49,35 @@ class ParquetFilters(
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean) {
   // A map which contains parquet field name and data type, if predicate push down applies.
-  private val nameToParquetField : Map[String, ParquetField] = {
-    // Here we don't flatten the fields in the nested schema but just look up through
-    // root fields. Currently, accessing to nested fields does not push down filters
-    // and it does not support to create filters for them.
-    val primitiveFields =
-      schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetField(f.getName,
-          ParquetSchemaType(f.getOriginalType,
-            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+  //
+  // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
+  // nested columns. If any part of the names contains `dots`, it is quoted to avoid confusion.
+  // See `org.apache.spark.sql.connector.catalog.quote` for implementation details.
+  private val nameToParquetField : Map[String, ParquetPrimitiveField] = {
+    // Recursively traverse the parquet schema to get primitive fields that can be pushed-down.
+    // `parentFieldNames` is used to keep track of the current nested level when traversing.
+    def getPrimitiveFields(
+        fields: Seq[Type],
+        parentFieldNames: Array[String] = Array.empty): Seq[ParquetPrimitiveField] = {
+      fields.flatMap {
+        case p: PrimitiveType =>
+          Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ p.getName,
+            fieldType = ParquetSchemaType(p.getOriginalType,
+              p.getPrimitiveTypeName, p.getTypeLength, p.getDecimalMetadata)))
+        // Note that when g is a `Struct`, `g.getOriginalType` is `null`.
+        // When g is a `Map`, `g.getOriginalType` is `MAP`.
+        // When g is a `List`, `g.getOriginalType` is `LIST`.
+        case g: GroupType if g.getOriginalType == null =>
+          getPrimitiveFields(g.getFields.asScala, parentFieldNames :+ g.getName)
+        // Parquet only supports push-down for primitive types; as a result, Map and List types
+        // are removed.
+        case _ => None
+      }
+    }
+
+    val primitiveFields = getPrimitiveFields(schema.getFields.asScala).map { field =>
+      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+      (field.fieldNames.toSeq.quoted, field)
     }
     if (caseSensitive) {
       primitiveFields.toMap
```
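A dependency-free sketch of the traversal `getPrimitiveFields` performs, with a toy schema ADT standing in for Parquet's `Type`/`GroupType`/`PrimitiveType` (all names below are invented for illustration):

```scala
object PrimitiveFieldsSketch {
  // Toy stand-ins for Parquet's schema types.
  sealed trait ToyType
  final case class ToyPrimitive(name: String) extends ToyType
  final case class ToyStruct(name: String, fields: Seq[ToyType]) extends ToyType

  // Collect the root-to-leaf path of every primitive field: structs recurse with the
  // parent path extended, primitives terminate, mirroring the pattern match above.
  def primitivePaths(
      fields: Seq[ToyType],
      parents: Array[String] = Array.empty): Seq[Array[String]] =
    fields.flatMap {
      case ToyPrimitive(name)     => Seq(parents :+ name)
      case ToyStruct(name, inner) => primitivePaths(inner, parents :+ name)
    }

  def main(args: Array[String]): Unit = {
    // schema: { id: long, a: { b: int } }
    val schema = Seq(ToyPrimitive("id"), ToyStruct("a", Seq(ToyPrimitive("b"))))
    primitivePaths(schema).foreach(p => println(p.mkString(".")))
    // prints: id, then a.b
  }
}
```

The real code additionally filters out `MAP` and `LIST` groups, since push-down works only on primitive leaves, and quotes each path before using it as a map key.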
```diff
@@ -75,13 +95,13 @@ class ParquetFilters(
   }

   /**
-   * Holds a single field information stored in the underlying parquet file.
+   * Holds a single primitive field information stored in the underlying parquet file.
    *
-   * @param fieldName field name in parquet file
+   * @param fieldNames a field name as an array of string multi-identifier in parquet file
    * @param fieldType field type related info in parquet file
    */
-  private case class ParquetField(
-      fieldName: String,
+  private case class ParquetPrimitiveField(
+      fieldNames: Array[String],
       fieldType: ParquetSchemaType)

   private case class ParquetSchemaType(
```
```diff
@@ -472,13 +492,8 @@ class ParquetFilters(
     case _ => false
   }

-  // Parquet does not allow dots in the column name because dots are used as a column path
-  // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
-  // with missing columns. The incorrect results could be got from Parquet when we push down
-  // filters for the column having dots in the names. Thus, we do not push down such filters.
-  // See SPARK-20364.
   private def canMakeFilterOn(name: String, value: Any): Boolean = {
-    nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
+    nameToParquetField.contains(name) && valueCanMakeFilterOn(name, value)
   }

   /**
```
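Why the `!name.contains(".")` guard (the SPARK-20364 protection) is no longer needed: with quoted keys, a nested field and a top-level column whose name literally contains a dot resolve to different map entries. A toy illustration of the assumed key shapes:

```scala
// Keys as they would look after quoting the multi-part names.
val nameToField = Map(
  "a.b"   -> "nested field b inside struct a",  // from Seq("a", "b")
  "`a.b`" -> "top-level column named a.b"       // from Seq("a.b")
)

println(nameToField("a.b"))    // the nested field
println(nameToField("`a.b`"))  // the dotted top-level column
```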
```diff
@@ -509,38 +524,38 @@ class ParquetFilters(
     predicate match {
       case sources.IsNull(name) if canMakeFilterOn(name, null) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), null))
+          .map(_(nameToParquetField(name).fieldNames, null))
       case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), null))
+          .map(_(nameToParquetField(name).fieldNames, null))

       case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))
       case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))

       case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))
       case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))

       case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
         makeLt.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))
       case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
         makeLtEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))

       case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
         makeGt.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))
       case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
         makeGtEq.lift(nameToParquetField(name).fieldType)
-          .map(_(Array(nameToParquetField(name).fieldName), value))
+          .map(_(nameToParquetField(name).fieldNames, value))

       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side and remove the other side
```
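For context on why the look-up result is carried as an array of path segments: Parquet's stock `FilterApi` column factories accept a single dot-joined string and split it on dots, which is exactly the ambiguity this PR works around (the `SparkFilterApi` import in this diff is Spark's array-based shim over the same machinery). A small runnable example against parquet-mr, assuming `org.apache.parquet:parquet-column` is on the classpath:

```scala
import org.apache.parquet.filter2.predicate.FilterApi

object DottedPathExample {
  def main(args: Array[String]): Unit = {
    // To FilterApi, "a.b" can only mean field b nested inside group a; a column
    // literally named "a.b" is not expressible through this string-based API.
    val pred = FilterApi.eq(FilterApi.intColumn("a.b"), java.lang.Integer.valueOf(5))
    println(pred) // eq(a.b, 5)
  }
}
```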
```diff
@@ -591,13 +606,13 @@ class ParquetFilters(
           && values.distinct.length <= pushDownInFilterThreshold =>
         values.distinct.flatMap { v =>
           makeEq.lift(nameToParquetField(name).fieldType)
-            .map(_(Array(nameToParquetField(name).fieldName), v))
+            .map(_(nameToParquetField(name).fieldNames, v))
         }.reduceLeftOption(FilterApi.or)

       case sources.StringStartsWith(name, prefix)
           if pushDownStartWith && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
-          FilterApi.userDefined(binaryColumn(Array(nameToParquetField(name).fieldName)),
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
             new UserDefinedPredicate[Binary] with Serializable {
               private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
               private val size = strToBinary.length
```
Comment: Will we port this back to branch-3.0?
Comment: Besides #27728 (comment), one more concern about enabling this by default: once it is on, we push down `a.b` (meaning `b` in `a`) by default. I don't think DSv1 pushed down non-existent columns before; now, however, DSv1 implementations have to understand a non-existent column named `a.b`. I don't think we should assume that DSv1 downstream sources handle non-existent columns by default. Think about constructing query strings from filters, as JDBC does: it will fail, and the implementations will have to be fixed or this configuration disabled. And this configuration is all or nothing.
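A toy illustration of the JDBC concern above; `compileFilter` is a made-up stand-in for how a DSv1 connector might turn filters into SQL, not Spark's actual JDBC implementation:

```scala
object Dsv1FilterSketch {
  // Minimal stand-in for org.apache.spark.sql.sources.EqualTo.
  final case class EqualTo(attribute: String, value: Any)

  // A connector that naively compiles the filter into a WHERE fragment.
  def compileFilter(f: EqualTo): String = s""""${f.attribute}" = ${f.value}"""

  def main(args: Array[String]): Unit = {
    println(compileFilter(EqualTo("price", 10))) // "price" = 10  (valid column)
    println(compileFilter(EqualTo("a.b", 10)))   // "a.b" = 10    (no such column; query fails)
  }
}
```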
Comment: Making a configuration that takes a list of sources could be done in a follow-up PR, to keep this one smaller.
Comment: "Since the filter APIs will be enhanced to support nested columns and column names containing dots, it would be nice to introduce this in a major release." It's a good idea! We can turn this feature on for specific data sources in a separate PR; this PR has already grown too big. Thanks!
Comment: FYI @HeartSaVioR, out of curiosity, are you interested in this follow-up?
Comment: Thanks for pinging me! I might not be able to do this very soon, so please go ahead if someone else is interested or planning to. If it's OK for me to deal with this in a week (or maybe even a couple of weeks), then yes, I'm interested.
Comment: Sure, I think it should be fine.