[SPARK-4105] retry the fetch or stage if shuffle block is corrupt #15923
Conversation
logDebug("Number of requests in flight " + reqsInFlight)
}
case _ =>
var result: FetchResult = null
add documentation explaining what's going on here.
BTW, is there a way to refactor this function so it is testable? I do worry some of the logic here won't be tested at all.
Test build #68813 has finished for PR 15923 at commit
Test build #68870 has finished for PR 15923 at commit
@JoshRosen @zsxwing Could you help to review this one?
input = streamWrapper(blockId, in)
// Only copy the stream if it's wrapped by compression or encryption, also the size of
// block is small (the decompressed block is smaller than maxBytesInFlight)
Does this issue only happen for small blocks? Otherwise, only checking small blocks doesn't seem very helpful. Why not add a shuffle block checksum instead? Then we could just check the compressed block and retry.
The purpose of this PR is to reduce the chance that a job fails because of network/disk corruption, without introducing other regressions (OOM). Typically, shuffle blocks are small, so we can still fetch in parallel even with the maxBytesInFlight limit. For the few large blocks (for example, from data skew), we don't check for corruption for now (at least it's not worse than before).
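To make the idea concrete, here is a small self-contained sketch (not the PR's actual code; the object name, the gzip codec, the block contents, and the 48 MB threshold are all stand-ins): eagerly draining the decompressing stream turns a corrupt small block into an IOException right away, which is the signal used to re-fetch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object SmallBlockCheckSketch {
  // Fully drain a (decompressing) stream into memory so that corruption fails fast
  // with an IOException here, instead of surfacing later inside the shuffle reader.
  def eagerlyDecompress(wrapped: InputStream): InputStream = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](8192)
    var n = wrapped.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = wrapped.read(buf)
    }
    wrapped.close()
    new ByteArrayInputStream(out.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    // Compress a small "block", then flip one byte in the middle to simulate corruption.
    val raw = Array.fill[Byte](4096)(42.toByte)
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(raw)
    gz.close()
    val block = bos.toByteArray
    block(block.length / 2) = (block(block.length / 2) ^ 0xFF).toByte

    val maxBytesInFlight = 48L * 1024 * 1024
    try {
      // Only small blocks are checked eagerly, to bound the extra memory used.
      if (block.length < maxBytesInFlight) {
        eagerlyDecompress(new GZIPInputStream(new ByteArrayInputStream(block)))
      }
      println("block decompressed cleanly")
    } catch {
      case e: IOException => println(s"corruption detected early, would re-fetch: $e")
    }
  }
}
```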
I tried to add checksums for shuffle blocks in #15894, but that has much more complexity and overhead, so I'm in favor of this lighter approach.
Once we start explicitly managing the memory and support spilling, will it be safe to do this for large blocks, too?
@JoshRosen I think so.
I left a couple of comments regarding cleanup of decompression buffers and logging of exceptions.
private[spark]
final class ShuffleBlockFetcherIterator(
    context: TaskContext,
    shuffleClient: ShuffleClient,
Could you update the Scaladoc to document the two new parameters here? I understand what streamWrapper means from context but it might be useful for new readers of this code.
done
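For reference, a rough sketch of what that Scaladoc could say; the wording is illustrative only, and the second parameter name follows the rename suggested later in this review rather than the merged text.

```scala
/**
 * An iterator that fetches shuffle blocks and hands them to the reader as
 * (BlockId, InputStream) pairs.
 *
 * @param streamWrapper A function to wrap the returned input stream, e.g. applying
 *                      decompression and/or decryption via the SerializerManager.
 * @param detectCorrupt Whether to eagerly decompress small, wrapped blocks so that a
 *                      corrupt block is detected immediately and re-fetched (at most
 *                      once) instead of failing later inside the shuffle reader.
 */
```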
// TODO: manage the memory used here, and spill it into disk in case of OOM.
Utils.copyStream(input, out)
out.close()
input = out.toChunkedByteBuffer.toInputStream(true)
Could you put dispose = true here to make the boolean parameter clearer?
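Concretely, the suggestion is a named argument so the call site documents itself; a minimal before/after sketch of the quoted line:

```scala
// Before: the bare boolean doesn't say what it controls.
input = out.toChunkedByteBuffer.toInputStream(true)

// After: the named argument makes it clear the backing buffer is disposed
// once the stream has been consumed.
input = out.toChunkedByteBuffer.toInputStream(dispose = true)
```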
    || corruptedBlocks.contains(blockId)) {
  throwFetchFailedException(blockId, address, e)
} else {
  logWarning(s"got an corrupted block $blockId from $address, fetch again")
Can we log the IOException here? It looks like the exception isn't logged or rethrown from this branch and I think we'll need that information to help debug problems here.
I think the IOException would be set as the cause of the FetchFailedException.
@lins05 It's already set as the cause for FetchFailedException.
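To illustrate that point in isolation (the class and names below are stand-ins, not the PR's code): when the fetch finally gives up, the original IOException is attached as the cause of the thrown fetch-failure exception, so its message and stack trace still reach the logs even though the retry branch only warns.

```scala
import java.io.IOException

object CauseChainSketch {
  // Stand-in for Spark's FetchFailedException: the IOException rides along as the cause.
  final class FetchFailedDemo(blockId: String, address: String, cause: IOException)
    extends Exception(s"Failed to fetch $blockId from $address", cause)

  def main(args: Array[String]): Unit = {
    val io = new IOException("corrupt compressed stream")
    try {
      throw new FetchFailedDemo("shuffle_2_11_275", "host-a:7337", io)
    } catch {
      case f: FetchFailedDemo =>
        // The cause is preserved, so logging the fetch failure also surfaces the IOException.
        println(s"${f.getMessage}; cause = ${f.getCause}")
    }
  }
}
```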
// Decompress the whole block at once to detect any corruption, which could increase
// the memory usage and potentially increase the chance of OOM.
// TODO: manage the memory used here, and spill it into disk in case of OOM.
Utils.copyStream(input, out)
Do we need to close the input stream here? There might be resources in the decompressor which need to be freed.
done
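A minimal sketch of the cleanup being asked for, assuming nothing about the PR beyond the lines quoted above (the helper name and the copy callback are hypothetical): close the wrapped stream in a finally block so the decompressor's buffers are released even if the copy throws.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object CloseAfterCopySketch {
  // Hypothetical helper mirroring the suggestion: always close the wrapped input
  // stream so decompression resources are freed, even if the copy throws midway.
  def copyAndClose(input: InputStream, out: OutputStream)
                  (copy: (InputStream, OutputStream) => Unit): Unit = {
    try {
      copy(input, out) // may throw IOException on a corrupt block
    } finally {
      input.close()    // released on both the success and failure paths
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("hello".getBytes("UTF-8"))
    val out = new ByteArrayOutputStream()
    copyAndClose(in, out) { (i, o) =>
      val buf = new Array[Byte](1024)
      var n = i.read(buf)
      while (n != -1) { o.write(buf, 0, n); n = i.read(buf) }
    }
    println(out.toString("UTF-8")) // "hello"
  }
}
```

If I recall its signature correctly, Spark's own Utils.copyStream also takes a closeStreams flag that can serve the same purpose.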
  serializerManager.wrapStream(blockId, inputStream)
}
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.getBoolean("spark.shuffle.tryDecompress", true))
nit: maybe detectCorrupt is slightly better than tryDecompress?
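For illustration, with the suggested rename the read would look roughly like this (key name per the suggestion; the default of true is an assumption):

```scala
import org.apache.spark.SparkConf

object DetectCorruptConfigSketch extends App {
  val conf = new SparkConf()
  // With the suggested name, the intent is obvious at the call site.
  val detectCorrupt = conf.getBoolean("spark.shuffle.detectCorrupt", true)
  println(s"spark.shuffle.detectCorrupt = $detectCorrupt")
}
```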
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util.Utils
import org.apache.spark.util.io.{ChunkedByteBufferInputStream, ChunkedByteBufferOutputStream}
Seems ChunkedByteBufferInputStream is not used here.
/** Current number of requests in flight */
private[this] var reqsInFlight = 0

/** The blocks that can't be decompressed successfully */
What about adding more explanation, for example:
/**
 * The blocks that can't be decompressed successfully.
 * It is used to guarantee that we retry at most once for those corrupted blocks.
 */
I manually tested this patch with a job that usually failed because of a corrupt stream. As the logging showed, the shuffle fetcher got some corrupt blocks for partition 275; it retried once, and the task finally succeeded. But the retry cannot protect all the tasks: some failed with FetchFailed, so the stage was retried.
Stage 26 succeeded after being retried twice. Another observation is that all the corruption happened on only 2 of the 26 nodes, and a few broadcast blocks were also corrupt on them. It seems that the corruption happens on the receiving (fetcher) side of the network. I will update the patch to address the comments.
Test build #69261 has finished for PR 15923 at commit
Test build #3443 has started for PR 15923 at commit
Test build #3444 has started for PR 15923 at commit
Test build #69422 has finished for PR 15923 at commit
Test build #3448 has finished for PR 15923 at commit
cc @zsxwing @JoshRosen does this look good?
ping @JoshRosen
This looks good overall, but one nit: it looks like we don't have any test coverage for the case where
@JoshRosen Added a test for
Test build #69825 has finished for PR 15923 at commit
Test build #3475 has finished for PR 15923 at commit
Test build #3480 has started for PR 15923 at commit
Test build #3481 has finished for PR 15923 at commit
LGTM
Merging to master.
var result: FetchResult = null
var input: InputStream = null
// Take the next fetched result and try to decompress it to detect data corruption,
// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
@davies Could you elaborate a bit here? In my mind TCP provides pretty robust data transfer, which means that if there is an error, the data was written to disk corrupted, and fetching it one more time won't help.
We have observed in production a few failures related to this on virtualized environments. It is entirely possible there is a bug in the underlying networking stack, or a bug in Spark's networking stack. Either way, this eliminates those issues.
@fathersson The checksum in TCP is only 16 bits, which is not strong enough for heavy traffic; DFS and other systems with heavy TCP traffic usually add another application-level checksum. Adding to @rxin's point, we did see this retry help in production to work around temporary corruption.
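For illustration, this is roughly what an application-level checksum looks like: a minimal sketch using CRC32 from the JDK. It is not what this PR implements (the checksum approach was explored separately in #15894), and all the names are made up for the example.

```scala
import java.util.zip.CRC32

object BlockChecksumSketch {
  // Compute a 32-bit CRC over a block; the sender would store or ship this value and
  // the receiver would recompute it to detect corruption before decompressing.
  def crc32(block: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(block, 0, block.length)
    crc.getValue
  }

  def main(args: Array[String]): Unit = {
    val block = Array.tabulate[Byte](1024)(i => (i % 251).toByte)
    val expected = crc32(block)
    block(500) = (block(500) ^ 0x01).toByte // simulate a single-bit flip in transit
    println(s"corruption detected: ${crc32(block) != expected}") // true
  }
}
```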
Is netty/shuffle data being compressed using the Snappy algorithm by default? If so, it might be a good idea to enable checksum checking at the Netty level too?
https://netty.io/4.0/api/io/netty/handler/codec/compression/SnappyFramedDecoder.html
Note that by default, validation of the checksum header in each chunk is DISABLED for performance improvements. If performance is less of an issue, or if you would prefer the safety that checksum validation brings, please use the SnappyFramedDecoder(boolean) constructor with the argument set to true.
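For completeness, enabling that validation would look roughly like the following (a hedged sketch based on the linked Javadoc; as the reply below notes, Spark does not use Netty's Snappy codec, so this is purely illustrative):

```scala
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.compression.{SnappyFramedDecoder, SnappyFramedEncoder}

// Sketch: a Netty pipeline with checksum validation turned on in the decoder,
// using the boolean constructor documented in the quoted Javadoc.
class SnappyChecksumInitializer extends ChannelInitializer[SocketChannel] {
  override def initChannel(ch: SocketChannel): Unit = {
    ch.pipeline()
      .addLast(new SnappyFramedEncoder())
      .addLast(new SnappyFramedDecoder(true)) // validate checksums
  }
}
```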
@Tagar Spark doesn't use Netty's Snappy compression.
[SPARK-4105] retry the fetch or stage if shuffle block is corrupt (backport from upstream master to databricks branch 2.1). Author: Davies Liu <davies@databricks.com> Closes apache#15923 from davies/detect_corrupt. Author: Davies Liu <[email protected]> Closes apache#159 from ericl/sc-5362.
What changes were proposed in this pull request?
There is an outstanding issue that has existed for a long time: sometimes the shuffle blocks are corrupt and can't be decompressed. We recently hit this in three different workloads; sometimes we can reproduce it on every try, sometimes we can't. I also found that when the corruption happened, the beginning and end of the blocks were correct and the corruption was in the middle. In one case the string of a block id was corrupted by one character. It seems very likely that the corruption is introduced by some weird machine/hardware, and the 16-bit checksum in TCP is not strong enough to identify all of the corruption.
Unfortunately, Spark does not have checksums for shuffle blocks or broadcast, so the job will fail if any corruption happens in a shuffle block read from disk or in a broadcast block sent over the network. This PR tries to detect the corruption after fetching shuffle blocks by decompressing them, because most of the compression codecs already have checksums built in. It will retry the block, or fail with a FetchFailure, so the previous stage can be retried on different (still random) machines.
Checksums for broadcast will be added in another PR.
How was this patch tested?
Added unit tests
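Tying the description together, below is a hedged, self-contained sketch of the fetcher-side policy only (the object, method, and threshold names are illustrative; the real logic lives in ShuffleBlockFetcherIterator): small wrapped blocks are eagerly checked, the first corrupt copy of a block is re-fetched, and a second corrupt copy raises a fetch failure so the previous stage can be resubmitted.

```scala
import java.io.IOException
import scala.collection.mutable

object CorruptionRetryPolicySketch {
  sealed trait Outcome
  case object Pass extends Outcome        // handed to the reader without the eager check
  case object Ok extends Outcome          // eagerly decompressed, no corruption found
  case object Refetch extends Outcome     // first corrupt copy: fetch this block again
  case object FetchFailed extends Outcome // second corrupt copy: fail so the stage retries

  private val corruptedBlocks = mutable.HashSet[String]()
  private val maxBytesInFlight = 48L * 1024 * 1024

  /** `check` fully decompresses the fetched bytes and throws IOException on corruption. */
  def onBlockFetched(blockId: String, size: Long, wrapped: Boolean)
                    (check: () => Unit): Outcome = {
    if (!wrapped || size >= maxBytesInFlight) {
      Pass // not compressed/encrypted, or too large to buffer safely in memory
    } else {
      try { check(); Ok }
      catch {
        case _: IOException =>
          // Remember the block id; a repeat offender is promoted to a fetch failure.
          if (corruptedBlocks.add(blockId)) Refetch else FetchFailed
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val corrupt = () => throw new IOException("corrupt compressed stream")
    println(onBlockFetched("shuffle_0_1_2", 1 << 20, wrapped = true)(corrupt))  // Refetch
    println(onBlockFetched("shuffle_0_1_2", 1 << 20, wrapped = true)(corrupt))  // FetchFailed
    println(onBlockFetched("shuffle_0_3_2", 1 << 20, wrapped = true)(() => ())) // Ok
  }
}
```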