Skip to content

Commit e609395

Browse files
wangyumviirya
authored andcommitted
[SPARK-34897][SQL] Support reconcile schemas based on index after nested column pruning
### What changes were proposed in this pull request? It will remove `StructField` when [pruning nested columns](https://github.com/apache/spark/blob/0f2c0b53e8fb18c86c67b5dd679c006db93f94a5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala#L28-L42). For example: ```scala spark.sql( """ |CREATE TABLE t1 ( | _col0 INT, | _col1 STRING, | _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>) |USING ORC |""".stripMargin) spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))") spark.sql("SELECT _col0, _col2.c1 FROM t1").show ``` Before this pr. The returned schema is: ``` `_col0` INT,`_col2` STRUCT<`c1`: STRING> ``` add it will throw exception: ``` java.lang.AssertionError: assertion failed: The given data schema struct<_col0:int,_col2:struct<c1:string>> has less fields than the actual ORC physical schema, no idea which columns were dropped, fail to read. at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.requestedColumnIds(OrcUtils.scala:160) ``` After this pr. The returned schema is: ``` `_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING> ```. The finally schema is ``` `_col0` INT,`_col2` STRUCT<`c1`: STRING> ``` after the complete column pruning: https://github.com/apache/spark/blob/7a5647a93aaea9d1d78d9262e24fc8c010db04d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L208-L213 https://github.com/apache/spark/blob/e64eb75aede71a5403a4d4436e63b1fcfdeca14d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala#L96-L97 ### Why are the changes needed? Fix bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #31993 from wangyum/SPARK-34897. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 81dbaed commit e609395

File tree

5 files changed

+108
-57
lines changed

5 files changed

+108
-57
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ import org.apache.spark.sql.types._
2222

