156 changes: 100 additions & 56 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.{SQLContext, Row}
@@ -59,11 +59,62 @@ case class Exchange(

override def output: Seq[Attribute] = child.output

/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

private val bypassMergeThreshold =
child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
* shuffle code assumes that objects are immutable and hence does not perform its own defensive
* copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
* order to properly shuffle the output of these operators, we need to perform our own copying
* prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
* whenever possible. This method encapsulates the logic for choosing when to copy.
*
* In the long run, we might want to push this logic into core's shuffle APIs so that we don't
* have to rely on knowledge of core internals here in SQL.
*
* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
*
* @param partitioner the partitioner for the shuffle
* @param serializer the serializer that will be used to write rows
* @return true if rows should be copied before being shuffled, false otherwise
*/
private def needToCopyObjectsBeforeShuffle(
partitioner: Partitioner,
serializer: Serializer): Boolean = {
// Note: even though we only use the partitioner's `numPartitions` field, we require it to be
// passed instead of directly passing the number of partitions in order to guard against
// corner-cases where a partitioner constructed with `numPartitions` partitions may output
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (newOrdering.nonEmpty) {
// If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
// which requires a defensive copy.
true
} else if (sortBasedShuffleOn) {
// Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
// However, there are two special cases where we can avoid the copy, described below:
if (partitioner.numPartitions <= bypassMergeThreshold) {
// If the number of output partitions is sufficiently small, then Spark will fall back to
// the old hash-based shuffle write path which doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
} else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
// SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
// them. This optimization is guarded by a feature-flag and is only applied in cases where
// shuffle dependency does not specify an ordering and the record serializer has certain
// properties. If this optimization is enabled, we can safely avoid the copy.
false
} else {
// None of the special cases held, so we must copy.
true
}
} else {
// We're using hash-based shuffle, so we don't need to copy.
false
}
}
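
For readers tracing the branches above, here is a minimal, self-contained restatement of the same decision logic (a hypothetical helper, not part of the patch), with the SparkEnv and SparkConf lookups replaced by plain parameters so each branch can be exercised without a running SparkContext. Note that the real method takes the Partitioner itself because its actual numPartitions can be smaller than the requested count (RangePartitioner, for example).

// Hypothetical sketch; names are illustrative and not part of the patch.
object CopyDecisionSketch {
  def needToCopy(
      requiresNewOrdering: Boolean,
      sortBasedShuffle: Boolean,
      numPartitions: Int,
      bypassMergeThreshold: Int,
      serializeMapOutputs: Boolean,
      serializerSupportsRelocation: Boolean): Boolean = {
    if (requiresNewOrdering) {
      // ExternalSorter will buffer the rows, so they must be copied.
      true
    } else if (sortBasedShuffle) {
      if (numPartitions <= bypassMergeThreshold) {
        // Bypass path writes records straight to per-partition files: no copy.
        false
      } else if (serializeMapOutputs && serializerSupportsRelocation) {
        // Records are serialized before being buffered: no copy.
        false
      } else {
        // Deserialized records are buffered in memory: copy.
        true
      }
    } else {
      // Hash-based shuffle does not buffer deserialized records: no copy.
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // Sort shuffle, 300 > 200 partitions, serialized map outputs usable: no copy.
    println(needToCopy(false, true, 300, 200, true, true))   // false
    // Same, but the serializer cannot relocate serialized objects: copy.
    println(needToCopy(false, true, 300, 200, true, false))  // true
    // 50 <= 200 partitions hits the bypass-merge path: no copy.
    println(needToCopy(false, true, 50, 200, false, false))  // false
  }
}
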

private val keyOrdering = {
if (newOrdering.nonEmpty) {
@@ -81,7 +132,7 @@

@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf

def serializer(
private def getSerializer(
keySchema: Array[DataType],
valueSchema: Array[DataType],
hasKeyOrdering: Boolean,
@@ -112,17 +163,12 @@
override def execute(): RDD[Row] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
// This is a workaround for SPARK-4479. When:
// 1. sort based shuffle is on, and
// 2. the partition number is under the merge threshold, and
// 3. no ordering is required
// we can avoid the defensive copies to improve performance. In the long run, we probably
// want to include information in shuffle dependencies to indicate whether elements in the
// source RDD should be copied.
val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold

val rdd = if (willMergeSort || newOrdering.nonEmpty) {
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
val part = new HashPartitioner(numPartitions)

val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
Expand All @@ -134,63 +180,61 @@ case class Exchange(
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}
val part = new HashPartitioner(numPartitions)
val shuffled =
if (newOrdering.nonEmpty) {
new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering)
} else {
new ShuffledRDD[Row, Row, Row](rdd, part)
}
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
shuffled.setSerializer(
serializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions))

val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
if (newOrdering.nonEmpty) {
shuffled.setKeyOrdering(keyOrdering)
}
shuffled.setSerializer(serializer)
shuffled.map(_._2)

case RangePartitioning(sortingExpressions, numPartitions) =>
val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
} else {
child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null](null, null)
iter.map(row => mutablePair.update(row, null))
val keySchema = child.output.map(_.dataType).toArray
val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)

val childRdd = child.execute()
val part: Partitioner = {
// Internally, RangePartitioner runs a job on the RDD that samples keys to compute
// partition bounds. To get accurate samples, we need to copy the mutable keys.
val rddForSampling = childRdd.mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
Review comment from the PR author:

In order to know how many partitions will be shuffled, we need to create the RangePartitioner and compute its partition bounds. Depending on the results of the sampling, we may end up with a partitioner that produces fewer than numPartitions partitions, so we need to know the partitioner's actual number of partitions in order to determine whether sort-based shuffle will bypass its merge sort and fall back to hash-shuffle (in which case we can avoid a copy).

In the old code, the input to the range partitioner was rdd, so the input to the range bounds calculation may or may not have been defensively copied. As a result, I suspect that the actual bounds computed could vary depending on which shuffle mode was being used, which might result in extra work being done to compute accurate partition counts. To address this, I've updated this code to always perform a defensive copy of the partitioner input. This allows us to determine the actual partition count before computing rdd, which lets us benefit from the copy bypass optimizations.
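
To make the aliasing hazard concrete, here is a small self-contained sketch (illustrative only, not from the patch): a single mutable holder stands in for the reused Row, and buffering references to it, as the range-bounds sampler effectively does, records only the last value unless each element is copied first.

// Hypothetical illustration of why the sampling input must be defensively copied.
object MutableRowSamplingDemo {
  final class MutableRowLike(var value: Int)

  def main(args: Array[String]): Unit = {
    val reused = new MutableRowLike(0)
    val rows = (1 to 5).iterator.map { i => reused.value = i; reused }
    // Buffering references without copying: every slot aliases the same object.
    val sampledWithoutCopy = rows.toArray
    println(sampledWithoutCopy.map(_.value).mkString(", "))  // 5, 5, 5, 5, 5

    val reused2 = new MutableRowLike(0)
    val rows2 = (1 to 5).iterator.map { i => reused2.value = i; reused2 }
    // Defensive copy before buffering preserves each sampled value.
    val sampledWithCopy = rows2.map(r => new MutableRowLike(r.value)).toArray
    println(sampledWithCopy.map(_.value).mkString(", "))     // 1, 2, 3, 4, 5
  }
}
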

}
// TODO: RangePartitioner should take an Ordering.
implicit val ordering = new RowOrdering(sortingExpressions, child.output)
new RangePartitioner(numPartitions, rddForSampling, ascending = true)
}

// TODO: RangePartitioner should take an Ordering.
implicit val ordering = new RowOrdering(sortingExpressions, child.output)

val part = new RangePartitioner(numPartitions, rdd, ascending = true)
val shuffled =
if (newOrdering.nonEmpty) {
new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering)
} else {
new ShuffledRDD[Row, Null, Null](rdd, part)
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))}
} else {
childRdd.mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null]()
iter.map(row => mutablePair.update(row, null))
}
val keySchema = child.output.map(_.dataType).toArray
shuffled.setSerializer(
serializer(keySchema, null, newOrdering.nonEmpty, numPartitions))
}

val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
if (newOrdering.nonEmpty) {
shuffled.setKeyOrdering(keyOrdering)
}
shuffled.setSerializer(serializer)
shuffled.map(_._1)

case SinglePartition =>
// SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
// operators like `TakeOrdered` may require an ordering within the partition, and currently
// `SinglePartition` doesn't include ordering information.
// TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
val rdd = if (sortBasedShuffleOn) {
val valueSchema = child.output.map(_.dataType).toArray
val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
val partitioner = new HashPartitioner(1)

val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
} else {
child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Null, Row]()
iter.map(r => mutablePair.update(null, r))
}
}
val partitioner = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
val valueSchema = child.output.map(_.dataType).toArray
shuffled.setSerializer(serializer(null, valueSchema, false, 1))
shuffled.setSerializer(serializer)
shuffled.map(_._2)

case _ => sys.error(s"Exchange not implemented for $newPartitioning")