diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index df8ce9c054..5ade06de9c 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -4,6 +4,15 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.JavaConversions._
 
+import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
+
+import scala.collection.mutable.{LinkedList, ArrayBuffer}
+import spark.Logging
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.Snappy
+import java.io.ByteArrayOutputStream
+import java.io.ByteArrayInputStream
+
 /** A set of functions used to aggregate data.
  *
  * @param createCombiner function to create the initial value of the aggregation.
@@ -13,7 +22,8 @@ import scala.collection.JavaConversions._
 case class Aggregator[K, V, C] (
     val createCombiner: V => C,
     val mergeValue: (C, V) => C,
-    val mergeCombiners: (C, C) => C) {
+    val mergeCombiners: (C, C) => C)
+  extends Logging {
 
   def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
@@ -40,5 +50,79 @@ case class Aggregator[K, V, C] (
     }
     combiners.iterator
   }
+
+  abstract class CompressionStreamsIterator[+A] extends Iterator[A]
+
+  def combineValuesByKeyInCompression(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+    val partitionCount = System.getProperty("spark.reduce.side.combine.compression.partition", "11").toInt
+    logDebug("combine compression enabled, partition count:"+partitionCount)
+    val indexArray = Array.fill(partitionCount){new JHashMap[K,LinkedList[Int]]()}
+    val byteArrayStreams = Array.fill(partitionCount){new ByteArrayOutputStream()}
+    val serializeStreams = Array.tabulate(partitionCount)(i => SparkEnv.get.serializer.
+      newInstance().serializeStream(new SnappyOutputStream(byteArrayStreams(i))))
+    var data: Array[Object] = null
+    var counts = Array.fill(partitionCount){-1}
+
+    for ((k, v) <- iter) {
+      var keyHash = k.hashCode % partitionCount
+      if (keyHash < 0) {
+        keyHash = keyHash + partitionCount
+      }
+      serializeStreams(keyHash).writeObject(v)
+
+      val list = indexArray(keyHash).get(k)
+      counts(keyHash) = counts(keyHash) + 1
+
+      if (list == null) {
+        indexArray(keyHash).put(k, LinkedList(counts(keyHash)))
+      } else {
+        list.append(LinkedList(counts(keyHash)))
+      }
+    }
+
+    serializeStreams.foreach(_.close())
+
+    for (i <- 0 to partitionCount - 1)
+    {
+      logDebug("Compression partition [" + i + "], size:" +
+        byteArrayStreams(i).size() + ", objects count:" +
+        counts(i))
+    }
+
+    new CompressionStreamsIterator[(K,C)] {
+      private var uncompressedData: ArrayBuffer[Any] = null
+      private var cur: Iterator[(K,LinkedList[Int])] = Iterator.empty
+      private var curIdx: Int = -1
+      def hasNext: Boolean =
+        cur.hasNext || curIdx < partitionCount - 1 && {
+          curIdx = curIdx + 1
+          if (counts(curIdx) == -1) {
+            hasNext
+          } else {
+            uncompressedData = new ArrayBuffer[Any](counts(curIdx) + 1)
+            SparkEnv.get.serializer.newInstance().deserializeStream(new SnappyInputStream(
+              new ByteArrayInputStream(byteArrayStreams(curIdx).toByteArray()))).asIterator.
+              foreach(uncompressedData += _)
+            cur = indexArray(curIdx).toIterator
+            hasNext
+          }
+        }
+      def next(): (K,C) =
+        if (hasNext) {
+          val (curKey, curValue) = cur.next()
+          indexArray(curIdx).put(curKey, null)
+
+          val combiner = createCombiner(uncompressedData(curValue(0)).asInstanceOf[V])
+          curValue.slice(1, curValue.length).foreach(
+            (i: Int) => {
+              mergeValue(combiner, uncompressedData(i).asInstanceOf[V])
+              uncompressedData(i) = null
+            })
+          (curKey, combiner)
+        } else {
+          Iterator.empty.next()
+        }
+    }
+  }
 }
 
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 7630fe7803..bffa5e7c46 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -77,7 +77,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       // A sanity check to make sure mergeCombiners is not defined.
       assert(mergeCombiners == null)
       val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
-      values.mapPartitions(aggregator.combineValuesByKey(_), true)
+      val reduceSideCompression =
+        System.getProperty("spark.reduce.side.combine.compression","false") == "true"
+      values.mapPartitions(
+        if (reduceSideCompression) aggregator.combineValuesByKeyInCompression(_)
+        else aggregator.combineValuesByKey(_), true)
     }
   }
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 07572201de..37bcf24c6e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -162,6 +162,7 @@ object SparkBuild extends Build {
       "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
       "org.apache.mesos" % "mesos" % "0.9.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
+      "org.xerial.snappy" % "snappy-java" % "1.0.5",
      "org.apache.derby" % "derby" % "10.4.2.0" % "test"
    ) ++ (
      if (HADOOP_MAJOR_VERSION == "2") {
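
Usage note (not part of the patch): below is a minimal sketch of how the new reduce-side combine compression could be exercised. It assumes the pre-Apache spark.SparkContext API in local mode; the two property names and the default partition count of 11 come from the diff above. The compressed path is only reached in the shuffle branch that skips the map-side combine (the assert(mergeCombiners == null) case), which groupByKey appears to use on this code base, so the sketch drives it through groupByKey. The object and application names here are purely illustrative.

    import spark.SparkContext
    import spark.SparkContext._   // brings the PairRDDFunctions implicits into scope

    object ReduceSideCompressionExample {
      def main(args: Array[String]) {
        // Enable the Snappy-compressed reduce-side combine path (defaults to "false").
        System.setProperty("spark.reduce.side.combine.compression", "true")
        // Number of compressed in-memory buckets each reducer hashes its keys into (defaults to "11").
        System.setProperty("spark.reduce.side.combine.compression.partition", "11")

        val sc = new SparkContext("local", "ReduceSideCompressionExample")
        // groupByKey shuffles without a map-side combine, so with the property above set
        // the reducer should go through combineValuesByKeyInCompression.
        val grouped = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
          .groupByKey()
          .collect()
        grouped.foreach(println)
        sc.stop()
      }
    }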