2323
object SchemaPruning extends SQLConfHelper {
2424
/**
25-
* Filters the schema by the requested fields. For example, if the schema is struct<a:int, b:int>,
26-
* and given requested field are "a", the field "b" is pruned in the returned schema.
27-
* Note that schema field ordering at original schema is still preserved in pruned schema.
25+
* Prunes the nested schema by the requested fields. For example, if the schema is:
26+
* `id int, s struct<a:int, b:int>`, and given requested field "s.a", the inner field "b"
27+
* is pruned in the returned schema: `id int, s struct<a:int>`.
28+
* Note that:
29+
* 1. The schema field ordering at original schema is still preserved in pruned schema.
30+
* 2. The top-level fields are not pruned here.
2831
*/
2932
def pruneDataSchema(
3033
dataSchema: StructType,
@@ -34,11 +37,10 @@ object SchemaPruning extends SQLConfHelper {
3437
// in the resulting schema may differ from their ordering in the logical relation's
3538
// original schema
3639
val mergedSchema = requestedRootFields
37-
.map { case root: RootField => StructType(Array(root.field)) }
40+
.map { root: RootField => StructType(Array(root.field)) }
3841
.reduceLeft(_ merge _)
39-
val dataSchemaFieldNames = dataSchema.fieldNames.toSet
4042
val mergedDataSchema =
41-
StructType(mergedSchema.filter(f => dataSchemaFieldNames.exists(resolver(_, f.name))))
43+
StructType(dataSchema.map(d => mergedSchema.find(m => resolver(m.name, d.name)).getOrElse(d)))
4244
// Sort the fields of mergedDataSchema according to their order in dataSchema,
4345
// recursively. This makes mergedDataSchema a pruned schema of dataSchema
4446
sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala

Lines changed: 76 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,36 @@
1818
package org.apache.spark.sql.catalyst.expressions
1919

2020
import org.apache.spark.SparkFunSuite
21-
import org.apache.spark.sql.catalyst.expressions.SchemaPruning.RootField
2221
import org.apache.spark.sql.catalyst.plans.SQLHelper
2322
import org.apache.spark.sql.internal.SQLConf.CASE_SENSITIVE
2423
import org.apache.spark.sql.types._
2524

2625
class SchemaPruningSuite extends SparkFunSuite with SQLHelper {
27-
28-
def getRootFields(requestedFields: StructField*): Seq[RootField] = {
29-
requestedFields.map { f =>
26+
private def testPrunedSchema(
27+
schema: StructType,
28+
requestedFields: Seq[StructField],
29+
expectedSchema: StructType): Unit = {
30+
val requestedRootFields = requestedFields.map { f =>
3031
// `derivedFromAtt` doesn't affect the result of pruned schema.
3132
SchemaPruning.RootField(field = f, derivedFromAtt = true)
3233
}
34+
val prunedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
35+
assert(prunedSchema === expectedSchema)
3336
}
3437

3538
test("prune schema by the requested fields") {
36-
def testPrunedSchema(
37-
schema: StructType,
38-
requestedFields: StructField*): Unit = {
39-
val requestedRootFields = requestedFields.map { f =>
40-
// `derivedFromAtt` doesn't affect the result of pruned schema.
41-
SchemaPruning.RootField(field = f, derivedFromAtt = true)
42-
}
43-
val expectedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
44-
assert(expectedSchema == StructType(requestedFields))
45-
}
46-
47-
testPrunedSchema(StructType.fromDDL("a int, b int"), StructField("a", IntegerType))
48-
testPrunedSchema(StructType.fromDDL("a int, b int"), StructField("b", IntegerType))
39+
testPrunedSchema(
40+
StructType.fromDDL("a int, b int"),
41+
Seq(StructField("a", IntegerType)),
42+
StructType.fromDDL("a int, b int"))
4943

5044
val structOfStruct = StructType.fromDDL("a struct<a:int, b:int>, b int")
51-
testPrunedSchema(structOfStruct, StructField("a", StructType.fromDDL("a int, b int")))
52-
testPrunedSchema(structOfStruct, StructField("b", IntegerType))
53-
testPrunedSchema(structOfStruct, StructField("a", StructType.fromDDL("b int")))
45+
testPrunedSchema(structOfStruct,
46+
Seq(StructField("a", StructType.fromDDL("a int")), StructField("b", IntegerType)),
47+
StructType.fromDDL("a struct<a:int>, b int"))
48+
testPrunedSchema(structOfStruct,
49+
Seq(StructField("a", StructType.fromDDL("a int"))),
50+
StructType.fromDDL("a struct<a:int>, b int"))
5451

5552
val arrayOfStruct = StructField("a", ArrayType(StructType.fromDDL("a int, b int, c string")))
5653
val mapOfStruct = StructField("d", MapType(StructType.fromDDL("a int, b int, c string"),
@@ -60,44 +57,76 @@ class SchemaPruningSuite extends SparkFunSuite with SQLHelper {
6057
arrayOfStruct :: StructField("b", structOfStruct) :: StructField("c", IntegerType) ::
6158
mapOfStruct :: Nil)
6259

63-
testPrunedSchema(complexStruct, StructField("a", ArrayType(StructType.fromDDL("b int"))),
64-
StructField("b", StructType.fromDDL("a int")))
6560
testPrunedSchema(complexStruct,
66-
StructField("a", ArrayType(StructType.fromDDL("b int, c string"))),
67-
StructField("b", StructType.fromDDL("b int")))
61+
Seq(StructField("a", ArrayType(StructType.fromDDL("b int"))),
62+
StructField("b", StructType.fromDDL("a int"))),
63+
StructType(
64+
StructField("a", ArrayType(StructType.fromDDL("b int"))) ::
65+
StructField("b", StructType.fromDDL("a int")) ::
66+
StructField("c", IntegerType) ::
67+
mapOfStruct :: Nil))
68+
testPrunedSchema(complexStruct,
69+
Seq(StructField("a", ArrayType(StructType.fromDDL("b int, c string"))),
70+
StructField("b", StructType.fromDDL("b int"))),
71+
StructType(
72+
StructField("a", ArrayType(StructType.fromDDL("b int, c string"))) ::
73+
StructField("b", StructType.fromDDL("b int")) ::
74+
StructField("c", IntegerType) ::
75+
mapOfStruct :: Nil))
6876

6977
val selectFieldInMap = StructField("d", MapType(StructType.fromDDL("a int, b int"),
7078
StructType.fromDDL("e int, f string")))
71-
testPrunedSchema(complexStruct, StructField("c", IntegerType), selectFieldInMap)
79+
testPrunedSchema(complexStruct,
80+
Seq(StructField("c", IntegerType), selectFieldInMap),
81+
StructType(
82+
arrayOfStruct ::
83+
StructField("b", structOfStruct) ::
84+
StructField("c", IntegerType) ::
85+
selectFieldInMap :: Nil))
7286
}
7387

7488
test("SPARK-35096: test case insensitivity of pruned schema") {
75-
Seq(true, false).foreach(isCaseSensitive => {
89+
val upperCaseSchema = StructType.fromDDL("A struct<A:int, B:int>, B int")
90+
val lowerCaseSchema = StructType.fromDDL("a struct<a:int, b:int>, b int")
91+
val upperCaseRequestedFields = Seq(StructField("A", StructType.fromDDL("A int")))
92+
val lowerCaseRequestedFields = Seq(StructField("a", StructType.fromDDL("a int")))
93+
94+
Seq(true, false).foreach { isCaseSensitive =>
7695
withSQLConf(CASE_SENSITIVE.key -> isCaseSensitive.toString) {
7796
if (isCaseSensitive) {
78-
// Schema is case-sensitive
79-
val requestedFields = getRootFields(StructField("id", IntegerType))
80-
val prunedSchema = SchemaPruning.pruneDataSchema(
81-
StructType.fromDDL("ID int, name String"), requestedFields)
82-
assert(prunedSchema == StructType(Seq.empty))
83-
// Root fields are case-sensitive
84-
val rootFieldsSchema = SchemaPruning.pruneDataSchema(
85-
StructType.fromDDL("id int, name String"),
86-
getRootFields(StructField("ID", IntegerType)))
87-
assert(rootFieldsSchema == StructType(StructType(Seq.empty)))
97+
testPrunedSchema(
98+
upperCaseSchema,
99+
upperCaseRequestedFields,
100+
StructType.fromDDL("A struct<A:int>, B int"))
101+
testPrunedSchema(
102+
upperCaseSchema,
103+
lowerCaseRequestedFields,
104+
upperCaseSchema)
105+
106+
testPrunedSchema(
107+
lowerCaseSchema,
108+
upperCaseRequestedFields,
109+
lowerCaseSchema)
110+
testPrunedSchema(
111+
lowerCaseSchema,
112+
lowerCaseRequestedFields,
113+
StructType.fromDDL("a struct<a:int>, b int"))
88114
} else {
89-
// Schema is case-insensitive
90-
val prunedSchema = SchemaPruning.pruneDataSchema(
91-
StructType.fromDDL("ID int, name String"),
92-
getRootFields(StructField("id", IntegerType)))
93-
assert(prunedSchema == StructType(StructField("ID", IntegerType) :: Nil))
94-
// Root fields are case-insensitive
95-
val rootFieldsSchema = SchemaPruning.pruneDataSchema(
96-
StructType.fromDDL("id int, name String"),
97-
getRootFields(StructField("ID", IntegerType)))
98-
assert(rootFieldsSchema == StructType(StructField("id", IntegerType) :: Nil))
115+
Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
116+
testPrunedSchema(
117+
upperCaseSchema,
118+
requestedFields,
119+
StructType.fromDDL("A struct<A:int>, B int"))
120+
}
121+
122+
Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
123+
testPrunedSchema(
124+
lowerCaseSchema,
125+
requestedFields,
126+
StructType.fromDDL("a struct<a:int>, b int"))
127+
}
99128
}
100129
}
101-
})
130+
}
102131
}
103132
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import org.apache.spark.sql.types.{StructField, StructType}
4040
case class HadoopFsRelation(
4141
location: FileIndex,
4242
partitionSchema: StructType,
43+
// The top-level columns in `dataSchema` should match the actual physical file schema, otherwise
44+
// the ORC data source may not work with the by-ordinal mode.
4345
dataSchema: StructType,
4446
bucketSpec: Option[BucketSpec],
4547
fileFormat: FileFormat,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ object PushDownUtils extends PredicateHelper {
8181
relation: DataSourceV2Relation,
8282
projects: Seq[NamedExpression],
8383
filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
84+
val exprs = projects ++ filters
85+
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
86+
val neededOutput = relation.output.filter(requiredColumns.contains)
87+
8488
scanBuilder match {
8589
case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
8690
val rootFields = SchemaPruning.identifyRootFields(projects, filters)
@@ -89,14 +93,12 @@ object PushDownUtils extends PredicateHelper {
8993
} else {
9094
new StructType()
9195
}
92-
r.pruneColumns(prunedSchema)
96+
val neededFieldNames = neededOutput.map(_.name).toSet
97+
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
9398
val scan = r.build()
9499
scan -> toOutputAttrs(scan.readSchema(), relation)
95100

96101
case r: SupportsPushDownRequiredColumns =>
97-
val exprs = projects ++ filters
98-
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
99-
val neededOutput = relation.output.filter(requiredColumns.contains)
100102
r.pruneColumns(neededOutput.toStructType)
101103
val scan = r.build()
102104
// always project, in case the relation's output has been updated and doesn't match

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,4 +633,20 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
633633
}
634634
}
635635
}
636+
637+
test("SPARK-34897: Support reconcile schemas based on index after nested column pruning") {
638+
withTable("t1") {
639+
spark.sql(
640+
"""
641+
|CREATE TABLE t1 (
642+
| _col0 INT,
643+
| _col1 STRING,
644+
| _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
645+
|USING ORC
646+
|""".stripMargin)
647+
648+
spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))")
649+
checkAnswer(spark.sql("SELECT _col0, _col2.c1 FROM t1"), Seq(Row(1, "a")))
650+
}
651+
}
636652
}

0 commit comments

Comments
 (0)