[SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2 #40364
Conversation
@xinrong-meng, this seems to be a regression from 3.3 to 3.4, so could you please hold 3.4.0-RC4 if possible?
```diff
 override def equals(other: Any): Boolean = other match {
   case other: BatchScanExec =>
-    this.batch == other.batch && this.runtimeFilters == other.runtimeFilters &&
+    this.batch != null && this.batch == other.batch &&
```
This is to not consider `BatchScanExec`s equal when `batch` is not available.
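A minimal sketch of that null-guard pattern, using simplified stand-in classes (`Batch` and `ScanNode` are hypothetical, not the actual `BatchScanExec`, whose real fix lives in the PR diff):

```scala
// Hypothetical stand-ins for a scan node whose `batch` may be unavailable.
case class Batch(id: Int)

class ScanNode(val batch: Batch, val runtimeFilters: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case that: ScanNode =>
      // Null guard first: nodes with no batch are never considered equal.
      this.batch != null && this.batch == that.batch &&
        this.runtimeFilters == that.runtimeFilters
    case _ => false
  }

  // hashCode() must not dereference the possibly-null field either,
  // otherwise hashing during deserialization can still NPE.
  override def hashCode(): Int =
    if (batch == null) 0 else batch.hashCode()
}
```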
It seems to me that:
Oh OK, then we see the right fix a bit differently.
Unfortunately v3.4.0-rc4 has been cut. Let's make sure this lands in RC5.
@peter-toth nvm, skipping adding PlanExpression and making it immutable are kinds of
Thanks, merging to master/3.4!
### What changes were proposed in this pull request?

After #37525 (SPARK-40086 / SPARK-42049) the following simple query containing a subselect expression:

```sql
select (select sum(id) from t1)
```

fails with:

```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
    at scala.runtime.Statics.anyHash(Statics.java:122)
    ...
    at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
    at scala.runtime.Statics.anyHash(Statics.java:122)
    at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
    at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
    at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
    at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
    at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
    at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
    at scala.collection.mutable.HashTable.init(HashTable.scala:110)
    at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
    at scala.collection.mutable.HashMap.init(HashMap.scala:44)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
    ...
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

when DSv2 is enabled.

This PR proposes to fix `BatchScanExec`'s `equals()` and `hashCode()`, as those shouldn't throw an NPE under any circumstances.

But if we dig deeper we realize that the NPE occurs since #37525 and the root cause of the problem is changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. Deserializing a mutable map invokes the `hashCode()` of the keys, while that is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`.

Please note that the mutability of `aliasMap` shouldn't be an issue as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE). Based on the above findings this PR also proposes making `aliasMap` transient, as it isn't needed on executors.
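The root-cause behavior described above can be reproduced in isolation. The sketch below uses hypothetical classes (not Spark code) to show that deserializing a `scala.collection.mutable.HashMap` rebuilds the hash table entry by entry, invoking `hashCode()` on every key, which is exactly how a key whose `hashCode()` touches an unavailable lazy field can NPE during task deserialization:

```scala
import java.io._
import scala.collection.mutable

// Hypothetical key type that records every hashCode() invocation.
object CountingKey { var hashCalls = 0 }

class CountingKey(val id: Int) extends Serializable {
  override def hashCode(): Int = { CountingKey.hashCalls += 1; id }
  override def equals(o: Any): Boolean = o match {
    case k: CountingKey => k.id == id
    case _ => false
  }
}

object MutableMapDeserDemo {
  // Java-serialization roundtrip, mimicking what JavaSerializerInstance does.
  def roundtrip[T](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(obj)
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val m = mutable.HashMap(new CountingKey(1) -> "v")
    CountingKey.hashCalls = 0 // ignore the hashes from insertion
    roundtrip(m)              // readObject re-hashes each key
    println(CountingKey.hashCalls)
  }
}
```

A small immutable map serialized the same way carries its entries without rebuilding a hash table, which is why the switch from immutable to mutable surfaced the NPE.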
A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes sense at all, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child`, and leaving those expressions in the map doesn't do any harm.

### Why are the changes needed?

To fix a regression introduced with #37525.

### Does this PR introduce _any_ user-facing change?

Yes, the query works again.

### How was this patch tested?

Added a new UT.

Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 93d5816)
Signed-off-by: Wenchen Fan <[email protected]>
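The `@transient` part of the proposal can be illustrated with a minimal sketch. `AliasAwareNode` below is a hypothetical simplified class, not the real `AliasAwareOutputExpression`: a driver-only field marked `@transient` is simply omitted from the serialized task, so its keys are never re-hashed on executors.

```scala
import java.io._

// Hypothetical simplified operator: the alias map is only needed for
// driver-side planning, so it is excluded from serialization entirely.
class AliasAwareNode(aliases: Seq[(String, String)]) extends Serializable {
  // `.toMap` takes an immutable snapshot; `@transient` keeps the field
  // out of the serialized form (it comes back as null after roundtrip).
  @transient private val aliasMap: Map[String, String] = aliases.toMap

  def resolve(name: String): Option[String] =
    Option(aliasMap).flatMap(_.get(name))
}

object TransientDemo {
  // Java-serialization roundtrip standing in for driver-to-executor shipping.
  def roundtrip[T](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(obj)
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }
}
```

On the driver side the map resolves normally; after the roundtrip the field is null, which is harmless here because nothing on the executor needs it.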
Thanks for the review!
+1, LGTM. Thank you all.