Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Expand Down Expand Up @@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

/**
 * Use Kryo serialization and register the given set of classes with Kryo.
 * If called multiple times, this will append the classes from all calls together.
 *
 * Class names already present in `spark.kryo.classesToRegister` are kept, in their existing
 * order, ahead of the newly supplied ones, and duplicates are dropped. Whitespace around the
 * comma-separated entries of the existing value is ignored, so a hand-written value such as
 * `"a.A, b.B"` is merged (and written back) as `"a.A,b.B"`.
 *
 * @param classes the classes to register with Kryo
 * @return this configuration object
 */
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
  // LinkedHashSet drops duplicates while preserving first-insertion order.
  val allClassNames = new LinkedHashSet[String]()
  allClassNames ++= get("spark.kryo.classesToRegister", "")
    .split(',')
    .map(_.trim)
    .filter(_.nonEmpty)
  allClassNames ++= classes.map(_.getName)

  set("spark.kryo.classesToRegister", allClassNames.mkString(","))
  set("spark.serializer", classOf[KryoSerializer].getName)
  this
}

/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@ class KryoSerializer(conf: SparkConf)
private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")
private val userRegistrator = conf.getOption("spark.kryo.registrator")
// Classes named in spark.kryo.classesToRegister, loaded where this val is initialized.
// Entries are comma-separated; surrounding whitespace and empty entries are ignored.
private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
  .split(',')
  .map(_.trim)
  .filter(_.nonEmpty)
  .map { className =>
    try {
      Class.forName(className)
    } catch {
      case e: Exception =>
        // Name the offending class so a typo in the configuration is easy to spot.
        throw new SparkException(s"Failed to load class to register with Kryo: $className", e)
    }
  }

def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))

Expand All @@ -80,22 +91,20 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())

// Allow the user to register their own classes by setting spark.kryo.registrator
for (regCls <- registrator) {
logDebug("Running user registrator: " + regCls)
try {
val reg = Class.forName(regCls, true, classLoader).newInstance()
.asInstanceOf[KryoRegistrator]

// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
reg.registerClasses(kryo)
} catch {
case e: Exception =>
throw new SparkException(s"Failed to invoke $regCls", e)
} finally {
Thread.currentThread.setContextClassLoader(oldClassLoader)
}
try {
  // Use the default classloader as the context classloader while registering.
  Thread.currentThread.setContextClassLoader(classLoader)
  // First the classes given through spark.kryo.classesToRegister ...
  for (cls <- classesToRegister) {
    kryo.register(cls)
  }
  // ... then let the registrator named by spark.kryo.registrator, if set, add its own.
  for (registratorName <- userRegistrator) {
    val reg = Class.forName(registratorName, true, classLoader)
      .newInstance()
      .asInstanceOf[KryoRegistrator]
    reg.registerClasses(kryo)
  }
} catch {
  case e: Exception =>
    throw new SparkException("Failed to register classes with Kryo", e)
} finally {
  // Always reset the context classloader to oldClassLoader, even on failure.
  Thread.currentThread.setContextClassLoader(oldClassLoader)
}

// Register Chill's classes; we do this after our ranges and the user's own classes to let
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1333,4 +1333,16 @@ public Optional<Integer> call(Integer i) {
}
}

// Empty fixture classes passed to SparkConf.registerKryoClasses in testRegisterKryoClasses.
static class Class1 {}
static class Class2 {}

@Test
public void testRegisterKryoClasses() {
  // Registering two classes should store their names, comma-separated, in the order given.
  Class[] toRegister = new Class[]{ Class1.class, Class2.class };
  String expectedNames = Class1.class.getName() + "," + Class2.class.getName();

  SparkConf conf = new SparkConf();
  conf.registerKryoClasses(toRegister);

  Assert.assertEquals(expectedNames, conf.get("spark.kryo.classesToRegister"));
}

}
62 changes: 62 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
test("loading from system properties") {
Expand Down Expand Up @@ -133,4 +135,64 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
System.clearProperty("spark.test.a.b.c")
}
}

test("register kryo classes through registerKryoClasses") {
  val registeredKey = "spark.kryo.classesToRegister"
  val (name1, name2, name3) =
    (classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName)
  val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

  // The first call stores the class names in the order given.
  conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
  assert(conf.get(registeredKey) === s"$name1,$name2")

  // A later call appends to what is already registered.
  conf.registerKryoClasses(Array(classOf[Class3]))
  assert(conf.get(registeredKey) === s"$name1,$name2,$name3")

  // Re-registering an already known class leaves the list unchanged.
  conf.registerKryoClasses(Array(classOf[Class2]))
  assert(conf.get(registeredKey) === s"$name1,$name2,$name3")

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
  // blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new Class1())
  kryoSerializer.newInstance().serialize(new Class2())
  kryoSerializer.newInstance().serialize(new Class3())
}

