[SPARK-18546][core] Fix merging shuffle spills when using encryption. #15982
Conversation
|
EDIT: rebased, so the diff now only contains the fix for this bug. |
|
/cc @zsxwing @JoshRosen |
|
Test build #69030 has finished for PR 15982 at commit
|
|
Test build #69032 has finished for PR 15982 at commit
|
|
retest this please |
|
Test build #69035 has finished for PR 15982 at commit
|
|
Unrelated failure. retest this please |
|
Test build #69040 has finished for PR 15982 at commit
|
|
Ping. |
The problem exists because it's not possible to just concatenate encrypted partition data from different spill files; currently each partition would have its own initialization vector (IV) to set up encryption, and the final merged file should contain a single IV for each merged partition, otherwise iterating over each record becomes really hard.

To fix that, UnsafeShuffleWriter now decrypts the partitions when merging, so that the merged file contains a single IV at the start of the partition data.

Because it's not possible to do that using the fast transferTo path, when encryption is enabled UnsafeShuffleWriter falls back to using file streams when merging. It may be possible to use a hybrid approach when using encryption, using an intermediate direct buffer when reading from files and encrypting the data, but that's better left for a separate patch.

As part of the change I made DiskBlockObjectWriter take a SerializerManager instead of a "wrap stream" closure, since that makes it easier to test the code without having to mock SerializerManager functionality.

Tested with newly added unit tests (UnsafeShuffleWriterSuite for the write side and ExternalAppendOnlyMapSuite for integration), and by running some apps that failed without the fix.
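For readers skimming the conversation, here is a minimal, self-contained sketch of the stream-based ("slow") merge path described above. It is in the spirit of the change, not the PR's actual code: StreamWrapper, CountingStream, CloseShield, and mergeSpills are illustrative stand-ins for what SerializerManager and UnsafeShuffleWriter do in the real patch.

```java
import java.io.*;

final class EncryptedSpillMergeSketch {

  /** Stands in for SerializerManager's compression/encryption wrapping. */
  interface StreamWrapper {
    OutputStream wrapForWrite(OutputStream out) throws IOException;
    InputStream wrapForRead(InputStream in) throws IOException;
  }

  /** Counts bytes written so per-partition lengths can be recorded. */
  static final class CountingStream extends FilterOutputStream {
    long count;
    CountingStream(OutputStream out) { super(out); }
    @Override public void write(int b) throws IOException { out.write(b); count++; }
    @Override public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      count += len;
    }
  }

  /** Lets per-partition cipher/codec streams be closed without closing the merged file. */
  static final class CloseShield extends FilterOutputStream {
    CloseShield(OutputStream out) { super(out); }
    @Override public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }
    @Override public void close() throws IOException { flush(); }
  }

  /**
   * Merges the p-th byte range of every spill file into a single output file,
   * decrypting each spill's slice and re-encrypting the merged partition once.
   */
  static long[] mergeSpills(File[] spills, long[][] partitionLengths, File merged,
      StreamWrapper wrapper) throws IOException {
    int numPartitions = partitionLengths[0].length;
    long[] mergedLengths = new long[numPartitions];
    InputStream[] spillIn = new InputStream[spills.length];
    CountingStream out =
        new CountingStream(new BufferedOutputStream(new FileOutputStream(merged)));
    try {
      for (int i = 0; i < spills.length; i++) {
        spillIn[i] = new BufferedInputStream(new FileInputStream(spills[i]));
      }
      for (int p = 0; p < numPartitions; p++) {
        long start = out.count;
        // One cipher stream per merged partition => a single IV at the start of the partition.
        OutputStream partitionOut = wrapper.wrapForWrite(new CloseShield(out));
        for (int i = 0; i < spills.length; i++) {
          // Decrypt (and decompress) this spill's slice before copying it over.
          InputStream partitionIn =
              wrapper.wrapForRead(slice(spillIn[i], partitionLengths[i][p]));
          copy(partitionIn, partitionOut);
          partitionIn.close();
        }
        partitionOut.close(); // flushes cipher/codec framing; CloseShield keeps `out` open
        mergedLengths[p] = out.count - start;
      }
    } finally {
      for (InputStream in : spillIn) {
        if (in != null) in.close();
      }
      out.close();
    }
    return mergedLengths;
  }

  /** Reads exactly `len` bytes into memory; the real code uses a LimitedInputStream instead. */
  static InputStream slice(InputStream in, long len) throws IOException {
    byte[] buf = new byte[(int) len];
    new DataInputStream(in).readFully(buf);
    return new ByteArrayInputStream(buf);
  }

  static void copy(InputStream in, OutputStream out) throws IOException {
    byte[] buf = new byte[8192];
    for (int n; (n = in.read(buf)) != -1; ) {
      out.write(buf, 0, n);
    }
  }
}
```

The important asymmetry: decryption happens once per spill slice on the read side, while encryption happens once per merged partition on the write side, so the merged file ends up with a single IV at the start of each partition.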
Overall looks good. Just left some minor comments
| defaultSerializer: Serializer, | ||
| conf: SparkConf, | ||
| encryptionKey: Option[Array[Byte]]) { | ||
| val encryptionKey: Option[Array[Byte]]) { |
nit: I prefer to add a method called isEncryptionEnabled instead of exposing this field.
| final long[] partitionLengths = new long[numPartitions]; | ||
| final InputStream[] spillInputStreams = new FileInputStream[spills.length]; | ||
| OutputStream mergedFileOutputStream = null; | ||
| final CountingOutputStream mergedFileOutputStream = new CountingOutputStream( |
Could you add a comment about why we need to use CountingOutputStream + CloseShieldOutputStream? It took me a while to figure out the optimization you did.
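In case it helps future readers, here is a tiny standalone demo (not Spark code) of the behavior that comment should capture; GZIPOutputStream stands in for the per-partition compression/encryption stream, and the commons-io classes are the ones named in the comment above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;

public class CountingCloseShieldDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream mergedFile = new ByteArrayOutputStream(); // stands in for the merged file
    CountingOutputStream counting = new CountingOutputStream(mergedFile);

    long before = counting.getByteCount();
    // Per-partition wrapper (GZIP here as a stand-in for the codec/cipher stream).
    GZIPOutputStream partitionOut =
        new GZIPOutputStream(new CloseShieldOutputStream(counting));
    partitionOut.write("partition 0 records".getBytes(StandardCharsets.UTF_8));
    partitionOut.close(); // writes the GZIP trailer, but `counting`/`mergedFile` stay open

    System.out.println("partition 0 length = " + (counting.getByteCount() - before));

    // The underlying stream is still usable for the next partition.
    counting.write("next partition...".getBytes(StandardCharsets.UTF_8));
    counting.close();
  }
}
```

Closing the per-partition wrapper flushes its trailer without closing the shared merged-file stream, and the byte counter gives each merged partition's length (the partitionLengths the code above records).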
| * Wrap an output stream for compression if block compression is enabled for its block type | ||
| */ | ||
| private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = { | ||
| def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = { |
nit: no need to open up this method
| * Wrap an input stream for compression if block compression is enabled for its block type | ||
| */ | ||
| private[this] def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = { | ||
| def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = { |
nit: no need to open up this method
| Closeables.close(partitionInputStream, innerThrewException); | ||
| InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], | ||
| partitionLengthInSpill, false); | ||
| partitionInputStream = blockManager.serializerManager().wrapForEncryption( |
partitionInputStream is not closed
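A minimal sketch of what the reviewer is asking for, following the Closeables.close(stream, threwException) idiom already visible in this diff; the method and variable names are illustrative, not the PR's actual code.

```java
import com.google.common.io.Closeables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class CopyPartitionSketch {
  static void copyPartition(InputStream partitionInputStream, OutputStream partitionOutput)
      throws IOException {
    boolean threwException = true;
    try {
      byte[] buf = new byte[8192];
      for (int n; (n = partitionInputStream.read(buf)) != -1; ) {
        partitionOutput.write(buf, 0, n);
      }
      threwException = false;
    } finally {
      // Swallow IOExceptions from close() only if we are already propagating one.
      Closeables.close(partitionInputStream, threwException);
    }
  }
}
```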
|
|
||
| package org.apache.spark.util.collection | ||
|
|
||
| import java.security.PrivilegedExceptionAction |
nit: unused import
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.deploy.SparkHadoopUtil |
nit: unused import
| } | ||
|
|
||
| @Test | ||
| public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception { |
We should also test testMergingSpills(false, null, true) and testMergingSpills(true, null, true).
|
Test build #69337 has finished for PR 15982 at commit
|
| partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); | ||
| } | ||
|
|
||
| partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); |
Another change here is that TimeTrackingOutputStream now goes around the compression codec. I think that is the right change, but it's at least worth mentioning in the commit message.
I'm wondering if it's worth having a separate JIRA for this, since it will affect metrics for all users.
Hmm... let me revert this and open a bug. DiskBlockObjectWriter doesn't count the time for compression / encryption, so this should behave the same. Both should be fixed together.
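To make the ordering question concrete, here is an illustrative helper (not Spark's actual code) contrasting the two wrapping orders; the Spark types come from the snippets above, while the method names and the exact packages/constructor are assumptions on my part.

```java
import java.io.OutputStream;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.storage.TimeTrackingOutputStream;

final class WrapOrderSketch {
  // (a) Timing wrapper outermost, as in the snippet above: the measured write
  // time includes compression (and encryption) work.
  static OutputStream timeIncludingCodec(ShuffleWriteMetrics metrics,
      CompressionCodec codec, OutputStream sink) {
    return new TimeTrackingOutputStream(metrics, codec.compressedOutputStream(sink));
  }

  // (b) Codec outermost, timing wrapper directly on the sink: only raw writes
  // are timed, which is what DiskBlockObjectWriter currently does.
  static OutputStream timeRawWritesOnly(ShuffleWriteMetrics metrics,
      CompressionCodec codec, OutputStream sink) {
    return codec.compressedOutputStream(new TimeTrackingOutputStream(metrics, sink));
  }
}
```

Whichever wrapper is outermost decides whether time spent compressing/encrypting shows up in the shuffle write-time metric, which is the behavior difference being discussed here.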
| import org.apache.spark.ShuffleDependency; | ||
| import org.apache.spark.SparkConf; | ||
| import org.apache.spark.TaskContext; | ||
| import org.apache.spark.deploy.SparkHadoopUtil; |
Other than CryptoStreamUtils, the added imports look unused. It also looks like you can eliminate AbstractFunction1 and ByteStreams since you are no longer using them.
| @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; | ||
| @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; | ||
|
|
||
| private static final class WrapStream extends AbstractFunction1<OutputStream, OutputStream> { |
you can eliminate the imports of AbstractFunction1 and OutputStream after this
|
|
||
| private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m"); | ||
|
|
||
| private static final class WrapStream extends AbstractFunction1<OutputStream, OutputStream> { |
same on trimming imports
|
Test build #69348 has finished for PR 15982 at commit
|
|
Test build #69360 has finished for PR 15982 at commit
|
|
retest this please |
|
Test build #69375 has finished for PR 15982 at commit
|
|
I think the comment on |
|
LGTM |
|
Test build #69419 has finished for PR 15982 at commit
|
|
Merging to master / 2.1. |
The problem exists because it's not possible to just concatenate encrypted partition data from different spill files; currently each partition would have its own initialization vector (IV) to set up encryption, and the final merged file should contain a single IV for each merged partition, otherwise iterating over each record becomes really hard.

To fix that, UnsafeShuffleWriter now decrypts the partitions when merging, so that the merged file contains a single IV at the start of the partition data.

Because it's not possible to do that using the fast transferTo path, when encryption is enabled UnsafeShuffleWriter falls back to using file streams when merging. It may be possible to use a hybrid approach when using encryption, using an intermediate direct buffer when reading from files and encrypting the data, but that's better left for a separate patch.

As part of the change I made DiskBlockObjectWriter take a SerializerManager instead of a "wrap stream" closure, since that makes it easier to test the code without having to mock SerializerManager functionality.

Tested with newly added unit tests (UnsafeShuffleWriterSuite for the write side and ExternalAppendOnlyMapSuite for integration), and by running some apps that failed without the fix.

Author: Marcelo Vanzin <[email protected]>

Closes #15982 from vanzin/SPARK-18546.

(cherry picked from commit 93e9d88)
Signed-off-by: Marcelo Vanzin <[email protected]>