6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer

/**
* Base class for dependencies.
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializerClass class name of the serializer to use
* @param serializer [[Serializer]] to use. If set to null, the default serializer, as specified
* by `spark.serializer` config option, will be used.
*/
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializerClass: String = null)
val serializer: Serializer = null)

Contributor:
Do you know roughly how large the serialized serializer ends up being?

Contributor Author:
Compared with passing a class-name string, this adds about 58 bytes to the task closure.

Contributor:
Is it 'cos the SparkConf is getting shipped around?
If yes, using a broadcast variable should alleviate it?

Contributor Author:
No it wasn't. SparkConf was not serializable.

Contributor:
Good point. Wondering what is adding this overhead then.
For JavaSerializer, it is just a single int (and some header for the class).
Is this 58k for Kryo?

Contributor Author:
It's only 58 bytes, not 58 KB, which is less than a 5% increase in the serialized size of a ShuffleDependency, and that is itself only part of an RDD task closure.

Contributor:
@rxin seems fine to me... thanks for digging into it.

Contributor:
Now I am wondering why I read it as 58 KB and not 58 B... apologies!

extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
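Regarding the size question in the thread above, here is a minimal standalone sketch (not part of this PR) of how one might check what a serializer instance adds once it is Java-serialized into a task closure. The object name is made up, and it assumes the serializer, plus anything it captures such as the SparkConf, is itself java-serializable; whether that holds depends on the Spark version.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

object SerializerSizeCheck {
  // Java-serialize an object and return its byte count, a rough proxy for what
  // shipping it inside a task closure adds.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Both serializers gain `with Serializable` in this PR, so they can be measured directly.
    println("JavaSerializer: " + serializedSize(new JavaSerializer(conf)) + " bytes")
    println("KryoSerializer: " + serializedSize(new KryoSerializer(conf)) + " bytes")
  }
}
```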
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ShuffleFetcher.scala
@@ -29,7 +29,7 @@ private[spark] abstract class ShuffleFetcher {
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
serializer: Serializer = SparkEnv.get.serializer): Iterator[T]

/** Stop the fetcher */
def stop() {}
14 changes: 5 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -28,7 +28,7 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.storage.{BlockManager, BlockManagerMaster, BlockManagerMasterActor}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{AkkaUtils, Utils}

/**
@@ -41,7 +41,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -141,14 +140,12 @@ object SparkEnv extends Logging {
val name = conf.get(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager

val serializer = serializerManager.setDefault(
conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
val serializer = instantiateClass[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val closureSerializer = serializerManager.get(
conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
conf)
val closureSerializer = instantiateClass[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
@@ -220,7 +217,6 @@
new SparkEnv(
executorId,
actorSystem,
serializerManager,
serializer,
closureSerializer,
cacheManager,
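For context on the SparkEnv change: both serializers are now built reflectively from class names in the conf. The excerpt above only shows the zero-arg newInstance() path, while the Serializer docs later in this diff also allow a SparkConf constructor, so a resolver along these lines (a hypothetical helper, not the PR's exact code) would cover both cases:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.Serializer

object SerializerResolver {
  // Resolve the class name stored under `key` (falling back to `defaultClassName`)
  // and construct it, preferring a (SparkConf) constructor over a zero-arg one.
  def resolve(conf: SparkConf, key: String, defaultClassName: String): Serializer = {
    val className = conf.get(key, defaultClassName)
    val cls = Class.forName(className, true, Thread.currentThread.getContextClassLoader)
    val instance = try {
      cls.getConstructor(classOf[SparkConf]).newInstance(conf)
    } catch {
      case _: NoSuchMethodException => cls.newInstance()
    }
    instance.asInstanceOf[Serializer]
  }
}

// e.g. SerializerResolver.resolve(conf, "spark.serializer",
//   "org.apache.spark.serializer.JavaSerializer")
```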
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
import org.apache.spark.serializer.Serializer

private[spark] sealed trait CoGroupSplitDep extends Serializable

@@ -66,10 +67,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]

private var serializerClass: String = null
private var serializer: Serializer = null

def setSerializer(cls: String): CoGroupedRDD[K] = {
serializerClass = cls
def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
this.serializer = serializer
this
}

@@ -80,7 +81,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[Any, Any](rdd, part, serializerClass)
new ShuffleDependency[Any, Any](rdd, part, serializer)
}
}
}
@@ -113,18 +114,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
// Read them from the parent
val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
rddIterators += ((it, depNum))
}
case ShuffleCoGroupSplitDep(shuffleId) => {

case ShuffleCoGroupSplitDep(shuffleId) =>
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
val ser = Serializer.getSerializer(serializer)
val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
rddIterators += ((it, depNum))
}
}

if (!externalSorting) {
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -44,6 +44,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.SerializableHyperLogLog

/**
@@ -73,7 +74,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
serializer: Serializer = null): RDD[(K, C)] = {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (getKeyClass().isArray) {
if (mapSideCombine) {
@@ -93,13 +94,13 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
.setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
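A driver-side usage sketch for the new parameter (the app name, local master, and sample data are made up): combineByKey can now be handed a Serializer instance for its shuffles, and leaving the argument at its null default falls back to the serializer configured via spark.serializer.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions
import org.apache.spark.serializer.KryoSerializer

object CombineByKeyWithSerializer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKey-serializer").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Sum values per key, passing a Serializer instance for the shuffle instead of
    // a class-name string.
    val counts = pairs.combineByKey(
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(2),
      mapSideCombine = true,
      serializer = new KryoSerializer(conf))

    counts.collect().foreach(println)
    sc.stop()
  }
}
```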
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency, SparkEnv, TaskContext}
import org.apache.spark.serializer.Serializer

private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index = idx
@@ -38,15 +39,15 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
part: Partitioner)
extends RDD[P](prev.context, Nil) {

private var serializerClass: String = null
private var serializer: Serializer = null

def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
serializerClass = cls
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, P] = {
this.serializer = serializer
this
}

override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializerClass))
List(new ShuffleDependency(prev, part, serializer))
}

override val partitioner = Some(part)
@@ -57,8 +58,8 @@

override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context,
SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf))
val ser = Serializer.getSerializer(serializer)
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
}

override def clearDependencies() {
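The lower-level equivalent, sketched assuming an existing sc, conf, and a pairs RDD of (String, Int) as in the previous example: the serializer is attached directly to the ShuffledRDD, and a null serializer is resolved back to the default at fetch time via Serializer.getSerializer.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer

// Attach a Serializer instance to the shuffle; leaving it unset (null) means
// compute() falls back to the spark.serializer default.
val shuffled = new ShuffledRDD[String, Int, (String, Int)](pairs, new HashPartitioner(2))
  .setSerializer(new KryoSerializer(conf))

shuffled.collect().foreach(println)
```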
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -30,6 +30,7 @@ import org.apache.spark.Partitioner
import org.apache.spark.ShuffleDependency
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.serializer.Serializer

/**
* An optimized version of cogroup for set difference/subtraction.
@@ -53,10 +54,10 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {

private var serializerClass: String = null
private var serializer: Serializer = null

def setSerializer(cls: String): SubtractedRDD[K, V, W] = {
serializerClass = cls
def setSerializer(serializer: Serializer): SubtractedRDD[K, V, W] = {
this.serializer = serializer
this
}

@@ -67,7 +68,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency(rdd, part, serializerClass)
new ShuffleDependency(rdd, part, serializer)
}
}
}
@@ -92,7 +93,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](

override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
val serializer = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
val ser = Serializer.getSerializer(serializer)
val map = new JHashMap[K, ArrayBuffer[V]]
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
@@ -105,14 +106,13 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
}
}
def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
}
case ShuffleCoGroupSplitDep(shuffleId) => {

case ShuffleCoGroupSplitDep(shuffleId) =>
val iter = SparkEnv.get.shuffleFetcher.fetch[Product2[K, V]](shuffleId, partition.index,
context, serializer)
context, ser)
iter.foreach(op)
}
}
// the first dep is rdd1; add all values to the map
integrate(partition.deps(0), t => getSeq(t._1) += t._2)
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -26,6 +26,7 @@ import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

@@ -153,7 +154,7 @@ private[spark] class ShuffleMapTask(

try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
val ser = Serializer.getSerializer(dep.serializer)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

// Write the map output to its associated buckets.
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -97,6 +97,6 @@ private[spark] class JavaSerializerInstance(conf: SparkConf) extends SerializerI
/**
* A Spark serializer that uses Java's built-in serialization.
*/
class JavaSerializer(conf: SparkConf) extends Serializer {
class JavaSerializer(conf: SparkConf) extends Serializer with Serializable {
def newInstance(): SerializerInstance = new JavaSerializerInstance(conf)
}
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -34,10 +34,14 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = {
conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
}
class KryoSerializer(conf: SparkConf)
extends org.apache.spark.serializer.Serializer
with Logging
with Serializable {

private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrator = conf.getOption("spark.kryo.registrator")

def newKryoOutput() = new KryoOutput(bufferSize)

@@ -48,7 +52,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
kryo.setReferences(referenceTracking)

for (cls <- KryoSerializer.toRegister) kryo.register(cls)

@@ -58,7 +62,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial

// Allow the user to register their own classes by setting spark.kryo.registrator
try {
for (regCls <- conf.getOption("spark.kryo.registrator")) {
for (regCls <- registrator) {
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance()
.asInstanceOf[KryoRegistrator]
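Since the Kryo settings are now read eagerly into private vals, here is a short reminder of how the registrator hook they feed is wired up. Point and MyRegistrator are hypothetical names; the class named in spark.kryo.registrator is loaded reflectively (as in the loop above) and handed the Kryo instance to register classes on.

```scala
import com.esotericsoftware.kryo.Kryo

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

// Hypothetical user class and registrator; Spark loads the registrator by name,
// so it must be on the executor classpath.
case class Point(x: Double, y: Double)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
  }
}

// Driver-side wiring: the referenceTracking and registrator settings map onto the
// private vals added in this diff.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
  .set("spark.kryo.referenceTracking", "false")
val kryoSerializer = new KryoSerializer(conf)
```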
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,21 +23,31 @@ import java.nio.ByteBuffer
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
import org.apache.spark.SparkEnv

/**
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
* serialization and are guaranteed to only be called from one thread at a time.
*
* Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
* [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes
* precedence.
* Implementations of this trait should implement:
* 1. a zero-arg constructor or a constructor that accepts a [[org.apache.spark.SparkConf]]
* as parameter. If both constructors are defined, the latter takes precedence.
*
* 2. Java serialization interface.
*/
trait Serializer {
def newInstance(): SerializerInstance
}


object Serializer {
def getSerializer(serializer: Serializer): Serializer = {
if (serializer == null) SparkEnv.get.serializer else serializer
}
}


/**
* An instance of a serializer, for use by one thread at a time.
*/
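Following the updated contract (a SparkConf constructor plus Java serializability, so the instance can ride along inside a ShuffleDependency), a structural sketch of a user-defined serializer. DelegatingSerializer is a hypothetical name, and it simply reuses JavaSerializer's instances rather than implementing its own SerializerInstance.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}

class DelegatingSerializer(conf: SparkConf) extends Serializer with Serializable {
  // Build the delegate in the constructor so the SparkConf itself is not retained
  // as a field, the same trick the KryoSerializer change above uses for its settings.
  // Whether the delegate then serializes cleanly still depends on what it captures.
  private val delegate = new JavaSerializer(conf)

  override def newInstance(): SerializerInstance = delegate.newInstance()
}

// Pass it around like any other Serializer, e.g.
//   new ShuffleDependency[K, V](rdd, partitioner, new DelegatingSerializer(conf))
// or leave the argument as null so Serializer.getSerializer falls back to spark.serializer.
```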