
Conversation

@andrewor14
Contributor

Summary of the changes

The bulk of this PR comprises tests and documentation; the actual fix is just one line of code (see `BlockObjectWriter.scala`). We currently do not run the `External*` test suites with different compression codecs, and doing so would have caught the bug reported in SPARK-3277. This PR extends the existing code to test spilling using all compression codecs known to Spark, including `LZ4`.
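As a rough illustration of the testing idea, one might register each spilling scenario once per codec; this sketch is hypothetical (`CodecMatrix` and `registerSpillingTests` are invented names, with `register` standing in for the test framework's registration call), and the codec list mirrors Spark's built-ins at the time:

```scala
// Hypothetical sketch: run every spilling scenario under every codec,
// not just the default one.
object CodecMatrix {
  val codecs: Seq[String] = Seq("lz4", "lzf", "snappy")

  // Invokes the supplied registration function once per codec.
  def registerSpillingTests(register: String => Unit): Unit =
    codecs.foreach(c => register(s"spilling with compression codec $c"))
}
```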

The bug itself

In `DiskBlockObjectWriter`, we only report the shuffle bytes written before we close the streams. With `LZ4`, the bytes written reported by our metrics were all 0 because `flush()` was not taking effect for some reason. In general, compression codecs may write additional bytes to the file after we call `close()`, so we must also capture those bytes in our shuffle write metrics.
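For illustration, here is a minimal, self-contained sketch of the failure mode, using `java.util.zip.GZIPOutputStream` as a stand-in for Spark's codecs (the names `PostCloseBytesDemo` and `writeCompressed` are invented for this example): the underlying file barely grows until `close()`, so any metric sampled before close undercounts.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object PostCloseBytesDemo {
  // Returns (bytes visible before close, bytes visible after close).
  def writeCompressed(payload: Array[Byte]): (Int, Int) = {
    val file = new ByteArrayOutputStream()
    val compressed = new GZIPOutputStream(file)
    compressed.write(payload)
    compressed.flush()
    val beforeClose = file.size() // all a pre-close metric ever sees
    compressed.close()            // the codec emits its buffered bytes here
    (beforeClose, file.size())
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = writeCompressed(Array.fill[Byte](4096)(42))
    println(s"before close: $before bytes, after close: $after bytes")
  }
}
```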

Thanks @mridulm and @pwendell for help with debugging.

@SparkQA

SparkQA commented Aug 28, 2014

QA tests have started for PR 2187 at commit 1c4624e.

  • This patch merges cleanly.

Contributor

A cool trick I used in YarnSparkHadoopUtilSuite is to do something like this:

def spillingTest(name: String)(testFn: => Unit): Unit = test(name) {
    // common setup (e.g. configure the codec) goes here
    testFn  // testFn is a by-name parameter, so referencing it runs the body
    // common teardown goes here
}

Then you can register like this:

spillingTest("spilling with null keys and values") {
   // body of testSpillingWithNullKeysAndValues
}

Would look a little bit cleaner overall, but no biggie.

@vanzin
Contributor

vanzin commented Aug 28, 2014

LGTM, kinda skimmed through the tests since it looked like most changes were changing `==` for `===` and other minor things like that.

@SparkQA

SparkQA commented Aug 29, 2014

QA tests have finished for PR 2187 at commit 1c4624e.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 29, 2014

QA tests have started for PR 2187 at commit 1b54bdc.

  • This patch merges cleanly.

asfgit pushed a commit that referenced this pull request Aug 29, 2014

Author: Andrew Or <[email protected]>
Author: Patrick Wendell <[email protected]>

Closes #2187 from andrewor14/fix-lz4-spilling and squashes the following commits:

1b54bdc [Andrew Or] Speed up tests by not compressing everything
1c4624e [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-lz4-spilling
6b2e7d1 [Andrew Or] Fix compilation error
92e251b [Patrick Wendell] Better documentation for BlockObjectWriter.
a1ad536 [Andrew Or] Fix tests
089593f [Andrew Or] Actually fix SPARK-3277 (tests still fail)
4bbcf68 [Andrew Or] Update tests to actually test all compression codecs
b264a84 [Andrew Or] ExternalAppendOnlyMapSuite code style fixes (minor)
1bfa743 [Andrew Or] Add more information to assert for better debugging
@asfgit asfgit closed this in a46b8f2 Aug 29, 2014
@SparkQA

SparkQA commented Aug 29, 2014

QA tests have finished for PR 2187 at commit 1b54bdc.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 andrewor14 deleted the fix-lz4-spilling branch August 29, 2014 01:24
@mridulm
Contributor

mridulm commented Aug 29, 2014

This change looks incorrect.
On commitAndClose, reportedPosition is not updated.
So a subsequent close or revert won't update bytesWritten properly.

Since this was a blocker, it would have been better not to rush into committing it.

@vanzin
Contributor

vanzin commented Aug 29, 2014

Hi @mridulm

On commitAndClose, reportedPosition is not updated.
So a subsequent close or revert won't update bytesWritten properly.

Hmmm, maybe it's my unfamiliarity with the innards here, but why would you call close or revert after you've already committed and closed the writer?

(Perhaps close() could be made idempotent, but that would just protect against buggy code?)

@mridulm
Contributor

mridulm commented Aug 29, 2014

A closed writer can be reverted when some other writer's close fails in the shuffle group and we have to revert the entire group. This is particularly relevant when we have consolidated shuffle.

Yes, close should be idempotent, which is not the case here: each close will add to the metrics.
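To make the concern concrete, here is a minimal sketch of an idempotent close — a hypothetical `MetricsWriter`, not Spark's actual `DiskBlockObjectWriter` — where the metric is updated exactly once and `reportedPosition` stays in sync:

```scala
// Hypothetical sketch: close() folds the unreported delta into the metric
// exactly once, and keeps reportedPosition consistent so a repeated close
// (or a later revert) cannot double-count.
class MetricsWriter {
  private var position = 0L         // bytes written so far
  private var reportedPosition = 0L // bytes already folded into the metric
  private var closed = false
  var bytesWrittenMetric = 0L

  def write(numBytes: Long): Unit = { position += numBytes }

  def close(): Unit = {
    if (!closed) {
      bytesWrittenMetric += position - reportedPosition
      reportedPosition = position // stay consistent for any later caller
      closed = true
    }
  }
}
```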

@vanzin
Contributor

vanzin commented Aug 29, 2014

After a second look, close seems to be idempotent (albeit not thread-safe, but that's ok?). So it seems that the only problem is not updating reportedPosition in commitAndClose()?

@andrewor14
Contributor Author

Maybe I'm missing something, but I don't see a way for the writer to be reverted after close() is called since we set everything to null there. The only place to revert the writes is through revertPartialWritesAndClose(), but this should not be called after close() anyway. Perhaps we should add some safeguard against calling anything else in this class after we call close(), but as far as I'm concerned the existing code is correct since we don't use reportedPosition again after close(), so there doesn't seem to be a need to update it.

@mridulm
Contributor

mridulm commented Aug 29, 2014

I don't have my workspace handy, but search for revert usages ... that should answer where it is called. Since I added the error handling, I am fairly certain you will have revert after close in case of issues: not doing this actually breaks consolidated shuffle (1.0 behavior).

@mridulm
Contributor

mridulm commented Aug 29, 2014

Writers are not thread safe, so that is fine. Updating reportedPosition might be sufficient; a naive analysis did suggest that was the minimal change required for this PR.

But this is slightly involved code reused in various forms, with extremely minimal test cases ... hence why we need to be very careful making changes to it (similar to the mess that is Connection/ConnectionManager).

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014