-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets derived from the same source #41554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets derived from the same source #41554
Conversation
|
The logical plan before this PR. The |
00b8dbf to
0d7cd2f
Compare
|
can we make DeduplicateRelations handle |
Ok for me, before do this, please check #41347 |
0d7cd2f to
83c1382
Compare
|
Hi @cloud-fan , I let |
…sets derived from the same source
2ad8493 to
d45eab2
Compare
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
|
kindly ping @cloud-fan |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
| case c: CoGroup => | ||
| // SPARK-43781: CoGroup is a special case, as it has different output attributes | ||
| // from its children. We need to update the output attributes of CoGroup manually. | ||
| val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the left-right attributes does not need to call rewriteAttrsMatchWithSubPlan?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just changed by review #41554 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the same apple to newRightAttr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and to left/right group?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the same apple to newRightAttr?
In fact it will be more safer, but I can't produce any negetive case.
and to left/right group?
what's meaning? Sorry I don't get it. The left/right group already apply rewriteAttrsMatchWithSubPlan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, if you mean the leftAttr should same to rightAttr and left/right group. I would say yes. Maybe we should add rewriteAttrsMatchWithSubPlan to newLeftAttr. After code refactoring, invoke rewriteAttrsMatchWithSubPlan not a big deal. So I add it back.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
|
|
||
| planWithNewChildren match { | ||
| case c: CoGroup => | ||
| // SPARK-43781: CoGroup is a special case, as it has different output attributes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "it has different output attributes"?
There are similar cases where it creates new output attributes (not from the child): MapPartitionsInR, MapPartitionsInRWithArrow, MapElements, MapGroups, FlatMapGroupsWithState, FlatMapGroupsInR, FlatMapGroupsInR, FlatMapGroupsInRWithArrow, FlatMapGroupsInPandas, MapInPandas, PythonMapInArrow, and FlatMapGroupsInPandasWithState.
There's another CoGroup operation in Python (cogroup.apply() that is FlatMapCoGroupsInPandas). Would be great to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "it has different output attributes"?
The describe not clear, let me update it.
There are similar cases where it creates new output attributes (not from the child): MapPartitionsInR, MapPartitionsInRWithArrow, MapElements, MapGroups, FlatMapGroupsWithState, FlatMapGroupsInR, FlatMapGroupsInR, FlatMapGroupsInRWithArrow, FlatMapGroupsInPandas, MapInPandas, PythonMapInArrow, and FlatMapGroupsInPandasWithState.
There's another CoGroup operation in Python (cogroup.apply() that is FlatMapCoGroupsInPandas). Would be great to double check.
Can I do it on another PR? Seem like produce negative case will use some time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, that's fine to do them as a followup.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
Show resolved
Hide resolved
|
thanks, merging to master! |
|
Thanks @cloud-fan @HyukjinKwon |
|
Hi @cloud-fan , seem like the jira ticket not update. https://issues.apache.org/jira/browse/SPARK-43781 |
…sets derived from the same source
### What changes were proposed in this pull request?
When cogroup two datasets derived from same source, eg:
```scala
val inputType = StructType(Array(StructField("id", LongType, false),
StructField("type", StringType, false)))
val keyType = StructType(Array(StructField("id", LongType, false)))
val inputRows = new java.util.ArrayList[Row]()
inputRows.add(Row(1L, "foo"))
inputRows.add(Row(1L, "bar"))
inputRows.add(Row(2L, "foo"))
val input = spark.createDataFrame(inputRows, inputType)
val fooGroups = input.filter("type = 'foo'").groupBy("id").as(RowEncoder(keyType),
RowEncoder(inputType))
val barGroups = input.filter("type = 'bar'").groupBy("id").as(RowEncoder(keyType),
RowEncoder(inputType))
val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) =>
iterator.toSeq ++ iterator1.toSeq
}(RowEncoder(inputType)).collect()
```
The error will be reported:
```
21:03:27.651 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.IllegalStateException: Couldn't find id#19L in [id#0L,type#1]
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
...
```
The reason are `DeduplicateRelations` rewrite `LocalRelation` but can't rewrite `left(right)Group` and `left(right)Attr` in `CoGroup`. In fact, the `Join` will face same situation. But `Join` regenerate plan when invoke itself to avoid this situation. Please refer https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1089
This PR let `DeduplicateRelations` handle with `CoGroup` case
### Why are the changes needed?
Fix IllegalStateException when cogrouping two datasets derived from the same source
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add new test
Closes apache#41554 from Hisoka-X/SPARK-43781_cogrouping_two_datasets.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…sets derived from the same source
### What changes were proposed in this pull request?
When cogroup two datasets derived from same source, eg:
```scala
val inputType = StructType(Array(StructField("id", LongType, false),
StructField("type", StringType, false)))
val keyType = StructType(Array(StructField("id", LongType, false)))
val inputRows = new java.util.ArrayList[Row]()
inputRows.add(Row(1L, "foo"))
inputRows.add(Row(1L, "bar"))
inputRows.add(Row(2L, "foo"))
val input = spark.createDataFrame(inputRows, inputType)
val fooGroups = input.filter("type = 'foo'").groupBy("id").as(RowEncoder(keyType),
RowEncoder(inputType))
val barGroups = input.filter("type = 'bar'").groupBy("id").as(RowEncoder(keyType),
RowEncoder(inputType))
val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) =>
iterator.toSeq ++ iterator1.toSeq
}(RowEncoder(inputType)).collect()
```
The error will be reported:
```
21:03:27.651 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.IllegalStateException: Couldn't find id#19L in [id#0L,type#1]
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
...
```
The reason are `DeduplicateRelations` rewrite `LocalRelation` but can't rewrite `left(right)Group` and `left(right)Attr` in `CoGroup`. In fact, the `Join` will face same situation. But `Join` regenerate plan when invoke itself to avoid this situation. Please refer https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1089
This PR let `DeduplicateRelations` handle with `CoGroup` case
### Why are the changes needed?
Fix IllegalStateException when cogrouping two datasets derived from the same source
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add new test
Closes apache#41554 from Hisoka-X/SPARK-43781_cogrouping_two_datasets.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
When cogroup two datasets derived from same source, eg:
The error will be reported:
The reason are
DeduplicateRelationsrewriteLocalRelationbut can't rewriteleft(right)Groupandleft(right)AttrinCoGroup. In fact, theJoinwill face same situation. ButJoinregenerate plan when invoke itself to avoid this situation. Please refer https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1089This PR let
DeduplicateRelationshandle withCoGroupcaseWhy are the changes needed?
Fix IllegalStateException when cogrouping two datasets derived from the same source
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add new test