Conversation

@ukby1234 (Contributor) commented Sep 22, 2022

What changes were proposed in this pull request?

Judging from the bug description, the fallback storage doesn't readFully, which then causes `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This is an attempt to fix it by reading the underlying stream fully.
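For reference, the general shape of the change (replacing a single read call that may return early with a call that reads the whole block) is sketched below. This is a hedged illustration, not the exact merged code; the object and method names are made up, and only the read-vs-readFully distinction reflects the actual fix.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative read path for a fallback-storage shuffle block.
object FallbackReadSketch {
  def readBlock(fs: FileSystem, dataFile: Path, offset: Long, size: Int): Array[Byte] = {
    val array = new Array[Byte](size)
    val in = fs.open(dataFile)
    try {
      in.seek(offset)
      // A plain in.read(array) may return after filling only part of the buffer,
      // leaving the tail zero-filled and yielding a corrupted compressed block.
      // readFully keeps reading until `size` bytes are consumed or throws EOFException.
      in.readFully(array)
    } finally {
      in.close()
    }
    array
  }
}
```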

Why are the changes needed?

Fix a bug documented in SPARK-39200

Does this PR introduce any user-facing change?

No

How was this patch tested?

Wrote a unit test

@github-actions github-actions bot added the CORE label Sep 22, 2022
@mridulm (Contributor) left a comment

Nice bugfix!

+CC @dongjoon-hyun who wrote this initially iirc.

import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
Contributor

Revert these whitespace changes.

Member

+1 for @mridulm's comment.

Contributor Author

Done

import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils.tryWithResource

import scala.util.Random
Contributor

Move this to the scala import block above.

assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))

val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
assert(readResult.nioByteBuffer().array().sameElements(content))
Contributor

This test is not actually checking for readFully and would pass even with a plain read, depending on whether a single read happens to satisfy the whole request (we would be relying on the internal buffer size, which is subject to change). In other words, the test could pass even without the fix.
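For context, a generic sketch of the semantics under discussion (not code from this PR): `InputStream.read` may return after filling only part of the buffer, while a readFully-style helper loops until the buffer is completely filled.

```scala
import java.io.{EOFException, InputStream}

object ReadFullyExample {
  // Keep reading until `buf` is completely filled, or fail if the stream ends early.
  def readFully(in: InputStream, buf: Array[Byte]): Unit = {
    var pos = 0
    while (pos < buf.length) {
      val n = in.read(buf, pos, buf.length - pos) // may return fewer bytes than requested
      if (n < 0) throw new EOFException(s"stream ended after $pos of ${buf.length} bytes")
      pos += n
    }
  }
}
```

A single read against a local file usually fills the whole buffer in one call, which is why a test backed by the local filesystem can pass even without the fix.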

Contributor Author

Added a mock filesystem that reads partially on read calls so we can test this behavior.


val dataFile = resolver.getDataFile(1, 2L)
tryWithResource(new FileOutputStream(dataFile)) { fos =>
  tryWithResource(new DataOutputStream(fos)) { dos =>
Contributor

We don't need the DataOutputStream here.


val indexFile = resolver.getIndexFile(1, 2L)
tryWithResource(new FileOutputStream(indexFile)) { fos =>
  tryWithResource(new DataOutputStream(fos)) { dos =>
Contributor

nit: tryWithResource is not required for the DataOutputStream, though that seems to be the pattern in the rest of this suite.

Contributor Author

Deleted tryWithResource
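For illustration, the simplified write then looks roughly like this as it would appear inside the suite (a hedged sketch with a made-up file name and index values, not the exact test code); only the outer FileOutputStream needs tryWithResource, since closing it releases the underlying file handle:

```scala
import java.io.{DataOutputStream, File, FileOutputStream}
import org.apache.spark.util.Utils.tryWithResource

val indexFile = new File("/tmp/example.index") // illustrative path
tryWithResource(new FileOutputStream(indexFile)) { fos =>
  // The wrapping DataOutputStream holds no resources of its own,
  // so it does not need its own tryWithResource block.
  val dos = new DataOutputStream(fos)
  dos.writeLong(0L)  // illustrative index offsets only
  dos.writeLong(16L)
  dos.flush()
}
```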

FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}

test("fallback storage APIs - readFully") {
Member

SPARK-39200: fallback storage APIs - readFully

Member

+1 for @yaooqinn's comment.

@dongjoon-hyun (Member)

Thank you, @ukby1234 and @mridulm.

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm (Contributor) left a comment

Looks good to me, pending tests.
+CC @dongjoon-hyun

import java.io.IOException;
import java.io.InputStream;

public class ReadPartialFileSystem extends LocalFileSystem {
@dongjoon-hyun (Member) commented Sep 23, 2022

Sorry for asking this, but shall we make this a Scala class like DebugFilesystem, FakeFileSystemSetPermission and FakeFileSystemRequiringDSOption? Also, please put this inside FallbackStorageSuite.scala instead of having an independent file.

Contributor Author

Got it. Let me put that in FallbackStorageSuite.scala.
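For context, a Scala version of such a partial-read filesystem could look roughly like the sketch below. This is a hedged illustration; the class names and bodies are illustrative and may differ from what was actually added to FallbackStorageSuite.scala.

```scala
import java.io.InputStream

import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}

// An InputStream that returns one byte fewer than requested from bulk reads,
// simulating the short reads a remote filesystem may produce. It delegates
// everything else to the wrapped FSDataInputStream.
class PartialInputStream(underlying: FSDataInputStream)
  extends InputStream with Seekable with PositionedReadable {

  override def read(): Int = underlying.read()

  override def read(b: Array[Byte], off: Int, len: Int): Int =
    underlying.read(b, off, if (len > 1) len - 1 else len)

  // Seekable
  override def seek(pos: Long): Unit = underlying.seek(pos)
  override def getPos(): Long = underlying.getPos
  override def seekToNewSource(targetPos: Long): Boolean = underlying.seekToNewSource(targetPos)

  // PositionedReadable
  override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int =
    underlying.read(position, buffer, offset, length)
  override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit =
    underlying.readFully(position, buffer, offset, length)
  override def readFully(position: Long, buffer: Array[Byte]): Unit =
    underlying.readFully(position, buffer)

  override def close(): Unit = underlying.close()
}

// A LocalFileSystem whose open() wraps streams with the partial-read behavior,
// so a single read() call in FallbackStorage observes a short read.
class ReadPartialFileSystem extends LocalFileSystem {
  override def open(f: Path): FSDataInputStream =
    new FSDataInputStream(new PartialInputStream(super.open(f)))
}
```

The test would then point the Hadoop filesystem implementation used for the fallback storage path at this class, so that FallbackStorage.read goes through the partial-read stream.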

Contributor Author

Done.

@dongjoon-hyun (Member) left a comment

+1, LGTM. Pending CIs.
Thank you, @ukby1234 and @mridulm.

@ukby1234 (Contributor Author)

Hmm, it looks like the GitHub status is stuck at pending, but the underlying tests are done and passed. Not sure what's going on.

dongjoon-hyun pushed a commit that referenced this pull request Sep 23, 2022
### What changes were proposed in this pull request?

Judging from the bug description, the fallback storage doesn't readFully, which then causes `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This is an attempt to fix it by reading the underlying stream fully.

### Why are the changes needed?

Fix a bug documented in SPARK-39200

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Wrote a unit test

Closes #37960 from ukby1234/SPARK-39200.

Authored-by: Frank Yin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 07061f1)
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Sep 23, 2022

(cherry picked from commit 07061f1)
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun (Member)

Merged to master/3.3/3.2.

@ukby1234 (Contributor Author)

@dongjoon-hyun @mridulm Do you know when the next versions of Spark 3.2 and Spark 3.3 will be released?

@dongjoon-hyun (Member)

3.3.1 RC2 will start this weekend.

@ukby1234 (Contributor Author)

I take it there isn't a timeline for the next Spark 3.2 release yet?

@dongjoon-hyun (Member)

3.2.2 was released just two months ago, on July 17, 2022. According to our regular release cycle, we can have 3.2.3 after November. However, I'd like to recommend 3.3.1. :)

Feature release branches will, generally, be maintained with bug fix releases for a period of 18 months.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023

(cherry picked from commit 07061f1)
Signed-off-by: Dongjoon Hyun <[email protected]>