Conversation

@davies (Contributor) commented Nov 18, 2016

What changes were proposed in this pull request?

There is an outstanding issue that has existed for a long time: sometimes the shuffle blocks are corrupt and can't be decompressed. We recently hit this in three different workloads; sometimes we can reproduce it on every try, sometimes we can't. I also found that when the corruption happened, the beginning and end of the blocks were correct and the corruption was in the middle. In one case the string of a block id was corrupted by a single character. It seems very likely that the corruption is introduced by some weird machine/hardware, and the 16-bit checksum in TCP is not strong enough to catch all of it.

Unfortunately, Spark has no checksum for shuffle blocks or broadcast blocks, so the job will fail if any corruption happens in a shuffle block read from disk or in a broadcast block sent over the network. This PR tries to detect the corruption after fetching shuffle blocks by decompressing them, because most of the compression codecs already have a checksum built in. It retries a corrupted block once, or fails with FetchFailure so the previous stage can be retried on different (still random) machines.
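As a rough illustration of the approach (a hedged sketch with illustrative names, not the code in this patch), the fetched block is pulled through the decompression wrapper once so the codec's built-in checksum can flag corruption before the data is handed to the reader:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

// Sketch only: fully drain the wrapped (decompressing/decrypting) stream so the
// codec's checksum surfaces corruption as an IOException, then return a verified
// in-memory copy of the decompressed bytes.
def eagerlyVerify(wrapped: InputStream): InputStream = {
  val out = new ByteArrayOutputStream()
  val buf = new Array[Byte](8192)
  var n = wrapped.read(buf)
  while (n != -1) {        // a corrupted block typically throws IOException here
    out.write(buf, 0, n)
    n = wrapped.read(buf)
  }
  wrapped.close()
  new ByteArrayInputStream(out.toByteArray)
}
```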

Checksums for broadcast blocks will be added in another PR.

How was this patch tested?

Added unit tests

logDebug("Number of requests in flight " + reqsInFlight)
}
case _ =>
var result: FetchResult = null
Contributor:

add documentation explaining what's going on here.

Contributor:

btw is there a way to refactor this function so it is testable? i do worry some of the logic here won't be tested at all.


SparkQA commented Nov 18, 2016

Test build #68813 has finished for PR 15923 at commit 5c93aaf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 18, 2016

Test build #68870 has finished for PR 15923 at commit c85a216.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


davies commented Nov 18, 2016

@JoshRosen @zsxwing Could you help review this one?


input = streamWrapper(blockId, in)
// Only copy the stream if it's wrapped by compression or encryption, also the size of
// block is small (the decompressed block is smaller than maxBytesInFlight)
@zsxwing (Member), Nov 18, 2016:

Does this issue only happen for small blocks? Otherwise, checking only small blocks doesn't seem very helpful. Why not add a shuffle block checksum instead? Then we could just check the compressed block and retry.

davies (Author):

The purpose of this PR is to reduce the chance that a job fails because of network/disk corruption, without introducing another regression (OOM). Typically the shuffle blocks are small, so we can still fetch in parallel even with the maxBytesInFlight limit. For the few large blocks (for example, with data skew), we don't check for corruption for now (at least it's not worse than before).
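A small sketch of the gating condition described above; the helper name and the exact size threshold are assumptions, not the literal patch:

```scala
import java.io.InputStream

// Only buffer-and-verify a block when the stream was actually wrapped (compressed
// or encrypted) and the decompressed block is small enough to hold in memory.
def shouldEagerlyVerify(
    detectCorrupt: Boolean,
    original: InputStream,
    wrapped: InputStream,
    blockSize: Long,
    maxBytesInFlight: Long): Boolean = {
  val isWrapped = wrapped ne original          // streamWrapper returned a new stream
  val isSmall = blockSize < maxBytesInFlight   // skip huge blocks (e.g. data skew) to avoid OOM
  detectCorrupt && isWrapped && isSmall
}
```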

davies (Author):

I tried to add a checksum for shuffle blocks in #15894, but that has much more complexity and overhead, so I'm in favor of this lighter approach.

Contributor:

Once we start explicitly managing the memory and support spilling, will it be safe to do this for large blocks, too?

davies (Author):

@JoshRosen I think so.

@JoshRosen (Contributor) left a comment:

I left a couple of comments regarding cleanup of decompression buffers and logging of exceptions.

private[spark]
final class ShuffleBlockFetcherIterator(
context: TaskContext,
shuffleClient: ShuffleClient,
Contributor:

Could you update the Scaladoc to document the two new parameters here? I understand what streamWrapper means from context but it might be useful for new readers of this code.

davies (Author):

done
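For reference, the added Scaladoc could read roughly like this (wording is a sketch, not the merged text):

```scala
/**
 * @param streamWrapper a function to wrap the returned input stream, e.g. to apply
 *                      decompression and/or decryption for the given block id
 * @param detectCorrupt whether to eagerly decompress fetched blocks to detect
 *                      corruption, retrying a corrupted block once before failing
 */
```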

// TODO: manage the memory used here, and spill it into disk in case of OOM.
Utils.copyStream(input, out)
out.close()
input = out.toChunkedByteBuffer.toInputStream(true)
Contributor:

Could you put dispose = true here to make the boolean parameter clearer?
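For reference, a named argument makes the flag self-describing at the call site (assuming the parameter is indeed named dispose):

```scala
// The named boolean spells out what `true` means at the call site.
input = out.toChunkedByteBuffer.toInputStream(dispose = true)
```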

|| corruptedBlocks.contains(blockId)) {
throwFetchFailedException(blockId, address, e)
} else {
logWarning(s"got an corrupted block $blockId from $address, fetch again")
Contributor:

Can we log the IOException here? It looks like the exception isn't logged or rethrown from this branch and I think we'll need that information to help debug problems here.

Contributor:

I think the IOException would be set as the cause of the FetchFailedException.

davies (Author):

@lins05 It's already set as the cause of the FetchFailedException.
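For readers following the thread, here is a hedged sketch of the retry-once bookkeeping and how the IOException travels as the failure's cause; the helper names are illustrative, not the actual ShuffleBlockFetcherIterator code:

```scala
import java.io.IOException
import scala.collection.mutable

// Illustrative only: `refetch` stands in for re-enqueueing the fetch request, and
// the RuntimeException stands in for FetchFailedException, which carries the cause.
val corruptedBlocks = mutable.HashSet[String]()

def onCorruptBlock(blockId: String, address: String, e: IOException,
                   refetch: String => Unit): Unit = {
  if (corruptedBlocks.contains(blockId)) {
    // Second failure for this block: give up and surface the original IOException.
    throw new RuntimeException(s"Fetch failed for $blockId from $address", e)
  } else {
    corruptedBlocks += blockId
    refetch(blockId)   // retry the corrupted block exactly once
  }
}
```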

// Decompress the whole block at once to detect any corruption, which could increase
// the memory usage and potentially increase the chance of OOM.
// TODO: manage the memory used here, and spill it into disk in case of OOM.
Utils.copyStream(input, out)
Contributor:

Do we need to close the input stream here? There might be resources in the decompressor which need to be freed.

davies (Author):

done
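A minimal sketch of that cleanup, assuming a plain copy loop (the real code could instead pass closeStreams = true to Utils.copyStream):

```scala
import java.io.{InputStream, OutputStream}

// Close the wrapping (decompressing) stream even if the copy throws, so any
// buffers held by the codec are released.
def copyAndClose(wrapped: InputStream, out: OutputStream): Unit = {
  try {
    val buf = new Array[Byte](8192)
    var n = wrapped.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = wrapped.read(buf)
    }
  } finally {
    wrapped.close()
  }
}
```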


serializerManager.wrapStream(blockId, inputStream)
}
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.getBoolean("spark.shuffle.tryDecompress", true))
Contributor:

nit: maybe detectCorrupt is slightly better than tryDecompress?
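With the suggested name, the flag would be read like this (illustrative only; the key the patch finally uses may differ):

```scala
// Illustrative read of the feature flag under the reviewer's suggested name.
val detectCorrupt = SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true)
```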

import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util.Utils
import org.apache.spark.util.io.{ChunkedByteBufferInputStream, ChunkedByteBufferOutputStream}
Contributor:

seems ChunkedByteBufferInputStream is not used here.

/** Current number of requests in flight */
private[this] var reqsInFlight = 0

/** The blocks that can't be decompressed successfully */
Contributor:

What about adding more explanation, for example:

/**
 * The blocks that can't be decompressed successfully.
 * It is used to guarantee that we retry at most once for those corrupted blocks.
 */


davies commented Nov 28, 2016

I manually tested this patch with a job that usually fails because of a corrupted stream. As the logging shows:

16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_613_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_688_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2434_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_878_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_1042_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2301_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2546_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_3160_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_3601_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
...
16/11/20 08:32:41 INFO Executor: Finished task 275.0 in stage 26.0 (TID 22187). 5219 bytes result sent to driver

The shuffle fetcher got some corrupted blocks for partition 275 and retried them once, after which the task finally succeeded.

But the retry cannot protect all the tasks; some still failed with FetchFailed, so the stage was retried:

26	   2016/11/20 08:31:24	1.0min	403/1000 (2 failed)			205.6 GB	29.5 GB org.apache.spark.shuffle.FetchFailedException: Stream is corrupted

26 (retry 1)  2016/11/20 08:34:00	34s	200/629 (2 failed)			102.0 GB	14.6 GB	org.apache.spark.shuffle.FetchFailedException: Stream is corrupted

26 (retry 2)  2016/11/20 08:35:25	1.8min	461/461			235.1 GB	33.7 GB

Stage 26 succeeded after being retried twice.

Another observation: all of the corruption happened on only 2 nodes out of 26, and a few broadcast blocks were also corrupted on them. It seems the corruption happens on the receiving (fetcher) side of the network.

I will update the patch to address comments.


SparkQA commented Nov 29, 2016

Test build #69261 has finished for PR 15923 at commit b3e1786.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 29, 2016

Test build #3443 has started for PR 15923 at commit b3e1786.


SparkQA commented Nov 30, 2016

Test build #3444 has started for PR 15923 at commit b3e1786.


SparkQA commented Nov 30, 2016

Test build #69422 has finished for PR 15923 at commit 28340ef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 1, 2016

Test build #3448 has finished for PR 15923 at commit 28340ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


rxin commented Dec 3, 2016

cc @zsxwing @JoshRosen does this look good?


davies commented Dec 6, 2016

ping @JoshRosen

@JoshRosen

This looks good overall, but one nit: it looks like we don't have any test coverage for the case where detectCorrupt is false. We should probably add a test to make sure that the feature flag works properly.


davies commented Dec 7, 2016

@JoshRosen Added a test for the case where detectCorrupt is false.


SparkQA commented Dec 7, 2016

Test build #69825 has finished for PR 15923 at commit b43d384.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 8, 2016

Test build #3475 has finished for PR 15923 at commit b43d384.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 8, 2016

Test build #3480 has started for PR 15923 at commit b43d384.


SparkQA commented Dec 9, 2016

Test build #3481 has finished for PR 15923 at commit b43d384.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Dec 9, 2016

LGTM


zsxwing commented Dec 9, 2016

Merging to master.

@asfgit asfgit closed this in cf33a86 Dec 9, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
var result: FetchResult = null
var input: InputStream = null
// Take the next fetched result and try to decompress it to detect data corruption,
// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
@iinegve, Dec 22, 2016:

@davies Could you elaborate a bit here? In my mind TCP provides pretty robust data transfer, which means that if there is an error, the block was written to disk corrupted and fetching it one more time won't help.

Contributor:

We have observed in production a few failures related to this in virtualized environments. It is entirely possible there is a bug in the underlying networking stack, or a bug in Spark's networking stack. But either way, this eliminates those issues.

davies (Author):

@fathersson The checksum in TCP is only 16 bits; it's not strong enough for heavy traffic, which is why DFS and other systems with heavy TCP traffic usually add another application-level checksum. Adding to @rxin's point, we did see this retry help in production to work around temporary corruption.

@Tagar:

Is netty/shuffle data compressed with the Snappy algorithm by default? If so, it might be a good idea to enable checksum checking at the Netty level too:

https://netty.io/4.0/api/io/netty/handler/codec/compression/SnappyFramedDecoder.html

Note that by default, validation of the checksum header in each chunk is DISABLED for performance improvements. If performance is less of an issue, or if you would prefer the safety that checksum validation brings, please use the SnappyFramedDecoder(boolean) constructor with the argument set to true.

Member:

@Tagar Spark doesn't use Netty's Snappy compression.

ericl pushed a commit to ericl/spark that referenced this pull request Dec 30, 2016
liancheng pushed a commit to liancheng/spark that referenced this pull request Jan 25, 2017
…lock is corrupt

(backport from upstream master to databricks branch 2.1)

Author: Davies Liu <[email protected]>

Closes apache#159 from ericl/sc-5362.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017