[SPARK-34897][SQL] Support reconcile schemas based on index after nested column pruning #31993
Conversation
isn't it a bug? cc @viirya
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala (outdated review thread, resolved)
viirya left a comment:
Hmm, this seems to be a special case where nested column pruning doesn't work for ORC. For this case, it is necessary to send the entire unpruned data schema to ORC.
As the nested column pruning rule runs far from the point where we get the physical information of the ORC files, and this should be a narrow case, it looks okay to me to inform users of a possible workaround here.
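A minimal sketch of such a workaround (assuming the `t1` example reproduced later in this thread; the only API relied on is the existing `spark.sql.optimizer.nestedSchemaPruning.enabled` config) might look like:

```scala
// Workaround sketch: temporarily turn off nested schema pruning for the affected
// ORC query, then restore the default afterwards.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")
spark.sql("SELECT _col0, _col2.c1 FROM t1").show()
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
```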
It is a Hive ORC table in our production environment.
Can we automatically disable nested column pruning at the executor side when we find that the ORC file schema is the by-position style?
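For illustration, "by-position style" means the ORC file only carries positional field names such as `_col0`, `_col1`, ...; a sketch of such a detection (a hypothetical helper, not Spark's actual code) could be:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: Hive-written ORC files typically expose only positional field
// names (_col0, _col1, ...), so a by-position physical schema can be recognized that way.
def isByPositionOrcSchema(physicalSchema: StructType): Boolean =
  physicalSchema.nonEmpty && physicalSchema.fieldNames.forall(_.matches("_col\\d+"))
```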
Can we disable column pruning when it is a Hive ORC table?

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
Lines 98 to 100 in 25e7d1c

Update canPruneRelation to:

```scala
private def canPruneRelation(fsRelation: HadoopFsRelation) = {
  fsRelation.fileFormat match {
    case _: ParquetFileFormat => true
    case _: OrcFileFormat =>
      fsRelation.location match {
        // Hive-written ORC tables may use by-position (_colN) field names, so skip pruning there.
        case c: CatalogFileIndex =>
          !c.table.provider.contains(DDLUtils.HIVE_PROVIDER)
        case _ => true
      }
    // Keep the match exhaustive: other file formats are not pruned here.
    case _ => false
  }
}
```
Sorry, I may miss something. Why is it only a problem in nested column pruning but not in column pruning?
Nested column pruning removed the field:

spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
Lines 28 to 42 in 0f2c0b5
```
       }
-      r.pruneColumns(prunedSchema)
+      val neededFieldNames = neededOutput.map(_.name).toSet
+      r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```
Move the filter logic from SchemaPruning to PushDownUtils to support data source V2 column pruning.
```
     val dataSchemaFieldNames = dataSchema.fieldNames.toSet
     val mergedDataSchema =
-      StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+      StructType(dataSchema.map(s => mergedSchema.find(_.name.equals(s.name)).getOrElse(s)))
```
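To make the difference concrete, here is a small self-contained sketch (the schema names come from the example discussed below; only the public `StructType` API is assumed):

```scala
import org.apache.spark.sql.types._

val dataSchema = StructType.fromDDL(
  "_col0 INT, _col1 STRING, _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>")
// Schema obtained by merging the requested root fields `_col0` and `_col2.c1`.
val mergedSchema = StructType.fromDDL("_col0 INT, _col2 STRUCT<c1: STRING>")

// Old construction: only the requested top-level fields survive, so `_col1` disappears.
val oldResult =
  StructType(mergedSchema.filter(f => dataSchema.fieldNames.contains(f.name)))
// New construction: every top-level field is kept, substituting the pruned struct where one exists.
val newResult =
  StructType(dataSchema.map(s => mergedSchema.find(_.name.equals(s.name)).getOrElse(s)))

println(oldResult.toDDL) // roughly: `_col0` INT, `_col2` STRUCT<`c1`: STRING>
println(newResult.toDDL) // roughly: `_col0` INT, `_col1` STRING, `_col2` STRUCT<`c1`: STRING>
```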
what's the actual difference? can you give a simple example?
It seems we don't prune anything from the root fields now.
If this is the case, please update the documentation of this method.
```scala
spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)

spark.sql("SELECT _col0, _col2.c1 FROM t1").show
```

The origin schema is:
`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING, `c2`: STRING, `c3`: STRING, `c4`: BIGINT>
Before this PR, the pruneDataSchema returns:
`_col0` INT,`_col2` STRUCT<`c1`: STRING>
After this PR, the pruneDataSchema returns:
`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>
It only prunes nested schemas.
What's wrong with the previous behavior? We can't sacrifice performance for all the cases only because the ORC by ordinal case is problematic.
is it because column pruning will be done by other rules so we don't need to consider it here?
Yes.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
Lines 208 to 213 in 7a5647a

```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
```
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
Lines 96 to 97 in e64eb75

```scala
val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```
Can you provide the full code workflow to explain why this causes issues in ORC? I'm still not very sure.
1. Prune the nested schema:

   spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
   Lines 28 to 43 in 0f2c0b5

   ```scala
   def pruneDataSchema(
       dataSchema: StructType,
       requestedRootFields: Seq[RootField]): StructType = {
     // Merge the requested root fields into a single schema. Note the ordering of the fields
     // in the resulting schema may differ from their ordering in the logical relation's
     // original schema
     val mergedSchema = requestedRootFields
       .map { case root: RootField => StructType(Array(root.field)) }
       .reduceLeft(_ merge _)
     val dataSchemaFieldNames = dataSchema.fieldNames.toSet
     val mergedDataSchema =
       StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
     // Sort the fields of mergedDataSchema according to their order in dataSchema,
     // recursively. This makes mergedDataSchema a pruned schema of dataSchema
     sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
   }
   ```

2. Use this pruned nested schema to build the `dataSchema` in the relation:

   spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
   Lines 81 to 86 in 25e7d1c

   ```scala
   if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
     val prunedRelation = leafNodeBuilder(prunedDataSchema)
     val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)

     Some(buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
       projectionOverSchema))
   ```

3. The `readDataColumns` is the complete column pruning:

   spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
   Lines 208 to 226 in 7a5647a

   ```scala
   val readDataColumns =
     dataColumns
       .filter(requiredAttributes.contains)
       .filterNot(partitionColumns.contains)
   val outputSchema = readDataColumns.toStructType
   logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

   val outputAttributes = readDataColumns ++ partitionColumns

   val scan =
     FileSourceScanExec(
       fsRelation,
       outputAttributes,
       outputSchema,
       partitionKeyFilters.toSeq,
       bucketSet,
       None,
       dataFilters,
       table.map(_.identifier))
   ```

4. `dataSchema` comes from `relation.dataSchema`; it is the pruned nested schema:

   spark/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
   Lines 398 to 407 in 935aa8c

   ```scala
   lazy val inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
         options = relation.options,
         hadoopConf =
           relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
   ```

5. `OrcUtils.requestedColumnIds` uses this pruned nested schema:

   spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
   Lines 193 to 197 in 1fc66f6

   ```scala
   val resultedColPruneInfo =
     Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
       OrcUtils.requestedColumnIds(
         isCaseSensitive, dataSchema, requiredSchema, reader, conf)
     }
   ```
It is because requestedColumnIds will check whether the given data schema has fewer fields than the physical schema in the ORC file.
Under nested column pruning, Spark lets the data source use the pruned schema as the data schema to read files. E.g., Spark prunes _col1 in the above example, but the ORC file has three top-level fields _col0, _col1, and _col2, so the check in requestedColumnIds fails in this case.
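A simplified sketch of that check (not the exact code in OrcUtils.requestedColumnIds, just the shape of the assertion it performs):

```scala
import org.apache.orc.TypeDescription
import org.apache.spark.sql.types.StructType

// Simplified sketch: if the ORC file's physical schema has more top-level fields than the
// data schema Spark passes in, Spark cannot tell which columns were dropped and fails fast.
def checkTopLevelFieldCount(dataSchema: StructType, orcSchema: TypeDescription): Unit = {
  val orcFieldNames = orcSchema.getFieldNames
  assert(orcFieldNames.size() <= dataSchema.length,
    "The given data schema has less fields than the actual ORC physical schema, " +
      "no idea which columns were dropped, fail to read.")
}
```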
is it because column pruning will be done by other rules so we don't need to consider it here?
Yes.
Hmm? In PushDownUtils.pruneColumns, if you enable nested column pruning, Spark will only run the path of nested column pruning, not the quoted L96-97.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala (outdated review thread, resolved)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala (resolved review thread)
```
 * and given requested field are "a", the field "b" is pruned in the returned schema.
 * Note that schema field ordering at original schema is still preserved in pruned schema.
 * Prunes the nested schema by the requested fields. For example, if the schema is:
 * `id int, struct<a:int, b:int>`, and given requested field are "a", the field "b" is pruned
```
top-level columns need to have a name: `id int, s struct<a:int, b:int>`
and given requested field are "a" -> and given requested field "s.a"
the field "b" is pruned -> the inner field "b" ...
```
 * Note that schema field ordering at original schema is still preserved in pruned schema.
 * Prunes the nested schema by the requested fields. For example, if the schema is:
 * `id int, struct<a:int, b:int>`, and given requested field are "a", the field "b" is pruned
 * in the returned schema: `id int, struct<a:int>`.
```
ditto: `id int, s struct<a:int>`
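Putting these suggestions together, the doc comment would read roughly as follows (an assembled sketch, not the exact final wording merged into the PR):

```scala
/**
 * Prunes the nested schema by the requested fields. For example, if the schema is:
 * `id int, s struct<a:int, b:int>`, and the given requested field is "s.a", the inner field "b"
 * is pruned in the returned schema: `id int, s struct<a:int>`.
 * Note that schema field ordering at original schema is still preserved in pruned schema.
 */
```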
@wangyum there are conflicts

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
```scala
val upperCaseSchema = StructType.fromDDL("A struct<A:int, B:int>, B int")
val lowerCaseSchema = StructType.fromDDL("a struct<a:int, b:int>, b int")
val upperCaseRequestedFields = Seq(StructField("A", StructType.fromDDL("A int")))
val lowerCaseRequestedFields = Seq(StructField("a", StructType.fromDDL("a int")))

Seq(true, false).foreach { isCaseSensitive =>
  withSQLConf(CASE_SENSITIVE.key -> isCaseSensitive.toString) {
    if (isCaseSensitive) {
      // Schema is case-sensitive
      val requestedFields = getRootFields(StructField("id", IntegerType))
      val prunedSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("ID int, name String"), requestedFields)
      assert(prunedSchema == StructType(Seq.empty))
      // Root fields are case-sensitive
      val rootFieldsSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("id int, name String"),
        getRootFields(StructField("ID", IntegerType)))
      assert(rootFieldsSchema == StructType(StructType(Seq.empty)))
      testPrunedSchema(
        upperCaseSchema,
        upperCaseRequestedFields,
        StructType.fromDDL("A struct<A:int>, B int"))
      testPrunedSchema(
        upperCaseSchema,
        lowerCaseRequestedFields,
        upperCaseSchema)

      testPrunedSchema(
        lowerCaseSchema,
        upperCaseRequestedFields,
        lowerCaseSchema)
      testPrunedSchema(
        lowerCaseSchema,
        lowerCaseRequestedFields,
        StructType.fromDDL("a struct<a:int>, b int"))
    } else {
      // Schema is case-insensitive
      val prunedSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("ID int, name String"),
        getRootFields(StructField("id", IntegerType)))
      assert(prunedSchema == StructType(StructField("ID", IntegerType) :: Nil))
      // Root fields are case-insensitive
      val rootFieldsSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("id int, name String"),
        getRootFields(StructField("ID", IntegerType)))
      assert(rootFieldsSchema == StructType(StructField("id", IntegerType) :: Nil))
      Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
        testPrunedSchema(
          upperCaseSchema,
          requestedFields,
          StructType.fromDDL("A struct<A:int>, B int"))
      }

      Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
        testPrunedSchema(
          lowerCaseSchema,
          requestedFields,
          StructType.fromDDL("a struct<a:int>, b int"))
      }
    }
  }
})
}
```
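The `getRootFields` and `testPrunedSchema` helpers are defined outside this hunk; an approximate sketch of what they presumably do (signatures inferred from the call sites above, not copied from the actual suite) is:

```scala
import org.apache.spark.sql.catalyst.expressions.SchemaPruning
import org.apache.spark.sql.catalyst.expressions.SchemaPruning.RootField
import org.apache.spark.sql.types.{StructField, StructType}

// Wrap plain StructFields into RootFields so they can be fed to pruneDataSchema.
// The boolean flag only records how the field was derived; it does not change the pruned shape.
def getRootFields(requestedFields: StructField*): Seq[RootField] =
  requestedFields.map(f => RootField(f, derivedFromAtt = true))

// Prune `schema` with the requested fields and compare against the expected result.
def testPrunedSchema(
    schema: StructType,
    requestedFields: Seq[StructField],
    expectedSchema: StructType): Unit = {
  val prunedSchema = SchemaPruning.pruneDataSchema(schema, getRootFields(requestedFields: _*))
  assert(prunedSchema == expectedSchema)
}
```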
Tests LGTM, thanks for adding more scenarios.

Fixed.
how far shall we backport? to 3.0?

Yes, to 3.0.
viirya left a comment:

lgtm
Thanks! Merging to master.
@wangyum There are conflicts in 3.1/3.0. Can you create backport PRs? Thanks.
…r nested column pruning

This PR backports #31993 to branch-3.1. The origin PR description is the same as the description of this PR (see below).

Closes #32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>

…r nested column pruning

This PR backports #31993 to branch-3.0. The origin PR description is the same as the description of this PR (see below).

Closes #32310 from wangyum/SPARK-34897-3.0.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
What changes were proposed in this pull request?
It will remove `StructField` when [pruning nested columns](https://github.com/apache/spark/blob/0f2c0b53e8fb18c86c67b5dd679c006db93f94a5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala#L28-L42).
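For example, running the following reproduces the issue:

```scala
spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)

spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))")
spark.sql("SELECT _col0, _col2.c1 FROM t1").show
```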
Before this PR, the returned schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING>, and it will throw an exception:

java.lang.AssertionError: assertion failed: The given data schema struct<_col0:int,_col2:struct<c1:string>> has less fields than the actual ORC physical schema, no idea which columns were dropped, fail to read.
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.requestedColumnIds(OrcUtils.scala:160)

After this PR, the returned schema is `_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>. The final schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING> after the complete column pruning:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
Lines 208 to 213 in 7a5647a
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
Lines 96 to 97 in e64eb75
Why are the changes needed?
Fix bug.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.