Hisoka-X (Member) commented Jun 12, 2023

What changes were proposed in this pull request?

When cogrouping two datasets derived from the same source, e.g.:

val inputType = StructType(Array(StructField("id", LongType, false),
   StructField("type", StringType, false)))
val keyType = StructType(Array(StructField("id", LongType, false)))

val inputRows = new java.util.ArrayList[Row]()
inputRows.add(Row(1L, "foo"))
inputRows.add(Row(1L, "bar"))
inputRows.add(Row(2L, "foo"))
val input = spark.createDataFrame(inputRows, inputType)
val fooGroups = input.filter("type = 'foo'").groupBy("id").as(RowEncoder(keyType),
   RowEncoder(inputType))
val barGroups = input.filter("type = 'bar'").groupBy("id").as(RowEncoder(keyType),
   RowEncoder(inputType))

val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) =>
   iterator.toSeq ++ iterator1.toSeq
}(RowEncoder(inputType)).collect()

the following error is reported:

21:03:27.651 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.IllegalStateException: Couldn't find id#19L in [id#0L,type#1]
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
	...

The cause is that DeduplicateRelations rewrites LocalRelation but does not rewrite left(right)Group and left(right)Attr in CoGroup. Join faces the same situation, but Join avoids it by regenerating the plan when it is invoked. See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1089

This PR makes DeduplicateRelations handle the CoGroup case.
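
To illustrate why two datasets derived from the same source collide, here is a minimal toy sketch in plain Scala. It is not Spark's DeduplicateRelations code: `DedupSketch`, `Attr`, and `dedupRight` are hypothetical names, and an attribute is modeled only as a name plus an expression ID. The sketch gives every right-side attribute that clashes with the left side a fresh ID and returns the old-to-new mapping, which is the mapping any operator that references those attributes would also need to apply.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object DedupSketch {
  // An attribute is identified by its name and expression ID (e.g. id#0L).
  final case class Attr(name: String, exprId: Long)

  /** Gives each right-side attribute whose ID also appears on the left a fresh ID.
    * Returns the new right output and a map from replaced attributes to their replacements.
    */
  def dedupRight(
      left: Seq[Attr],
      right: Seq[Attr],
      firstFreshId: Long): (Seq[Attr], Map[Attr, Attr]) = {
    val leftIds = left.map(_.exprId).toSet
    var nextId = firstFreshId
    val pairs = right.map { attr =>
      if (leftIds.contains(attr.exprId)) {
        val fresh = attr.copy(exprId = nextId)
        nextId += 1
        attr -> fresh
      } else {
        attr -> attr
      }
    }
    val newRight = pairs.map(_._2)
    val attrMap = pairs.collect { case (old, fresh) if old != fresh => old -> fresh }.toMap
    (newRight, attrMap)
  }
}
```

With both sides built from the same source output `Seq(Attr("id", 0L), Attr("type", 1L))`, the right side receives new IDs while the left side keeps its own, so references held by a parent operator have to be rewritten per side.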

Why are the changes needed?

Fix IllegalStateException when cogrouping two datasets derived from the same source

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new test.

@github-actions github-actions bot added the SQL label Jun 12, 2023
Hisoka-X (Member Author) commented Jun 12, 2023

The logical plan before this PR is shown below. The left(right)Group and left(right)Attr in CoGroup are not rewritten.

== Analyzed Logical Plan ==
id: bigint, type: string
SerializeFromObject [assertnotnull(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), LongType, ObjectType(class java.lang.Long)).longValue) AS id#17L, assertnotnull(staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, type), StringType, ObjectType(class java.lang.String)), true, false, true)) AS type#18]
+- CoGroup org.apache.spark.sql.DatasetSuite$$Lambda$1658/631349266@5aa62ee7, createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), StructField(id,LongType,false)), createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), type#1.toString, StructField(id,LongType,false), StructField(type,StringType,false)), createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), type#1.toString, StructField(id,LongType,false), StructField(type,StringType,false)), [id#19L], [id#19L], [id#19L, type#20], [id#19L, type#20], obj#16: org.apache.spark.sql.Row
   :- Project [id#0L, type#1]
   :  +- Filter (type#1 = foo)
   :     +- LocalRelation [id#0L, type#1]
   +- Project [id#19L, type#20]
      +- Filter (type#20 = bar)
         +- LocalRelation [id#19L, type#20]
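
The mismatch in this plan can be checked mechanically. The sketch below is a toy in plain Scala, not Spark code: `CoGroupRefCheck`, `Attr`, and `dangling` are hypothetical names. It assumes the four bracketed lists after the deserializers are leftGroup, rightGroup, leftAttr, and rightAttr in that order, and copies the attribute IDs from the plan above.

```scala
// Toy check only: hypothetical names, attribute IDs copied from the plan above.
object CoGroupRefCheck {
  final case class Attr(name: String, exprId: Long)

  /** Returns the attributes in `refs` that `childOutput` does not produce. */
  def dangling(refs: Seq[Attr], childOutput: Seq[Attr]): Seq[Attr] = {
    val produced = childOutput.toSet
    refs.filterNot(produced.contains)
  }

  // Output of the two children, as printed in the plan.
  val leftChildOutput: Seq[Attr] = Seq(Attr("id", 0L), Attr("type", 1L))
  val rightChildOutput: Seq[Attr] = Seq(Attr("id", 19L), Attr("type", 20L))

  // References held by CoGroup (assumed order: leftGroup, rightGroup, leftAttr, rightAttr).
  val leftGroup: Seq[Attr] = Seq(Attr("id", 19L))
  val rightGroup: Seq[Attr] = Seq(Attr("id", 19L))
  val leftAttr: Seq[Attr] = Seq(Attr("id", 19L), Attr("type", 20L))
  val rightAttr: Seq[Attr] = Seq(Attr("id", 19L), Attr("type", 20L))

  def main(args: Array[String]): Unit = {
    // Left-side references cannot be bound to the left child; right-side ones can.
    println(s"dangling on left:  ${dangling(leftGroup ++ leftAttr, leftChildOutput)}")
    println(s"dangling on right: ${dangling(rightGroup ++ rightAttr, rightChildOutput)}")
  }
}
```

Under that assumption, the left-side references point at id#19L and type#20, which only the right child produces. This matches the reported error "Couldn't find id#19L in [id#0L,type#1]".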

@Hisoka-X Hisoka-X force-pushed the SPARK-43781_cogrouping_two_datasets branch 2 times, most recently from 00b8dbf to 0d7cd2f Compare June 13, 2023 04:34
Hisoka-X (Member Author) commented

cc @cloud-fan @dongjoon-hyun

cloud-fan (Contributor) commented

can we make DeduplicateRelations handle CoGroup?

Hisoka-X (Member Author) commented

> can we make DeduplicateRelations handle CoGroup?

OK with me. Before doing this, please check #41347.

@Hisoka-X Hisoka-X force-pushed the SPARK-43781_cogrouping_two_datasets branch from 0d7cd2f to 83c1382 Compare July 11, 2023 04:10
Hisoka-X (Member Author) commented

Hi @cloud-fan, DeduplicateRelations now handles CoGroup. Please take another look. Thanks!

@Hisoka-X Hisoka-X force-pushed the SPARK-43781_cogrouping_two_datasets branch from 2ad8493 to d45eab2 Compare July 20, 2023 02:26
Hisoka-X (Member Author) commented Aug 7, 2023

kindly ping @cloud-fan

case c: CoGroup =>
// SPARK-43781: CoGroup is a special case, as it has different output attributes
// from its children. We need to update the output attributes of CoGroup manually.
val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr))

Contributor commented

Why don't the left/right attributes need to call rewriteAttrsMatchWithSubPlan?

Member Author commented

This was just changed per review #41554 (comment).

Contributor commented

Should the same apply to newRightAttr?

Contributor commented

And to the left/right group?

Member Author commented

> Should the same apply to newRightAttr?

That would indeed be safer, but I can't produce a negative case.

> And to the left/right group?

What do you mean? Sorry, I don't follow. The left/right groups already go through rewriteAttrsMatchWithSubPlan.

Member Author commented

Oh, if you mean that leftAttr should be handled the same way as rightAttr and the left/right groups, then yes. We should probably apply rewriteAttrsMatchWithSubPlan to newLeftAttr as well. After the code refactoring, invoking rewriteAttrsMatchWithSubPlan is not a big deal, so I added it back.
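
For readers following this thread, the `attrMap.getOrElse(attr, attr)` idiom from the snippet under review can be sketched in plain Scala. This is a toy illustration, not the merged code: `AttrRewriteSketch`, `Attr`, and `rewrite` are hypothetical names, and a plain `Map` stands in for the attribute map used in the rule. It does not model rewriteAttrsMatchWithSubPlan.

```scala
// Toy illustration only: hypothetical names, a plain Map stands in for the attribute map.
object AttrRewriteSketch {
  final case class Attr(name: String, exprId: Long)

  /** Replaces every attribute that has an entry in `attrMap`; others pass through unchanged. */
  def rewrite(attrs: Seq[Attr], attrMap: Map[Attr, Attr]): Seq[Attr] =
    attrs.map(attr => attrMap.getOrElse(attr, attr))
}
```

Because unmapped attributes pass through untouched, applying the rewrite to a side whose child was not changed is a no-op. That is consistent with the point above that applying the same treatment to both sides costs little.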


planWithNewChildren match {
case c: CoGroup =>
// SPARK-43781: CoGroup is a special case, as it has different output attributes

Member commented

What do you mean by "it has different output attributes"?

There are similar cases where it creates new output attributes (not from the child): MapPartitionsInR, MapPartitionsInRWithArrow, MapElements, MapGroups, FlatMapGroupsWithState, FlatMapGroupsInR, FlatMapGroupsInRWithArrow, FlatMapGroupsInPandas, MapInPandas, PythonMapInArrow, and FlatMapGroupsInPandasWithState.

There's another CoGroup operation in Python (cogroup.apply(), which is FlatMapCoGroupsInPandas). It would be great to double-check.

Member Author commented

> What do you mean by "it has different output attributes"?

The description is not clear; let me update it.

> There are similar cases where it creates new output attributes (not from the child): MapPartitionsInR, MapPartitionsInRWithArrow, MapElements, MapGroups, FlatMapGroupsWithState, FlatMapGroupsInR, FlatMapGroupsInRWithArrow, FlatMapGroupsInPandas, MapInPandas, PythonMapInArrow, and FlatMapGroupsInPandasWithState.
> There's another CoGroup operation in Python (cogroup.apply(), which is FlatMapCoGroupsInPandas). It would be great to double-check.

Can I do that in another PR? Producing a negative case seems like it will take some time.

Member commented

Yup, that's fine to do them as a followup.

cloud-fan (Contributor) commented Aug 10, 2023

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5db8778 Aug 10, 2023
Hisoka-X (Member Author) commented

Thanks @cloud-fan @HyukjinKwon

@Hisoka-X Hisoka-X deleted the SPARK-43781_cogrouping_two_datasets branch August 11, 2023 00:48
Hisoka-X (Member Author) commented

Hi @cloud-fan, it seems the JIRA ticket has not been updated. https://issues.apache.org/jira/browse/SPARK-43781

hvanhovell pushed a commit to hvanhovell/spark that referenced this pull request Aug 13, 2023
…sets derived from the same source

Closes apache#41554 from Hisoka-X/SPARK-43781_cogrouping_two_datasets.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023