test("register kryo classes through registerKryoClasses and custom registrator") {
  val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

  // Class1 is registered through the conf helper ...
  conf.registerKryoClasses(Array(classOf[Class1]))
  val registered = conf.get("spark.kryo.classesToRegister")
  assert(registered === classOf[Class1].getName)

  // ... while Class2 is left to CustomRegistrator.
  conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
  // blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new Class1())
  kryoSerializer.newInstance().serialize(new Class2())
}

test("register kryo classes through conf") {
  // Configure everything through plain properties instead of registerKryoClasses.
  val conf = new SparkConf()
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
    .set("spark.serializer", classOf[KryoSerializer].getName)

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
  // blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new StringBuffer())
}

}

// Empty classes used as Kryo registration fixtures by SparkConfSuite.
class Class1
class Class2
class Class3

/** Test registrator that registers only Class2 with Kryo. */
class CustomRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Class2])
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}

test("kryo with nonexistent custom registrator should fail") {
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkException

val conf = new SparkConf(false)
conf.set("spark.kryo.registrator", "this.class.does.not.exist")

val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance())
assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist"))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
}

test("default class loader can be set by a different thread") {
Expand Down
15 changes: 13 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,23 @@ of the most common options to set are:
<code>org.apache.spark.Serializer</code></a>.
</td>
</tr>
<tr>
<td><code>spark.kryo.classesToRegister</code></td>
<td>(none)</td>
<td>
If you use Kryo serialization, give a comma-separated list of custom class names to register
with Kryo.
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
</tr>
<tr>
<td><code>spark.kryo.registrator</code></td>
<td>(none)</td>
<td>
If you use Kryo serialization, set this class to register your custom classes with Kryo.
It should be set to a class that extends
If you use Kryo serialization, set this class to register your custom classes with Kryo. This
property is useful if you need to register your classes in a custom way, e.g. to specify a custom
field serializer. Otherwise <code>spark.kryo.classesToRegister</code> is simpler. It should be
set to a class that extends
<a href="api/scala/index.html#org.apache.spark.serializer.KryoRegistrator">
<code>KryoRegistrator</code></a>.
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
Expand Down
17 changes: 2 additions & 15 deletions docs/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,11 @@ registration requirement, but we recommend trying it in any network-intensive ap
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered
in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library.

To register your own custom classes with Kryo, create a public class that extends
[`org.apache.spark.serializer.KryoRegistrator`](api/scala/index.html#org.apache.spark.serializer.KryoRegistrator) and set the
`spark.kryo.registrator` config property to point to it, as follows:
To register your own custom classes with Kryo, use the `registerKryoClasses` method.

{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[MyClass1])
kryo.register(classOf[MyClass2])
}
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
{% endhighlight %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,7 @@
package org.apache.spark.examples.bagel

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoRegistrator

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

import scala.collection.mutable.ArrayBuffer

import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}

import com.esotericsoftware.kryo._

class PageRankUtils extends Serializable {
def computeWithCombiner(numVertices: Long, epsilon: Double)(
Expand Down Expand Up @@ -99,13 +89,6 @@ class PRMessage() extends Message[String] with Serializable {
}
}

class PRKryoRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
kryo.register(classOf[PRVertex])
kryo.register(classOf[PRMessage])
}
}

class CustomPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ object WikipediaPageRank {
}
val sparkConf = new SparkConf()
sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage]))

val inputFile = args(0)
val threshold = args(1).toDouble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ object Analytics extends Logging {
}
val options = mutable.Map(optionsList: _*)

val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.locality.wait", "100000")
val conf = new SparkConf().set("spark.locality.wait", "100000")
GraphXUtils.registerKryoClasses(conf)

val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
println("Set the number of edge partitions using --numEPart.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.examples.graphx

import org.apache.spark.SparkContext._
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.graphx.{GraphXUtils, PartitionStrategy}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx.util.GraphGenerators
import java.io.{PrintWriter, FileOutputStream}
Expand Down Expand Up @@ -80,8 +80,7 @@ object SynthBenchmark {

val conf = new SparkConf()
.setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
GraphXUtils.registerKryoClasses(conf)

val sc = new SparkContext(conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package org.apache.spark.examples.mllib

import scala.collection.mutable

import com.esotericsoftware.kryo.Kryo
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}

/**
* An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
Expand All @@ -40,13 +38,6 @@ import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
*/
object MovieLensALS {

class ALSRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[Rating])
kryo.register(classOf[mutable.BitSet])
}
}

case class Params(
input: String = null,
kryo: Boolean = false,
Expand Down Expand Up @@ -108,8 +99,7 @@ object MovieLensALS {
def run(params: Params) {
val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
if (params.kryo) {
conf.set("spark.serializer", classOf[KryoSerializer].getName)
.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
.set("spark.kryoserializer.buffer.mb", "8")
}
val sc = new SparkContext(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.collection.OpenHashSet


/**
* Registers GraphX classes with Kryo for improved performance.
*/
@deprecated("Register GraphX classes with Kryo using GraphXUtils.registerKryoClasses", "1.2.0")
class GraphKryoRegistrator extends KryoRegistrator {

def registerClasses(kryo: Kryo) {
Expand Down
Loading