[SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException #43543
Conversation
Looks good to me.
For completeness' sake, do you want to do it for `reset` as well? We don't use it right now, though.

Ah, I missed it. Thanks @mridulm.
Thanks for fixing this, @viirya!
Given CI is still running, please feel free to merge it once green :-)
Thank you @mridulm :)
LGTM
+1, late LGTM.
Thank you @dongjoon-hyun!
### What changes were proposed in this pull request?

This patch wraps `BufferReleasingInputStream.available/reset` under `tryOrFetchFailedException`, so an `IOException` thrown during an `available`/`reset` call is rethrown as a `FetchFailedException`.

### Why are the changes needed?

We have encountered a shuffle data corruption issue:

```
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
	at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
	at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
	at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
```
Spark shuffle can already detect corruption for a few stream operations such as `read` and `skip`: an `IOException` raised there is rethrown as a `FetchFailedException`, which makes the failed shuffle task retry. The `available` call in the stack trace above, however, is not covered by that mechanism, so no retry happened and the Spark application simply failed. Since the `available`/`reset` operations also involve data decompression and can throw `IOException`, they should be checked the same way `read` and `skip` are.
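For context, the fix follows the same pattern the wrapper already uses for `read` and `skip`. Below is a minimal sketch of that pattern, not the actual Spark source: the `delegate` field, the simplified `tryOrFetchFailedException` signature, and the local `FetchFailedException` stub are all assumptions made to keep the example self-contained.

```scala
import java.io.{IOException, InputStream}

// Stand-in for org.apache.spark.shuffle.FetchFailedException, whose real
// constructor requires block/executor metadata omitted in this sketch.
class FetchFailedException(cause: Throwable) extends RuntimeException(cause)

// Simplified sketch of a shuffle stream wrapper that translates IOExceptions
// from decompression-triggering calls into retryable fetch failures.
class BufferReleasingInputStream(delegate: InputStream) extends InputStream {

  // Runs `block`; an IOException is rethrown as a FetchFailedException so the
  // scheduler can retry the shuffle fetch instead of failing the application.
  private def tryOrFetchFailedException[T](block: => T): T =
    try block
    catch { case e: IOException => throw new FetchFailedException(e) }

  // Already covered before this PR.
  override def read(): Int = tryOrFetchFailedException(delegate.read())
  override def skip(n: Long): Long = tryOrFetchFailedException(delegate.skip(n))

  // Newly covered: available()/reset() can also trigger decompression
  // (e.g. SnappyInputStream.available -> hasNextChunk) and throw IOException.
  override def available(): Int = tryOrFetchFailedException(delegate.available())
  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())
}
```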
### Does this PR introduce _any_ user-facing change?

Yes. Data corruption during an `available`/`reset` call now raises a `FetchFailedException`, like `read` and `skip`, which can be retried, instead of a plain `IOException`.

### How was this patch tested?
Added test.
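As a rough illustration only (the PR's actual test lives in Spark's shuffle test suite and is set up differently), a test could feed the sketch above a stream whose `available` throws, reusing the stub classes from the previous snippet, and assert that the exception is translated:

```scala
import java.io.{ByteArrayInputStream, IOException}

// Hypothetical corrupt stream: behaves normally except that available()
// fails, mimicking SnappyInputStream hitting FAILED_TO_UNCOMPRESS.
val corrupt = new ByteArrayInputStream(Array[Byte](1, 2, 3)) {
  override def available(): Int = throw new IOException("FAILED_TO_UNCOMPRESS(5)")
}

val stream = new BufferReleasingInputStream(corrupt)
try {
  stream.available()
  assert(false, "expected a FetchFailedException")
} catch {
  case _: FetchFailedException => () // the IOException was translated as intended
}
```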
### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43543 from viirya/add_available.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Chao Sun <[email protected]>