
Conversation

Contributor

@peter-toth peter-toth commented Mar 10, 2023

What changes were proposed in this pull request?

After #37525 (SPARK-40086 / SPARK-42049) the following simple query containing a scalar subquery expression:

```
select (select sum(id) from t1)
```

fails with:

```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	...
	at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
	at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
	at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
	at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
	at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
	at scala.collection.mutable.HashTable.init(HashTable.scala:110)
	at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
	at scala.collection.mutable.HashMap.init(HashMap.scala:44)
	at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
	...
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

when DSv2 is enabled.

This PR proposes to fix `BatchScanExec`'s `equals()` and `hashCode()`, as those shouldn't throw an NPE under any circumstances.

But if we dig deeper we realize that the NPE occurs since #37525 and the root cause of the problem is changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. Deserializing a mutable map invokes `hashCode()` on the keys, which is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`.
Please note that the mutability of `aliasMap` shouldn't be an issue as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE).
Based on the above findings this PR also proposes making `aliasMap` `@transient` as it isn't needed on executors.
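The failure mode is easy to reproduce outside Spark. Below is a minimal, hypothetical Java sketch (the names `HashOnReadObjectDemo` and `FragileKey` are invented for illustration): `java.util.HashMap.readObject` re-inserts every entry and therefore calls `hashCode()` on each key, just as Scala's `mutable.HashMap.readObject` does in the stack trace above. A key whose `hashCode()` dereferences a `transient` field (null after deserialization) then fails with an NPE, analogous to `BatchScanExec.batch`.

```java
import java.io.*;

public class HashOnReadObjectDemo {
    // Key whose hashCode() NPEs once its transient field is gone,
    // loosely mimicking BatchScanExec.hashCode() touching `batch`.
    static class FragileKey implements Serializable {
        transient String name;                 // not serialized -> null after readObject
        FragileKey(String name) { this.name = name; }
        @Override public int hashCode() { return name.hashCode(); }  // NPE if name == null
        @Override public boolean equals(Object o) {
            return o instanceof FragileKey
                && java.util.Objects.equals(((FragileKey) o).name, name);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { oos.writeObject(o); }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        java.util.HashMap<FragileKey, Integer> map = new java.util.HashMap<>();
        map.put(new FragileKey("k"), 1);       // fine here: name is non-null
        byte[] bytes = serialize(map);         // writeObject does NOT hash the keys
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();                  // readObject rehashes every key -> NPE
            System.out.println("deserialized without error");
        } catch (NullPointerException e) {
            System.out.println("NPE during deserialization");
        }
    }
}
```

This also illustrates why marking the field `@transient` only helps when nothing on the executor side touches it: the NPE happens precisely because a transient field came back as null.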

A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes sense at all, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child`, and leaving those expressions in the map actually does no harm.

Why are the changes needed?

To fix regression introduced with #37525.

Does this PR introduce any user-facing change?

Yes, the query works again.

How was this patch tested?

Added new UT.

@github-actions github-actions bot added the SQL label Mar 10, 2023
@peter-toth peter-toth force-pushed the SPARK-42745-improved-aliasawareoutputexpression-with-dsv2 branch from 18309de to 408770b on March 10, 2023 at 09:55
@peter-toth
Contributor Author

cc @cloud-fan, @ulysses-you

@xinrong-meng, this seems to be a regression from 3.3 to 3.4, so could you please hold off on 3.4.0-RC4 if possible?

```diff
 override def equals(other: Any): Boolean = other match {
   case other: BatchScanExec =>
-    this.batch == other.batch && this.runtimeFilters == other.runtimeFilters &&
+    this.batch != null && this.batch == other.batch &&
```
Contributor Author

@peter-toth peter-toth Mar 10, 2023

This is to not consider BatchScanExecs equal when batch is not available.
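A minimal sketch of that idea (a hypothetical `Scan` class, not the real `BatchScanExec` API): treat two instances as unequal whenever `batch` is unavailable, and keep `hashCode()` null-safe so neither method can NPE.

```java
import java.util.Objects;

public class Scan {
    final Object batch;            // may be null when the scan is not yet resolved

    Scan(Object batch) { this.batch = batch; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Scan)) return false;
        Scan other = (Scan) o;
        // Never consider two scans equal when batch is unavailable.
        return this.batch != null && this.batch.equals(other.batch);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(batch);  // null-safe: returns 0 for null
    }
}
```

With this guard a `Scan` with a null `batch` is not even equal to itself, which is the intent here: equality is only meaningful once `batch` is available.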

@ulysses-you
Contributor

it seems to me that:

  1. we should skip adding PlanExpression into aliasMap
  2. make aliasMap immutable and use a local mutable map, then merge them after multiTransform is done
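As a hedged illustration of option 2 (hypothetical names, Java in place of Scala): mutate only a map that is local to the collection step, then publish an immutable snapshot, so nothing mutable ever gets serialized.

```java
import java.util.*;

public class AliasCollector {
    // Option 2 sketch: build in a local mutable map, expose an immutable copy.
    // aliasPairs entries are {expressionKey, aliasName}.
    static Map<String, List<String>> collectAliases(List<String[]> aliasPairs) {
        Map<String, List<String>> local = new HashMap<>();       // local, mutable
        for (String[] pair : aliasPairs) {
            local.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        // Freeze values and the map itself before handing them out.
        Map<String, List<String>> frozen = new HashMap<>();
        local.forEach((k, v) -> frozen.put(k, List.copyOf(v)));
        return Map.copyOf(frozen);                               // immutable snapshot
    }
}
```

The mutation is confined to a stack-local map, so the published map can be safely shared and serialized without ever invoking the rehash-on-deserialize path of a mutable map.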

@peter-toth
Contributor Author

it seems to me that:

  1. we should skip adding PlanExpression into aliasMap
  2. make aliasMap immutable and use a local mutable map, then merge them after multiTransform is done

Oh ok, then we see the right fix a bit differently.
But I'm not entirely sure I get 2. We can make aliasMap immutable by just adding a .toMap here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala#L61. But what do you mean by

and use a local mutable map, then merge them after done multiTransform

?

@xinrong-meng
Member

Unfortunately v3.4.0-rc4 has already been cut. Let's make sure this lands in RC5.

@ulysses-you
Contributor

@peter-toth nvm, skipping adding PlanExpression and making the map immutable are improvements; the fix in this PR looks good

@cloud-fan
Contributor

thanks, merging to master/3.4!

@cloud-fan cloud-fan closed this in 93d5816 Mar 10, 2023
cloud-fan pushed a commit that referenced this pull request Mar 10, 2023
Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 93d5816)
Signed-off-by: Wenchen Fan <[email protected]>
@peter-toth
Contributor Author

Thanks for the review!

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you all.

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
Closes apache#40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 93d5816)
Signed-off-by: Wenchen Fan <[email protected]>