Changes from all commits (32 commits)
b615476 Scaffolding for sort-based shuffle (mateiz, Jul 17, 2014)
7a0895d Some more partial work towards sort-based shuffle (mateiz, Jul 18, 2014)
3a56341 More partial work towards sort-based shuffle (mateiz, Jul 18, 2014)
bbf359d More work (mateiz, Jul 19, 2014)
cc52caf Add more error handling and tests for error cases (mateiz, Jul 19, 2014)
614f1b4 Add spill metrics to map tasks (mateiz, Jul 20, 2014)
5a40a1c More tests (mateiz, Jul 20, 2014)
e1f84be Fix disk block manager test (mateiz, Jul 20, 2014)
4b7a5ce More tests, and ability to sort data if a total ordering is given (mateiz, Jul 20, 2014)
ef4e397 Support for partial aggregation even without an Ordering (mateiz, Jul 21, 2014)
ba7db7f Handle null keys in hash-based comparator, and add tests for collisions (mateiz, Jul 21, 2014)
c1b7572 Small optimization (mateiz, Jul 21, 2014)
4988d16 tweak (mateiz, Jul 21, 2014)
de1fb40 Make trait SizeTrackingCollection private[spark] (mateiz, Jul 21, 2014)
c72362a Added bug fix and test for when iterators are empty (mateiz, Jul 21, 2014)
e9ad356 Update ContextCleanerSuite to make sure shuffle cleanup tests use hash (mateiz, Jul 21, 2014)
5461cbb Review comments and more tests (e.g. tests with 1 element per partition) (mateiz, Jul 21, 2014)
5686f71 Optimize merging phase for in-memory only data: (mateiz, Jul 21, 2014)
44d2a93 Use estimateSize instead of atGrowThreshold to test collection sizes (mateiz, Jul 22, 2014)
ad65fbd Rebase on top of Aaron's Sorter change, and use Sorter in our buffer (mateiz, Jul 22, 2014)
3c7ff1f Obey the spark.shuffle.spill setting in ExternalSorter (mateiz, Jul 22, 2014)
03e1006 Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle (mateiz, Jul 22, 2014)
a34b352 Fix tracking of indices within a partition in SpillReader, and add test (mateiz, Jul 23, 2014)
fa2e8db Allow nextBatchStream to be called after we're done looking at all st… (mateiz, Jul 24, 2014)
eb4ee0d Remove customizable element type in ShuffledRDD (mateiz, Jul 24, 2014)
0174149 Add cleanup behavior and cleanup tests for sort-based shuffle (mateiz, Jul 25, 2014)
9464d5f Simplify code and fix conflicts after latest rebase (mateiz, Jul 28, 2014)
f617432 Fix a failing test (seems to be due to change in SizeTracker logic) (mateiz, Jul 28, 2014)
62c56c8 Fix ShuffledRDD sometimes not returning Tuple2s. (mateiz, Jul 28, 2014)
a611159 Compile fixes due to rebase (mateiz, Jul 30, 2014)
d1c137f Various review comments (mateiz, Jul 30, 2014)
bd841f9 Various review comments (mateiz, Jul 30, 2014)
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,18 +56,23 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}

@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
{
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -85,9 +90,12 @@ case class Aggregator[K, V, C] (
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}
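One consequence of the `=` to `+=` change above: a single task can now spill from more than one collection (for example, a sort-based map side and an aggregating reduce side in the same task), and plain assignment would silently drop all but the last collection's counts. A trivial illustration with hypothetical byte counts:

```scala
// Hypothetical illustration, not code from the patch: two collections
// spill 100 and then 200 bytes of memory within the same task.
var memoryBytesSpilled = 0L
memoryBytesSpilled = 100L   // old style: first spill recorded
memoryBytesSpilled = 200L   // old style: overwrites it; task reports 200

memoryBytesSpilled = 0L
memoryBytesSpilled += 100L  // new style: accumulate
memoryBytesSpilled += 200L  // task reports 300, the true total
```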
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -289,7 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
@@ -1203,10 +1203,10 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -122,7 +122,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*/
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -17,10 +17,11 @@

package org.apache.spark.rdd

import scala.language.existentials

import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.language.existentials

import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -157,8 +158,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag

import org.apache.spark.{Logging, RangePartitioner}
import org.apache.spark.annotation.DeveloperApi

/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
@@ -43,10 +44,10 @@ import org.apache.spark.{Logging, RangePartitioner}
*/
class OrderedRDDFunctions[K : Ordering : ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag](
P <: Product2[K, V] : ClassTag] @DeveloperApi() (
Contributor: Did you mean to add this annotation here?

Contributor (Author): Yup, see the comments on mateiz@bf71388.

self: RDD[P])
extends Logging with Serializable {

extends Logging with Serializable
{
private val ordering = implicitly[Ordering[K]]

/**
@@ -55,9 +56,12 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V, P](self, part)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
}
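For a caller, the visible effect of this change is the return type. A minimal usage sketch, assuming an existing SparkContext `sc` and the implicit conversions of this era:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings in rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD

// Minimal sketch; `sc` is an existing SparkContext.
def sortExample(sc: SparkContext): Array[(String, Int)] = {
  val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
  // sortByKey is now typed RDD[(K, V)] instead of RDD[P]
  val sorted: RDD[(String, Int)] = pairs.sortByKey()
  sorted.collect()  // Array((a,1), (b,2), (c,3))
}
```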
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -90,7 +90,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C, (K, C)](self, partitioner)
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
@@ -425,7 +425,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V, (K, V)](self, partitioner)
new ShuffledRDD[K, V, V](self, partitioner)
}
}

8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -332,7 +332,7 @@ abstract class RDD[T: ClassTag](
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
@@ -341,7 +341,7 @@

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T, (Int, T)](mapPartitionsWithIndex(distributePartition),
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else {
@@ -352,8 +352,8 @@
/**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean,
fraction: Double,
def sample(withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
17 changes: 9 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -37,11 +37,12 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam V the value class.
* @tparam C the combiner class.
*/
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
class ShuffledRDD[K, V, C](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[P](prev.context, Nil) {
extends RDD[(K, C)](prev.context, Nil) {

private var serializer: Option[Serializer] = None

@@ -52,25 +53,25 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
private var mapSideCombine: Boolean = false

/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C] = {
this.serializer = Option(serializer)
this
}

/** Set key ordering for RDD's shuffle. */
def setKeyOrdering(keyOrdering: Ordering[K]): ShuffledRDD[K, V, C, P] = {
def setKeyOrdering(keyOrdering: Ordering[K]): ShuffledRDD[K, V, C] = {
this.keyOrdering = Option(keyOrdering)
this
}

/** Set aggregator for RDD's shuffle. */
def setAggregator(aggregator: Aggregator[K, V, C]): ShuffledRDD[K, V, C, P] = {
def setAggregator(aggregator: Aggregator[K, V, C]): ShuffledRDD[K, V, C] = {
this.aggregator = Option(aggregator)
this
}

/** Set mapSideCombine flag for RDD's shuffle. */
def setMapSideCombine(mapSideCombine: Boolean): ShuffledRDD[K, V, C, P] = {
def setMapSideCombine(mapSideCombine: Boolean): ShuffledRDD[K, V, C] = {
this.mapSideCombine = mapSideCombine
this
}
@@ -85,11 +86,11 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}

override def compute(split: Partition, context: TaskContext): Iterator[P] = {
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[P]]
.asInstanceOf[Iterator[(K, C)]]
}

override def clearDependencies() {
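For reference, a small sketch of how the slimmed-down class is constructed through its fluent setters (the updated call sites in PairRDDFunctions and RDD above follow the same pattern; the names below are illustrative):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.{RDD, ShuffledRDD}

// Illustrative sketch: the P type parameter is gone and the element type
// is fixed to (K, C), so call sites no longer need a ClassTag for the pair.
def repartitionSorted(prev: RDD[(String, Int)]): RDD[(String, Int)] = {
  new ShuffledRDD[String, Int, Int](prev, new HashPartitioner(8))
    .setKeyOrdering(Ordering[String])  // optional: have the shuffle sort keys
}
```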
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -24,7 +24,7 @@ import org.apache.spark.shuffle._
* A ShuffleManager using hashing, that creates one output file per reduce partition on each
* mapper (possibly reusing these across waves of tasks).
*/
class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
shuffleId: Int,
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}

class HashShuffleReader[K, C](
private[spark] class HashShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
endPartition: Int,
@@ -47,7 +47,8 @@ class HashShuffleReader[K, C](
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
iter
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}

// Sort the output if there is a sort ordering defined.
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus

class HashShuffleWriter[K, V](
private[spark] class HashShuffleWriter[K, V](
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
@@ -33,6 +33,10 @@ class HashShuffleWriter[K, V](
private val dep = handle.dependency
private val numOutputSplits = dep.partitioner.numPartitions
private val metrics = context.taskMetrics

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
// we don't try deleting files, etc twice.
private var stopping = false

private val blockManager = SparkEnv.get.blockManager
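The `stopping` flag added here guards against the double-stop sequence described in the comment. The stop() body itself is collapsed in this view; a hedged sketch of the pattern it describes (the two helpers are hypothetical stand-ins for the commit/revert logic):

```scala
// Hedged sketch of the guard; the real stop() body is collapsed in this
// diff view, and both helpers below are hypothetical stand-ins.
def stop(success: Boolean): Option[MapStatus] = {
  if (stopping) {
    None                                  // second call after an exception: no-op
  } else {
    stopping = true
    if (success) {
      Some(commitWritesAndBuildStatus())  // hypothetical helper
    } else {
      revertPartialWrites()               // hypothetical helper
      None
    }
  }
}
```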
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala (new file)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import java.io.{DataInputStream, FileInputStream}

import org.apache.spark.shuffle._
import org.apache.spark.{TaskContext, ShuffleDependency}
import org.apache.spark.shuffle.hash.HashShuffleReader
import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId}

private[spark] class SortShuffleManager extends ShuffleManager {
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}

/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
// We currently use the same block store shuffle fetcher as the hash-based shuffle.
new HashShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Unit = {}
Contributor: The shuffle output files from SortShuffleWriter do not get cleaned up. We might need a map that saves registered shuffle handles, and then remove the data files in the unregisterShuffle method. At present this is not implemented in HashShuffleManager either, but HashShuffleManager depends on ShuffleBlockManager, and the files get cleaned there.

I have a PR (#1241) that generalizes ShuffleBlockManager and hides it behind ShuffleManager; when BlockManager removes a shuffle, it will call into this unregisterShuffle method. I will rebase it once this PR is merged.

Should we fix this issue in my PR, or add the store/clean shuffle-handle logic here in this PR first?

Contributor (Author): Good question, but files on disk actually get cleaned by BlockManager directly if cleaning is turned on, and sort-based shuffle is set up to only use files. ShuffleBlockManager had extra state in memory in the form of metadata; that's why it needs its own cleaner. So I don't think there's an issue here.

It will be important to do something for unregisterShuffle, but right now nothing calls it. It's part of the API because I wanted to move the MapOutputTracker behind the ShuffleManager as well. Any patches to move more functionality behind it are welcome, and I agree the ShuffleBlockManager should be specific to hash-based shuffle.

Contributor: Maybe I'm missing some code, but as I understand it, when cleaning is triggered by timestamp, BlockManager won't remove shuffle data; ShuffleBlockManager does that work itself. And when automatic cleaning calls doCleanShuffle, it goes through the current ShuffleBlockManager interface. In neither case would the files generated by SortShuffleWriter be cleaned. Or do you mean that, as long as there is no metadata in memory, it's acceptable for shuffle files to stay on disk until the application exits?

Contributor (Author): Ah, I see, these files are never registered with the BlockManager, so they won't be cleaned by it. I'll modify ShuffleBlockManager to clean them for now.

Contributor (Author): Alright, fixed this in the latest commit.
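For concreteness, a hedged sketch of the reviewer's store-and-clean suggestion (this is not code from the patch; the file naming reuses the `.index` convention from getBlockLocation below, and the method names are illustrative):

```scala
import scala.collection.mutable

import org.apache.spark.storage.{DiskBlockManager, ShuffleBlockId}

// Hedged sketch of the reviewer's suggestion, not code from this patch:
// remember each shuffle's map count so unregistering can delete the
// per-map data and index files the sort-based writer produced.
private val numMapsForShuffle = new mutable.HashMap[Int, Int]

def rememberShuffle(shuffleId: Int, numMaps: Int): Unit =
  numMapsForShuffle.synchronized { numMapsForShuffle(shuffleId) = numMaps }

def removeShuffleFiles(shuffleId: Int, diskManager: DiskBlockManager): Unit = {
  val numMaps = numMapsForShuffle.synchronized { numMapsForShuffle.remove(shuffleId) }
  for (n <- numMaps; mapId <- 0 until n) {
    val dataId = ShuffleBlockId(shuffleId, mapId, reduceId = 0)
    diskManager.getFile(dataId).delete()                  // consolidated data file
    diskManager.getFile(dataId.name + ".index").delete()  // its index file
  }
}
```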


/** Shut down this ShuffleManager. */
override def stop(): Unit = {}

/** Get the location of a block in a map output file. Uses the index file we create for it. */
def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = {
// The block is actually going to be a range of a single map output file for this map, so
// figure out the ID of the consolidated file, then the offset within that from our index
val consolidatedId = blockId.copy(reduceId = 0)
val indexFile = diskManager.getFile(consolidatedId.name + ".index")
val in = new DataInputStream(new FileInputStream(indexFile))
try {
in.skip(blockId.reduceId * 8)
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset - offset)
} finally {
in.close()
}
}
}
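getBlockLocation implies a simple index format: numPartitions + 1 eight-byte offsets, where partition i's segment spans [offsets(i), offsets(i + 1)) in the consolidated data file. A hedged sketch of the writer side that would produce such a file (the actual SortShuffleWriter is not shown in this view):

```scala
import java.io.{DataOutputStream, File, FileOutputStream}

// Hedged sketch, not the actual writer: emit numPartitions + 1 longs so
// that the reader above can locate partition i via longs i and i + 1.
def writeIndexFile(indexFile: File, partitionLengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  try {
    var offset = 0L
    out.writeLong(offset)        // start of partition 0
    for (length <- partitionLengths) {
      offset += length
      out.writeLong(offset)      // end of this partition / start of the next
    }
  } finally {
    out.close()
  }
}
```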