@viirya
Member

@viirya viirya commented Oct 26, 2023

What changes were proposed in this pull request?

This patch wraps BufferReleasingInputStream.available and BufferReleasingInputStream.reset in tryOrFetchFailedException, so that an IOException thrown during an available or reset call is rethrown as a FetchFailedException.

Why are the changes needed?

We encountered a shuffle data corruption issue:

Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)

Spark shuffle can detect corruption for a few stream operations such as read and skip: an IOException like the one in the stack trace is rethrown as a FetchFailedException, which causes the failed shuffle task to be retried. In this stack trace, however, the failing call is available, which is not covered by that mechanism. As a result, no retry happened and the Spark application simply failed.

Because available and reset can also involve data decompression and throw IOException, they should be checked the same way read and skip are.
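
The idea can be sketched in Java as follows. This is only an illustration under stated assumptions, not Spark's code: the real wrapper is the Scala class BufferReleasingInputStream in ShuffleBlockFetcherIterator.scala, and the names GuardedInputStream, RetryableFetchException, IoCall, guard, and corruptStream are hypothetical stand-ins invented for this sketch (guard plays the role described above for tryOrFetchFailedException).

```java
import java.io.IOException;
import java.io.InputStream;

public class GuardedStreamSketch {

    /** Hypothetical stand-in for Spark's FetchFailedException (signals "retry the fetch"). */
    static class RetryableFetchException extends RuntimeException {
        RetryableFetchException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** A stream operation that may throw IOException. */
    @FunctionalInterface
    interface IoCall<T> {
        T call() throws IOException;
    }

    /** Hypothetical wrapper: every guarded call rethrows IOException as RetryableFetchException. */
    static class GuardedInputStream extends InputStream {
        private final InputStream delegate;

        GuardedInputStream(InputStream delegate) {
            this.delegate = delegate;
        }

        // Converts any IOException from the delegate into the retryable exception.
        private <T> T guard(IoCall<T> op) {
            try {
                return op.call();
            } catch (IOException e) {
                throw new RetryableFetchException("Stream is corrupted: " + e.getMessage(), e);
            }
        }

        // Operations described as already covered:
        @Override public int read() { return guard(() -> delegate.read()); }
        @Override public long skip(long n) { return guard(() -> delegate.skip(n)); }

        // Operations this PR adds to the covered set:
        @Override public int available() { return guard(() -> delegate.available()); }
        @Override public void reset() { guard(() -> { delegate.reset(); return null; }); }
    }

    /** Simulates a decompressing stream that hits corrupt data. */
    static InputStream corruptStream() {
        return new InputStream() {
            @Override public int read() throws IOException {
                throw new IOException("FAILED_TO_UNCOMPRESS(5)");
            }
            @Override public int available() throws IOException {
                throw new IOException("FAILED_TO_UNCOMPRESS(5)");
            }
        };
    }

    public static void main(String[] args) {
        InputStream guarded = new GuardedInputStream(corruptStream());
        try {
            guarded.available();
        } catch (RetryableFetchException e) {
            System.out.println("retryable: " + e.getMessage());
        } catch (IOException e) {
            System.out.println("plain IOException: " + e.getMessage());
        }
    }
}
```

Without the guarded available override, the same call would surface the raw IOException to the caller, which is the failure mode shown in the stack trace above.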

Does this PR introduce any user-facing change?

Yes. Data corruption during an available or reset call now raises a FetchFailedException, as read and skip already do, which can be retried, instead of an IOException.

How was this patch tested?

Added test.

Was this patch authored or co-authored using generative AI tooling?

No

@viirya viirya changed the title from "SPARK-45678]Cover BufferReleasingInputStream.available under tryOrFetchFailedException" to "[SPARK-45678][CORE] Cover BufferReleasingInputStream.available under tryOrFetchFailedException" Oct 26, 2023
@github-actions github-actions bot added the CORE label Oct 26, 2023
@viirya
Member Author

viirya commented Oct 27, 2023

cc @dongjoon-hyun @sunchao

Contributor

@mridulm mridulm left a comment

Looks good to me.
For completeness' sake, do you want to do it for reset as well? We don't use it right now, though.

@viirya
Member Author

viirya commented Oct 27, 2023

For completeness' sake, do you want to do it for reset as well? We don't use it right now, though.

Ah, I missed it. reset could possibly throw IOException too.

Thanks @mridulm.
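
As background on why reset is worth guarding, here is a small Java sketch showing that java.io.InputStream.reset throws IOException by default when a stream does not support mark/reset. The class ResetThrowsSketch and the helper resetFailure are hypothetical names used only for this illustration; the sketch does not reproduce the decompression failure discussed in this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ResetThrowsSketch {

    /** Returns the message of the IOException thrown by reset(), or null if reset() succeeds. */
    static String resetFailure(InputStream in) {
        try {
            in.reset();
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A stream that only implements read() inherits InputStream's default reset().
        InputStream noMark = new InputStream() {
            @Override public int read() { return -1; }
        };
        System.out.println("markSupported: " + noMark.markSupported());
        System.out.println("reset failure: " + resetFailure(noMark));

        // ByteArrayInputStream supports mark/reset, so reset() does not throw.
        InputStream inMemory = new ByteArrayInputStream(new byte[] {1, 2, 3});
        System.out.println("reset failure: " + resetFailure(inMemory));
    }
}
```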

@viirya viirya changed the title from "[SPARK-45678][CORE] Cover BufferReleasingInputStream.available under tryOrFetchFailedException" to "[SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException" Oct 27, 2023
Contributor

@mridulm mridulm left a comment

Thanks for fixing this @viirya !
Given CI is still running, please feel free to merge it once green :-)

@viirya
Member Author

viirya commented Oct 27, 2023

Thank you @mridulm :)

Member

@sunchao sunchao left a comment

LGTM

@sunchao sunchao closed this in 57e73da Oct 28, 2023
sunchao pushed a commit that referenced this pull request Oct 28, 2023
…under tryOrFetchFailedException

### What changes were proposed in this pull request?

This patch wraps `BufferReleasingInputStream.available` and `BufferReleasingInputStream.reset` in `tryOrFetchFailedException`, so that an `IOException` thrown during an `available` or `reset` call is rethrown as a `FetchFailedException`.

### Why are the changes needed?

We encountered a shuffle data corruption issue:

```
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
```

Spark shuffle can detect corruption for a few stream operations such as `read` and `skip`: an `IOException` like the one in the stack trace is rethrown as a `FetchFailedException`, which causes the failed shuffle task to be retried. In this stack trace, however, the failing call is `available`, which is not covered by that mechanism. As a result, no retry happened and the Spark application simply failed.

Because `available` and `reset` can also involve data decompression and throw `IOException`, they should be checked the same way `read` and `skip` are.

### Does this PR introduce _any_ user-facing change?

Yes. Data corruption during an `available` or `reset` call now raises a `FetchFailedException`, as `read` and `skip` already do, which can be retried, instead of an `IOException`.

### How was this patch tested?

Added test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43543 from viirya/add_available.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
@sunchao
Member

sunchao commented Oct 28, 2023

Merged to master/branch-3.4/branch-3.5. Thanks @viirya @mridulm !

@viirya
Member Author

viirya commented Oct 28, 2023

Thank you @mridulm @sunchao !

@dongjoon-hyun
Member

+1, late LGTM.

@viirya
Member Author

viirya commented Oct 28, 2023

Thank you @dongjoon-hyun !

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…under tryOrFetchFailedException

Closes apache#43543 from viirya/add_available.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Chao Sun <[email protected]>