
Conversation

@vanzin (Contributor) commented Nov 22, 2016

The problem exists because it's not possible to simply concatenate encrypted
partition data from different spill files; currently each partition has its own
initialization vector (IV) to set up encryption, while the final merged file
should contain a single IV for each merged partition, otherwise iterating over
each record becomes very hard.

To fix that, UnsafeShuffleWriter now decrypts the partition data while merging,
so that the merged file contains a single initialization vector at the start of
each partition's data.

Because it's not possible to do that using the fast transferTo path, UnsafeShuffleWriter
falls back to using file streams for merging when encryption is enabled. It may be
possible to use a hybrid approach with encryption, using an intermediate direct buffer
when reading from files and encrypting the data, but that's better left for a separate
patch.
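
As a rough illustration of the stream-based merge, here is a minimal sketch, not the code in this patch: the CryptoStreams interface is a hypothetical stand-in for SerializerManager's stream wrapping, and compression, metrics and error handling are omitted.

```java
// Sketch only. The real logic lives in UnsafeShuffleWriter#mergeSpillsWithFileStreams.
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.io.ByteStreams;

interface CryptoStreams {
  InputStream decrypting(InputStream in) throws IOException;    // consumes the stream's IV
  OutputStream encrypting(OutputStream out) throws IOException; // writes one fresh IV
}

final class EncryptedMergeSketch {
  /** Concatenates each partition's bytes from every spill file into one merged file. */
  static void merge(File[] spills, long[][] partitionLengthsInSpill,
                    File merged, CryptoStreams crypto) throws IOException {
    int numPartitions = partitionLengthsInSpill[0].length;
    InputStream[] spillIn = new InputStream[spills.length];
    for (int i = 0; i < spills.length; i++) {
      spillIn[i] = new FileInputStream(spills[i]);
    }
    try (OutputStream mergedOut = new FileOutputStream(merged)) {
      for (int p = 0; p < numPartitions; p++) {
        // One encrypted stream per merged partition => a single IV per partition.
        OutputStream partitionOut = crypto.encrypting(mergedOut);
        for (int i = 0; i < spills.length; i++) {
          // Each spill wrote this partition with its own IV, so decrypt while reading.
          InputStream partitionIn = crypto.decrypting(
              ByteStreams.limit(spillIn[i], partitionLengthsInSpill[i][p]));
          ByteStreams.copy(partitionIn, partitionOut);
        }
        // The patch closes a CloseShieldOutputStream-wrapped stream here instead, so
        // per-partition wrappers can be finished without closing the merged file.
        partitionOut.flush();
      }
    } finally {
      for (InputStream in : spillIn) {
        in.close();
      }
    }
  }
}
```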

As part of the change I made DiskBlockObjectWriter take a SerializerManager
instead of a "wrap stream" closure, since that makes it easier to test the
code without having to mock SerializerManager functionality.

Tested with newly added unit tests (UnsafeShuffleWriterSuite for the write
side and ExternalAppendOnlyMapSuite for integration), and by running some
apps that failed without the fix.

@vanzin (Contributor, Author) commented Nov 22, 2016

This also contains the changes in #15981 since it's built on top of those, but they should be merged separately. The fix for this bug is in the second commit.

EDIT: rebased, so the diff now only contains the fix for this bug.

@vanzin (Contributor, Author) commented Nov 22, 2016

/cc @zsxwing @JoshRosen

@SparkQA commented Nov 22, 2016

Test build #69030 has finished for PR 15982 at commit fbd2bca.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 22, 2016

Test build #69032 has finished for PR 15982 at commit 3b879f4.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 22, 2016

retest this please

@SparkQA commented Nov 23, 2016

Test build #69035 has finished for PR 15982 at commit 3b879f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 23, 2016

Unrelated failure. retest this please

@SparkQA commented Nov 23, 2016

Test build #69040 has finished for PR 15982 at commit 3b879f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 28, 2016

Ping.

@zsxwing (Member) left a comment:

Overall looks good. Just left some minor comments.

defaultSerializer: Serializer,
conf: SparkConf,
encryptionKey: Option[Array[Byte]]) {
val encryptionKey: Option[Array[Byte]]) {

@zsxwing (Member):
nit: I prefer to add a method called isEncryptionEnabled instead of exposing this field.

final long[] partitionLengths = new long[numPartitions];
final InputStream[] spillInputStreams = new FileInputStream[spills.length];
OutputStream mergedFileOutputStream = null;
final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(

@zsxwing (Member):
Could you add a comment about why CountingOutputStream + CloseShieldOutputStream are needed? It took me a while to figure out the optimization you did.
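
For reference, my reading of the intent (an assumption, not the author's wording): CountingOutputStream tracks how many bytes each merged partition occupies in the output file, and CloseShieldOutputStream lets each per-partition wrapper be closed, flushing its compression/encryption state, without closing the shared underlying stream. A small self-contained sketch of that pattern:

```java
// Sketch of the CountingOutputStream + CloseShieldOutputStream pattern (commons-io).
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;

final class PartitionLengthSketch {
  /** Writes each partition's bytes back-to-back and records the length of each. */
  static long[] writePartitions(String path, byte[][] partitions) throws IOException {
    long[] lengths = new long[partitions.length];
    try (CountingOutputStream counted =
             new CountingOutputStream(new FileOutputStream(path))) {
      for (int p = 0; p < partitions.length; p++) {
        long start = counted.getByteCount();
        // Per-partition wrappers (encryption, compression, ...) would go around this.
        OutputStream partitionOut = new CloseShieldOutputStream(counted);
        partitionOut.write(partitions[p]);
        partitionOut.close();                       // `counted` stays open
        lengths[p] = counted.getByteCount() - start;
      }
    }
    return lengths;
  }
}
```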

* Wrap an output stream for compression if block compression is enabled for its block type
*/
private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {

@zsxwing (Member):
nit: no need to widen this method's visibility

* Wrap an input stream for compression if block compression is enabled for its block type
*/
private[this] def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {

@zsxwing (Member):
nit: no need to widen this method's visibility

Closeables.close(partitionInputStream, innerThrewException);
InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i],
partitionLengthInSpill, false);
partitionInputStream = blockManager.serializerManager().wrapForEncryption(

@zsxwing (Member):
partitionInputStream is not closed
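
One possible shape for that fix, as a sketch only (the actual change may differ); Guava's Closeables.close lets the copy's own exception take precedence over a failure in close():

```java
// Sketch: close the per-partition stream even if the copy fails, without masking
// the original exception (Closeables.close swallows the close() failure when the
// copy has already thrown).
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;

final class CopyAndCloseSketch {
  static void copyPartition(InputStream partitionInputStream, OutputStream out)
      throws IOException {
    boolean threwException = true;
    try {
      ByteStreams.copy(partitionInputStream, out);
      threwException = false;
    } finally {
      Closeables.close(partitionInputStream, threwException);
    }
  }
}
```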


package org.apache.spark.util.collection

import java.security.PrivilegedExceptionAction

@zsxwing (Member):
nit: unused import

import scala.collection.mutable.ArrayBuffer

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil

@zsxwing (Member):
nit: unused import

}

@Test
public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception {

@zsxwing (Member):
We should also test testMergingSpills(false, null, true) and testMergingSpills(true, null, true).
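
Presumably that would look like the following additions inside UnsafeShuffleWriterSuite (the method names are made up; the parameters are assumed to mean transferTo enabled, compression codec name, and encryption enabled):

```java
@Test
public void mergeSpillsWithEncryptionAndNoCompressionSlowPath() throws Exception {
  // Uncompressed spills, encryption on, stream-based merge path.
  testMergingSpills(false, null, true);
}

@Test
public void mergeSpillsWithEncryptionAndNoCompressionFastPath() throws Exception {
  // transferTo requested, but encryption should force the stream-based path.
  testMergingSpills(true, null, true);
}
```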

@SparkQA commented Nov 29, 2016

Test build #69337 has finished for PR 15982 at commit 2e03ee6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
}

partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);

Reviewer (Contributor):
Another change here is that TimeTrackingOutputStream now goes around the compression codec. I think that is the right change, but it's at least worth mentioning in the commit message.

I'm wondering if it's worth having a separate JIRA for this, since it will affect metrics for all users.


@vanzin (Contributor, Author):
Hmm... let me revert this and open a bug. DiskBlockObjectWriter doesn't count the time for compression / encryption, so this should behave the same. Both should be fixed together.
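
To make the ordering concrete, a sketch (class and package names assumed from the Spark 2.x codebase; the helper methods are made up): whichever stream TimeTrackingOutputStream wraps is what it times.

```java
// Sketch of the two possible wrap orders and what each one measures.
import java.io.OutputStream;

import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.storage.TimeTrackingOutputStream;

final class WriteTimeOrderingSketch {
  // Timer outside the codec: compression work is counted as shuffle write time.
  static OutputStream timeIncludesCompression(
      ShuffleWriteMetrics metrics, CompressionCodec codec, OutputStream fileOut) {
    return new TimeTrackingOutputStream(metrics, codec.compressedOutputStream(fileOut));
  }

  // Timer inside the codec: only writes of already-compressed bytes are timed,
  // which (per the comment above) matches what DiskBlockObjectWriter does today.
  static OutputStream timeExcludesCompression(
      ShuffleWriteMetrics metrics, CompressionCodec codec, OutputStream fileOut) {
    return codec.compressedOutputStream(new TimeTrackingOutputStream(metrics, fileOut));
  }
}
```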

import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.deploy.SparkHadoopUtil;

Reviewer (Contributor):
Other than CryptoStreamUtils, the other added imports look unused. It also looks like you can eliminate AbstractFunction1 and ByteStreams since you are no longer using them.

@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;

private static final class WrapStream extends AbstractFunction1<OutputStream, OutputStream> {

Reviewer (Contributor):
You can eliminate the imports of AbstractFunction1 and OutputStream after this.


private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m");

private static final class WrapStream extends AbstractFunction1<OutputStream, OutputStream> {

Reviewer (Contributor):
Same comment on trimming the imports.

@SparkQA commented Nov 29, 2016

Test build #69348 has finished for PR 15982 at commit 8ac9276.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 30, 2016

Test build #69360 has finished for PR 15982 at commit 1025c6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 30, 2016

retest this please

@SparkQA commented Nov 30, 2016

Test build #69375 has finished for PR 15982 at commit 1025c6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Nov 30, 2016

I think the comment on #mergeSpillsWithFileStreams needs to be updated slightly to include encryption, but other than that lgtm.

@zsxwing (Member) commented Nov 30, 2016

LGTM

@SparkQA commented Nov 30, 2016

Test build #69419 has finished for PR 15982 at commit 49737d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 30, 2016

Merging to master / 2.1.

asfgit pushed a commit that referenced this pull request Nov 30, 2016

Author: Marcelo Vanzin <[email protected]>

Closes #15982 from vanzin/SPARK-18546.

(cherry picked from commit 93e9d88)
Signed-off-by: Marcelo Vanzin <[email protected]>
@asfgit asfgit closed this in 93e9d88 Nov 30, 2016
@vanzin vanzin deleted the SPARK-18546 branch November 30, 2016 22:54
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016

Author: Marcelo Vanzin <[email protected]>

Closes apache#15982 from vanzin/SPARK-18546.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Marcelo Vanzin <[email protected]>

Closes apache#15982 from vanzin/SPARK-18546.