62 changes: 35 additions & 27 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -343,15 +343,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range) and is produced by a specific mapper.
* endPartition is excluded from the range) and is produced by
* a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and
* the endMapIndex is excluded).
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByMapIndex(
def getMapSizesByRange(
shuffleId: Int,
mapIndex: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
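For reference, a minimal standalone sketch (plain Scala, not Spark code) of the half-open convention the new parameters follow: startMapIndex is included and endMapIndex is excluded, mirroring the existing startPartition/endPartition pair.

// Illustration only: Array.slice uses the same [start, end) semantics as the new map index range.
object HalfOpenRangeSketch {
  def main(args: Array[String]): Unit = {
    val mapOutputs = Array("map0", "map1", "map2", "map3", "map4")
    val picked = mapOutputs.slice(1, 3) // index 1 included, index 3 excluded
    println(picked.mkString(", "))      // map1, map2
  }
}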

@@ -688,20 +691,25 @@ private[spark] class MapOutputTrackerMaster(
}

/**
* Return the location where the Mapper ran. The locations each includes both a host and an
* Return the locations where the Mappers ran. The locations each includes both a host and an
* executor id on that host.
*
* @param dep shuffle dependency object
* @param mapId the map id
* @param startMapIndex the start map index
* @param endMapIndex the end map index
* @return a sequence of locations where task runs.
*/
def getMapLocation(dep: ShuffleDependency[_, _, _], mapId: Int): Seq[String] =
def getMapLocation(
dep: ShuffleDependency[_, _, _],
startMapIndex: Int,
endMapIndex: Int): Seq[String] =
{
val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
if (mapId >= 0 && mapId < statuses.length) {
Seq(statuses(mapId).location.host)
if (startMapIndex < endMapIndex && (startMapIndex >= 0 && endMapIndex < statuses.length)) {
val statusesPicked = statuses.slice(startMapIndex, endMapIndex).filter(_ != null)
statusesPicked.map(_.location.host).toSeq
} else {
Nil
}
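A hedged, self-contained sketch of the behaviour the new getMapLocation body implements (Loc and Status below are local stand-ins, not Spark's BlockManagerId and MapStatus): statuses in the requested half-open range are picked, and entries that are still null, i.e. map outputs not yet registered, are dropped instead of failing.

object MapLocationSketch {
  final case class Loc(host: String)
  final case class Status(location: Loc)

  // Mirrors the diff above: validate the range, slice it, skip null statuses, keep the hosts.
  def hostsInRange(statuses: Array[Status], startMapIndex: Int, endMapIndex: Int): Seq[String] =
    if (startMapIndex < endMapIndex && startMapIndex >= 0 && endMapIndex < statuses.length) {
      statuses.slice(startMapIndex, endMapIndex).filter(_ != null).map(_.location.host).toSeq
    } else {
      Nil
    }

  def main(args: Array[String]): Unit = {
    val statuses = Array(Status(Loc("host-a")), null, Status(Loc("host-b")), Status(Loc("host-c")))
    println(hostsInRange(statuses, 0, 3)) // host-a and host-b; the null entry is skipped
    println(hostsInRange(statuses, 2, 2)) // empty result; invalid or empty ranges return Nil
  }
}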
@@ -737,29 +745,26 @@ private[spark] class MapOutputTrackerMaster(
case Some (shuffleStatus) =>
shuffleStatus.withMapStatuses { statuses =>
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses)
shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length)
}
case None =>
Iterator.empty
}
}

override def getMapSizesByMapIndex(
override def getMapSizesByRange(
shuffleId: Int,
mapIndex: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" +
logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
s"partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
case Some(shuffleStatus) =>
shuffleStatus.withMapStatuses { statuses =>
MapOutputTracker.convertMapStatuses(
shuffleId,
startPartition,
endPartition,
statuses,
Some(mapIndex))
shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
}
case None =>
Iterator.empty
@@ -802,7 +807,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
val statuses = getStatuses(shuffleId, conf)
try {
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses)
shuffleId, startPartition, endPartition, statuses, 0, statuses.length)
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
@@ -811,17 +816,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}

override def getMapSizesByMapIndex(
override def getMapSizesByRange(
shuffleId: Int,
mapIndex: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" +
logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
s"partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId, conf)
try {
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition,
statuses, Some(mapIndex))
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
@@ -980,7 +986,8 @@ private[spark] object MapOutputTracker extends Logging {
* @param startPartition Start of map output partition ID range (included in range)
* @param endPartition End of map output partition ID range (excluded from range)
* @param statuses List of map statuses, indexed by map partition index.
* @param mapIndex When specified, only shuffle blocks from this mapper will be processed.
* @param startMapIndex Start Map index.
* @param endMapIndex End Map index.
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
@@ -990,11 +997,12 @@ private[spark] object MapOutputTracker extends Logging {
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus],
mapIndex : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
startMapIndex : Int,
endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
val iter = statuses.iterator.zipWithIndex
for ((status, mapIndex) <- mapIndex.map(index => iter.filter(_._2 == index)).getOrElse(iter)) {
for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
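One detail worth calling out in the new convertMapStatuses loop: zipWithIndex runs before slice, so every status keeps its original map index, and block ids built from mapIndex still refer to the mapper's true position. A tiny standalone illustration of that iterator behaviour:

object ZipThenSliceSketch {
  def main(args: Array[String]): Unit = {
    val statuses = Array("status0", "status1", "status2", "status3", "status4")
    // The pairs keep the original positions 2 and 3; they are not renumbered from 0.
    val picked = statuses.iterator.zipWithIndex.slice(2, 4).toList
    println(picked) // List((status2,2), (status3,3))
  }
}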
@@ -55,12 +55,14 @@ private[spark] trait ShuffleManager {
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive)
* that are produced by one specific mapper. Called on executors by reduce tasks.
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
* read from map output (startMapIndex to endMapIndex - 1, inclusive).
* Called on executors by reduce tasks.
*/
def getReaderForOneMapper[K, C](
def getReaderForRange[K, C](
handle: ShuffleHandle,
mapIndex: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
@@ -131,15 +131,16 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}

override def getReaderForOneMapper[K, C](
override def getReaderForRange[K, C](
handle: ShuffleHandle,
mapIndex: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByMapIndex(
handle.shuffleId, mapIndex, startPartition, endPartition)
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
@@ -451,6 +451,36 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED =
buildConf("spark.sql.adaptive.optimizeSkewedJoin.enabled")
.doc("When true and adaptive execution is enabled, a skewed join is automatically handled at " +
"runtime.")
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD =
buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold")
.doc("Configures the minimum size in bytes for a partition that is considered as a skewed " +
"partition in adaptive skewed join.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)

val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionFactor")
.doc("A partition is considered as a skewed partition if its size is larger than" +
" this factor multiple the median partition size and also larger than " +
s" ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}")
.intConf
.createWithDefault(10)

val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionMaxSplits")
.doc("Configures the maximum number of task to handle a skewed partition in adaptive skewed" +
"join.")
.intConf
.checkValue( _ >= 1, "The split size at least be 1")
.createWithDefault(5)

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.doc("The relation with a non-empty partition ratio lower than this config will not be " +
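A hedged sketch of the skew test the new optimizeSkewedJoin configs above imply (the exact comparison operators used by the OptimizeSkewedJoin rule are not visible in this excerpt): a partition is treated as skewed when its size exceeds both skewedPartitionFactor times the median partition size and skewedPartitionSizeThreshold, and a skewed partition is then split across at most skewedPartitionMaxSplits tasks.

object SkewDetectionSketch {
  // Documented defaults; the names below are local helpers, not Spark API.
  val SkewedPartitionFactor = 10                        // skewedPartitionFactor
  val SkewedPartitionSizeThreshold = 64L * 1024 * 1024  // skewedPartitionSizeThreshold, in bytes
  val SkewedPartitionMaxSplits = 5                      // upper bound on tasks for one skewed partition

  def isSkewed(sizeInBytes: Long, medianSizeInBytes: Long): Boolean =
    sizeInBytes > SkewedPartitionFactor * medianSizeInBytes &&
      sizeInBytes > SkewedPartitionSizeThreshold

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024
    println(isSkewed(900 * mib, 20 * mib)) // true: above 10x the median and above 64 MiB
    println(isSkewed(100 * mib, 20 * mib)) // false: above 64 MiB but not above 10x the median
  }
}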
@@ -116,7 +116,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
class ShuffledRowRDD(
var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
metrics: Map[String, SQLMetric],
specifiedPartitionStartIndices: Option[Array[Int]] = None)
specifiedPartitionIndices: Option[Array[(Int, Int)]] = None)
extends RDD[InternalRow](dependency.rdd.context, Nil) {

if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) {
@@ -126,8 +126,8 @@ class ShuffledRowRDD(

private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions

private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match {
case Some(indices) => indices
private[this] val partitionStartIndices: Array[Int] = specifiedPartitionIndices match {
case Some(indices) => indices.map(_._1)
case None =>
// When specifiedPartitionStartIndices is not defined, every post-shuffle partition
// corresponds to a pre-shuffle partition.
@@ -142,16 +142,15 @@ class ShuffledRowRDD(
override val partitioner: Option[Partitioner] = Some(part)

override def getPartitions: Array[Partition] = {
assert(partitionStartIndices.length == part.numPartitions)
Array.tabulate[Partition](partitionStartIndices.length) { i =>
val startIndex = partitionStartIndices(i)
val endIndex =
if (i < partitionStartIndices.length - 1) {
partitionStartIndices(i + 1)
} else {
numPreShufflePartitions
specifiedPartitionIndices match {
case Some(indices) =>
Array.tabulate[Partition](indices.length) { i =>
new ShuffledRowRDDPartition(i, indices(i)._1, indices(i)._2)
}
case None =>
Array.tabulate[Partition](numPreShufflePartitions) { i =>
new ShuffledRowRDDPartition(i, i, i + 1)
}
new ShuffledRowRDDPartition(i, startIndex, endIndex)
}
}

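A standalone sketch (a local helper, not the Spark class) of how the new specifiedPartitionIndices parameter is read in getPartitions above: each (start, end) tuple is a half-open range of pre-shuffle partitions served by one post-shuffle partition, and when it is absent every partition i simply covers (i, i + 1).

object PartitionSpecSketch {
  def partitionRanges(
      numPreShufflePartitions: Int,
      specifiedPartitionIndices: Option[Array[(Int, Int)]]): Array[(Int, Int)] =
    specifiedPartitionIndices match {
      case Some(indices) => indices                                           // explicit ranges
      case None => Array.tabulate(numPreShufflePartitions)(i => (i, i + 1))   // one-to-one mapping
    }

  def main(args: Array[String]): Unit = {
    println(partitionRanges(4, None).toList)                        // List((0,1), (1,2), (2,3), (3,4))
    println(partitionRanges(4, Some(Array((0, 3), (3, 4)))).toList) // List((0,3), (3,4))
  }
}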
@@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, context.subqueryCache),
// Here the 'OptimizeSkewedPartitions' rule should be executed
Contributor: comment out-of-date

Contributor Author: Ok. I will update later.

// before 'ReduceNumShufflePartitions', as the skewed partition handled
// in 'OptimizeSkewedPartitions' rule, should be omitted in 'ReduceNumShufflePartitions'.
OptimizeSkewedJoin(conf),
ReduceNumShufflePartitions(conf),
// The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
// in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
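A generic sketch of why the position of OptimizeSkewedJoin in this rule Seq matters (plain functions standing in for Spark's Rule[SparkPlan] API): the rules run in order, so whatever the skew rule produces is what ReduceNumShufflePartitions sees next.

object RuleOrderSketch {
  type Plan = String

  // Later rules operate on earlier rules' output.
  def applyAll(plan: Plan, rules: Seq[Plan => Plan]): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  def main(args: Array[String]): Unit = {
    val optimizeSkewedJoin: Plan => Plan = p => s"OptimizeSkewedJoin($p)"
    val reduceNumShufflePartitions: Plan => Plan = p => s"ReduceNumShufflePartitions($p)"
    // Skew handling first, then coalescing, matching the order in queryStageOptimizerRules.
    println(applyAll("plan", Seq(optimizeSkewedJoin, reduceNumShufflePartitions)))
    // prints ReduceNumShufflePartitions(OptimizeSkewedJoin(plan))
  }
}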
@@ -82,7 +82,7 @@ class LocalShuffledRowRDD(

override def getPreferredLocations(partition: Partition): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
tracker.getMapLocation(dependency, partition.index)
tracker.getMapLocation(dependency, partition.index, partition.index + 1)
}

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -92,9 +92,11 @@ class LocalShuffledRowRDD(
// `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
val reader = SparkEnv.get.shuffleManager.getReaderForOneMapper(

val reader = SparkEnv.get.shuffleManager.getReaderForRange(
dependency.shuffleHandle,
mapIndex,
mapIndex + 1,
localRowPartition.startPartition,
localRowPartition.endPartition,
context,
@@ -71,7 +71,7 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
plan match {
case c @ CoalescedShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
LocalShuffleReaderExec(
s, getPartitionStartIndices(s, Some(c.partitionStartIndices.length)))
s, getPartitionStartIndices(s, Some(c.partitionIndices.length)))
case s: ShuffleQueryStageExec =>
LocalShuffleReaderExec(s, getPartitionStartIndices(s, None))
}