Skip to content

Conversation

benrobby
Copy link

@benrobby benrobby commented Aug 18, 2025

What changes were proposed in this pull request?

Make PullOutNonDeterministic use canonicalized expressions to dedup group and aggregate expressions. This affects pyspark udfs in particular. Example:

from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)

Currently results in a plan like this:

Aggregate [_nondeterministic#15](#15), [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14](#15%20AS%20dummyNondeterministicUDF(value)#12,%20avg(id#0L)%20AS%20avg(id)#13,%20dummyNondeterministicUDF(value#6L)#8%20AS%20dummyNondeterministicUDF(value)#14)
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15](#0L,%20value#6L,%20dummyNondeterministicUDF(value#6L)#7%20AS%20_nondeterministic#15)
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L](#0L,%20(id#0L%20%%20cast(3%20as%20bigint))%20AS%20value#6L)
      +- Range (0, 10, step=1, splits=Some(2))

and then it throws:

[[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
  • how canonicalized fixes this:
    • nondeterministic PythonUDF expressions always have distinct resultIds per udf
    • The fix is to canonicalize the expressions when matching. Canonicalized means that we're setting the resultIds to -1, allowing us to dedup the PythonUDF expressions.
  • for deterministic UDFs, this rule does not apply and "Post Analysis" batch extracts and deduplicates the expressions, as expected

Why are the changes needed?

  • the output of the query with the fix applied still makes sense - the nondeterministic UDF is invoked only once, in the project.

Does this PR introduce any user-facing change?

Yes, it's additive, it enables queries to run that previously threw errors.

How was this patch tested?

  • added unit test

Was this patch authored or co-authored using generative AI tooling?

No

@benrobby benrobby changed the title [SPARK-53311] make pulloutnondeterministic use canonicalized expressions [SPARK-53311][SQL][PYTHON][CORE] make PullOutNonDeterministic use canonicalized expressions Aug 18, 2025
@benrobby
Copy link
Author

Hi @cloud-fan, could you take a look?

val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child)
val deterministicAggregate = a.transformExpressions { case e =>
Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
Option(nondeterToAttr.get(e.canonicalized)).map(_.toAttribute).getOrElse(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks risky to me. We should not deduplicate nondeterministic expressions, as they might have side effects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok we are already doing it with non-canonicalized expression. I'm fine with this change now

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick review! Yes, this rule PullOutNondeterministic is already doing that. Let me fix the tests..

@benrobby
Copy link
Author

@cloud-fan all green, the remaining test failure is unrelated

@HyukjinKwon HyukjinKwon changed the title [SPARK-53311][SQL][PYTHON][CORE] make PullOutNonDeterministic use canonicalized expressions [SPARK-53311][SQL][PYTHON][CORE] Make PullOutNonDeterministic use canonicalized expressions Aug 18, 2025
@cloud-fan
Copy link
Contributor

The docker test failure is unrelated, thanks, merging to master!

@cloud-fan cloud-fan closed this in 9a62f7d Aug 19, 2025
mzhang pushed a commit to mzhang/spark that referenced this pull request Aug 21, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make PullOutNonDeterministic use canonicalized expressions to dedup group and  aggregate expressions. This affects pyspark udfs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

Currently results in a plan like this:

```
Aggregate [_nondeterministic#15](apache#15), [_nondeterministic#15 AS dummyNondeterministicUDF(value)apache#12, avg(id#0L) AS avg(id)apache#13, dummyNondeterministicUDF(value#6L)apache#8 AS dummyNondeterministicUDF(value)apache#14](apache#15%20AS%20dummyNondeterministicUDF(value)apache#12,%20avg(id#0L)%20AS%20avg(id)apache#13,%20dummyNondeterministicUDF(value#6L)apache#8%20AS%20dummyNondeterministicUDF(value)apache#14)
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)apache#7 AS _nondeterministic#15](#0L,%20value#6L,%20dummyNondeterministicUDF(value#6L)apache#7%20AS%20_nondeterministic#15)
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L](#0L,%20(id#0L%20%%20cast(3%20as%20bigint))%20AS%20value#6L)
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- how canonicalized fixes this:
  -  nondeterministic PythonUDF expressions always have distinct resultIds per udf
  - The fix is to canonicalize the expressions when matching. Canonicalized means that we're setting the resultIds to -1, allowing us to dedup the PythonUDF expressions.
- for deterministic UDFs, this rule does not apply and "Post Analysis" batch extracts and deduplicates the expressions, as expected

### Why are the changes needed?

- the output of the query with the fix applied still makes sense - the nondeterministic UDF is invoked only once, in the project.

### Does this PR introduce _any_ user-facing change?

Yes, it's additive, it enables queries to run that previously threw errors.

### How was this patch tested?

- added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants