[SPARK-39200][CORE] Make Fallback Storage readFully on content #37960
Conversation
mridulm left a comment
Nice bugfix!
+CC @dongjoon-hyun who wrote this initially, IIRC.
```scala
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
```
Revert these whitespace changes.
+1 for @mridulm 's comment.
Done
```scala
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils.tryWithResource

import scala.util.Random
```
Move this to the `scala` import block above.
```scala
assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))

val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
assert(readResult.nioByteBuffer().array().sameElements(content))
```
This test is not actually checking `readFully`; it could pass with a plain `read` as well, depending on whether a single read happens to satisfy the whole request (we would be relying on the internal buffer size, which is subject to change).
In other words, the test could pass even without the fix.
Added a mock filesystem that returns partial reads on `read` calls so we can test this behavior.
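For context, a stream that fills requests only partially is exactly what such a mock forces. A minimal standalone sketch in Java (the class and variable names here are hypothetical, not the actual test code) showing how a single `read` call can legally return fewer bytes than requested:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PartialReadDemo {
    // Hypothetical stream that returns at most one byte per read() call,
    // mimicking what the mock filesystem forces in the test.
    static class OneByteStream extends FilterInputStream {
        OneByteStream(InputStream in) { super(in); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Cap every read at one byte; this is allowed by the
            // InputStream.read contract.
            return super.read(b, off, Math.min(1, len));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] content = "hello".getBytes();
        InputStream in = new OneByteStream(new ByteArrayInputStream(content));
        byte[] buf = new byte[content.length];
        // A single read() asks for 5 bytes but only gets 1 back.
        int n = in.read(buf, 0, buf.length);
        if (n != 1) throw new AssertionError("expected a short read, got " + n);
        System.out.println("short read returned " + n + " byte");
    }
}
```

A caller that treats the return value of one `read` call as the full block would then decompress a truncated buffer, which matches the corruption reported in SPARK-39200.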
```scala
val dataFile = resolver.getDataFile(1, 2L)
tryWithResource(new FileOutputStream(dataFile)) { fos =>
  tryWithResource(new DataOutputStream(fos)) { dos =>
```
We don't need the `DataOutputStream` here.
```scala
val indexFile = resolver.getIndexFile(1, 2L)
tryWithResource(new FileOutputStream(indexFile)) { fos =>
  tryWithResource(new DataOutputStream(fos)) { dos =>
```
nit: `tryWithResource` is not required for the `DataOutputStream`, though that seems to be a pattern in the rest of this suite.
Deleted `tryWithResource`.
```scala
  FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}

test("fallback storage APIs - readFully") {
```
`SPARK-39200: fallback storage APIs - readFully`
+1 for @yaooqinn 's comment.
Can one of the admins verify this patch?
Looks good to me, pending tests.
+CC @dongjoon-hyun
```java
import java.io.IOException;
import java.io.InputStream;

public class ReadPartialFileSystem extends LocalFileSystem {
```
Sorry for asking this, but shall we make this a Scala class like DebugFilesystem, FakeFileSystemSetPermission, and FakeFileSystemRequiringDSOption? Also, please put it inside FallbackStorageSuite.scala instead of having an independent file.
Got it. Let me put that in FallbackStorageSuite.scala.
Done.
dongjoon-hyun left a comment
Hmm, the GitHub status seems stuck at pending, but the underlying tests are done and passed. Not sure what's going on.
### What changes were proposed in this pull request?
From the bug description, the fallback storage does not `readFully`, which causes `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This PR fixes the issue by reading the underlying stream fully.

### Why are the changes needed?
Fixes a bug documented in SPARK-39200.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Wrote a unit test.

Closes #37960 from ukby1234/SPARK-39200.

Authored-by: Frank Yin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 07061f1)
Signed-off-by: Dongjoon Hyun <[email protected]>
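The fix boils down to looping until the requested number of bytes has been read, since a single `read` may stop short. A minimal sketch of such a loop (a hypothetical helper, not Spark's actual implementation):

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ReadFullyDemo {
    // Keep reading until `len` bytes arrive; a single read() may return fewer.
    static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            int n = in.read(buf, off + total, len - total);
            if (n < 0) {
                throw new EOFException("stream ended after " + total + " of " + len + " bytes");
            }
            total += n;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] content = {1, 2, 3, 4, 5};
        // Wrap the source so every read() returns at most 2 bytes,
        // mimicking a stream that fills requests only partially.
        InputStream chunked = new FilterInputStream(new ByteArrayInputStream(content)) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(2, len));
            }
        };
        byte[] buf = new byte[content.length];
        readFully(chunked, buf, 0, buf.length);
        if (!Arrays.equals(buf, content)) throw new AssertionError("content mismatch");
        System.out.println("readFully assembled all " + buf.length + " bytes");
    }
}
```

Without the loop, the first `read` here would return only 2 of the 5 bytes, and any consumer decompressing the buffer would see a corrupted block.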
Merged to master/3.3/3.2.
@dongjoon-hyun @mridulm Do you know when the next versions of Spark 3.2 and Spark 3.3 will be released?
3.3.1 RC2 will start this weekend.
I take it there isn't a timeline for a Spark 3.2 release yet?
3.2.2 was released just two months ago, on July 17, 2022. According to our regular release cycle, we can have 3.2.3 after November. However, I'd like to recommend 3.3.1. :)