[WIP] Add metadata to MapStatus #4
Conversation
Thanks for the change! Adding a metadata field into `MapStatus` looks great! Just to see whether there is a better way to serialize/deserialize the metadata instead of depending on the `ShuffleManager`: is it possible to add serialize/deserialize methods into the metadata class itself?
We can do that, but the right metadata type still needs to be instantiated depending on the `ShuffleManager` implementation. Let me look into that.
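As a minimal sketch of the suggestion above: the metadata class could implement `java.io.Externalizable` and carry its own serialization, removing the need for `ShuffleManager`-specific (de)serialization code. The class and field names below are purely illustrative, not an existing Spark or Uber RSS API.

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Hypothetical metadata type that serializes itself; names are illustrative only.
class RssMapOutputMetadata(var serverHost: String, var mapId: Long) extends Externalizable {

  // Externalizable requires a public no-arg constructor for deserialization.
  def this() = this(null, -1L)

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(serverHost)
    out.writeLong(mapId)
  }

  override def readExternal(in: ObjectInput): Unit = {
    serverHost = in.readUTF()
    mapId = in.readLong()
  }
}
```

Even with self-serializing metadata, deserialization still has to know which concrete class to instantiate, which is the open question in the reply above.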
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
### What changes were proposed in this pull request?
This PR introduces a SASL retry count in `RetryingBlockTransferor`.

### Why are the changes needed?
Previously a boolean variable, `saslTimeoutSeen`, was used. However, the boolean variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though the IOException at step 2 is retried (resulting in an increment of `retryCount`), the `retryCount` would be cleared at step 4. Since the intention of `saslTimeoutSeen` is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from `retryCount`.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
A new test is added, courtesy of Mridul.

Closes apache#39611 from tedyu/sasl-cnt.

Authored-by: Ted Yu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
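As a rough illustration of the counting idea described in this commit message (this is not the actual `RetryingBlockTransferor`, which lives in Spark's Java network-common module; the class and field names below are illustrative only):

```scala
// Illustrative sketch: track SASL-timeout retries separately so they can be
// subtracted from the overall retry count instead of clearing it outright.
class RetryState(maxRetries: Int) {
  private var retryCount = 0      // all retries performed so far
  private var saslRetryCount = 0  // retries caused only by SaslTimeoutException

  def onRetriedFailure(isSaslTimeout: Boolean): Unit = {
    retryCount += 1
    if (isSaslTimeout) {
      saslRetryCount += 1
    } else {
      // On a non-SASL failure, undo only the increments that came from SASL
      // timeouts. The old boolean flag cleared retryCount entirely here, which
      // also forgot earlier non-SASL retries (steps 2 and 4 in the scenario above).
      retryCount -= saslRetryCount
      saslRetryCount = 0
    }
  }

  def shouldRetry: Boolean = retryCount <= maxRetries
}
```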
…edExpression()

### What changes were proposed in this pull request?
In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?
This fixes a regression caused by apache#39010, which added the `supportedExpression()` guard to `addExprTree()` and `getExprState()` but not to `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently causes the aggregate and result expressions to go out of sync, eventually resulting in a query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?
This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```

Example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```

After the fix, this error is gone.

### How was this patch tested?
Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of a user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
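A self-contained toy model of the inconsistency (this is not Spark's actual `EquivalentExpressions`); it only illustrates why the insert and lookup paths must apply the same `supportedExpression()`-style guard:

```scala
import scala.collection.mutable

// Toy stand-in for EquivalentExpressions: `supported` plays the role of
// supportedExpression(), and both paths must apply it consistently.
class EquivalentExprs[T](supported: T => Boolean) {
  private val counts = mutable.Map.empty[T, Int]

  // Guarded insert: unsupported expressions are never tracked.
  def addExpr(e: T): Boolean = {
    if (supported(e)) {
      val seenBefore = counts.contains(e)
      counts(e) = counts.getOrElse(e, 0) + 1
      seenBefore // true means a duplicate was found and can be reused
    } else {
      false
    }
  }

  // Guarded lookup, mirroring addExpr.
  def getExprState(e: T): Option[Int] =
    if (supported(e)) counts.get(e) else None
}
```

If `addExpr` skipped the guard while `getExprState` kept it, an expression could be deduplicated on insert but become invisible on lookup, which mirrors the `PhysicalAggregation` symptom described above.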
… throw internal error
### What changes were proposed in this pull request?
This PR fixes the error messages and classes when Python UDFs are used in higher order functions.
### Why are the changes needed?
To show the proper user-facing exceptions with error classes.
### Does this PR introduce _any_ user-facing change?
Yes, previously it threw internal error such as:
```python
from pyspark.sql.functions import transform, udf, col, array
spark.range(1).select(transform(array("id"), lambda x: udf(lambda y: y)(x))).collect()
```
Before:
```
py4j.protocol.Py4JJavaError: An error occurred while calling o74.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 0.0 failed 1 times, most recent failure: Lost task 15.0 in stage 0.0 (TID 15) (ip-192-168-123-103.ap-northeast-2.compute.internal executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: <lambda>(lambda x_0#3L)#2 SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```
After:
```
pyspark.errors.exceptions.captured.AnalysisException: [INVALID_LAMBDA_FUNCTION_CALL.UNEVALUABLE] Invalid lambda function call. Python UDFs should be used in a lambda function at a higher order function. However, "<lambda>(lambda x_0#3L)" was a Python UDF. SQLSTATE: 42K0D;
Project [transform(array(id#0L), lambdafunction(<lambda>(lambda x_0#3L)#2, lambda x_0#3L, false)) AS transform(array(id), lambdafunction(<lambda>(lambda x_0#3L), namedlambdavariable()))#4]
+- Range (0, 1, step=1, splits=Some(16))
```
### How was this patch tested?
A unit test was added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#47079 from HyukjinKwon/SPARK-48706.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
Regarding storing and retrieving of `MapOutputMetadata`, my idea was to add the metadata directly into `MapStatus` and delegate the serialization/deserialization of the metadata to a new class, `MapOutputMetadataExternalizer`, which is constructed by the `ShuffleManager`. This way Uber RSS could fill the location with the block manager ID of the executor where the map was running and store the RSS-related block coordinates as a custom `MapOutputMetadata`.

Advantage:

With this solution a single shuffle implementation can handle different kinds of `MapOutputMetadata`, as `MapOutputMetadataExternalizer#writeExternal` could write a type indicator first (a single byte, for example, depending on the `MapOutputMetadata` type) and `readExternal` can create the right instance depending on the indicator read.
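A hypothetical sketch of this type-indicator idea; the externalizer, the metadata variants, and their fields are all illustrative, not actual Spark or Uber RSS classes:

```scala
import java.io.{ObjectInput, ObjectOutput}

// Illustrative metadata variants (hypothetical).
sealed trait MapOutputMetadata
case class RssMapOutputMetadata(serverHost: String, mapId: Long) extends MapOutputMetadata
case class LocalMapOutputMetadata(filePath: String) extends MapOutputMetadata

// Hypothetical externalizer constructed by the ShuffleManager: it writes a
// one-byte type indicator first, then the payload of the concrete variant.
object MapOutputMetadataExternalizer {
  private val RssType: Byte = 1
  private val LocalType: Byte = 2

  def writeExternal(meta: MapOutputMetadata, out: ObjectOutput): Unit = meta match {
    case m: RssMapOutputMetadata =>
      out.writeByte(RssType)
      out.writeUTF(m.serverHost)
      out.writeLong(m.mapId)
    case m: LocalMapOutputMetadata =>
      out.writeByte(LocalType)
      out.writeUTF(m.filePath)
  }

  def readExternal(in: ObjectInput): MapOutputMetadata = in.readByte() match {
    case RssType   => RssMapOutputMetadata(in.readUTF(), in.readLong())
    case LocalType => LocalMapOutputMetadata(in.readUTF())
    case other     => throw new IllegalArgumentException(s"Unknown metadata type: $other")
  }
}
```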
Disadvantage:

At retrieval I had to bind the `MapStatus` location and the `MapOutputMetadata` together:

spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Lines 1651 to 1654 in 72f3af3

This feels bad. One alternative solution is to use only the `MapOutputMetadata` and forget the locations in this kind of retrieve... But if we need both the location and `MapOutputMetadata`, then a much better solution would be apache#31876, which is a stale PR, but we can help with that.