Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,41 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)

test("configuration limits") {
val conf1 = conf.clone()
test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
conf1.set(kryoBufferProperty, "64k")
conf1.set(kryoBufferMaxProperty, "64m")
new KryoSerializer(conf1).newInstance()

def newKryoInstance(
conf: SparkConf,
bufferSize: String = "64k",
maxBufferSize: String = "64m"): SerializerInstance = {
val kryoConf = conf.clone()
kryoConf.set(kryoBufferProperty, bufferSize)
kryoConf.set(kryoBufferMaxProperty, maxBufferSize)
new KryoSerializer(kryoConf).newInstance()
}

// test default values
newKryoInstance(conf, "64k", "64m")
// 2048m = 2097152k
conf1.set(kryoBufferProperty, "2097151k")
conf1.set(kryoBufferMaxProperty, "64m")
// should not throw exception when kryoBufferMaxProperty < kryoBufferProperty
new KryoSerializer(conf1).newInstance()
conf1.set(kryoBufferMaxProperty, "2097151k")
new KryoSerializer(conf1).newInstance()
val conf2 = conf.clone()
conf2.set(kryoBufferProperty, "2048m")
val thrown1 = intercept[IllegalArgumentException](new KryoSerializer(conf2).newInstance())
newKryoInstance(conf, "2097151k", "64m")
// test maximum size with unit of KiB
newKryoInstance(conf, "2097151k", "2097151k")
// should throw exception with bufferSize out of bound
val thrown1 = intercept[IllegalArgumentException](newKryoInstance(conf, "2048m"))
assert(thrown1.getMessage.contains(kryoBufferProperty))
val conf3 = conf.clone()
conf3.set(kryoBufferMaxProperty, "2048m")
val thrown2 = intercept[IllegalArgumentException](new KryoSerializer(conf3).newInstance())
// should throw exception with maxBufferSize out of bound
val thrown2 = intercept[IllegalArgumentException](
newKryoInstance(conf, maxBufferSize = "2048m"))
assert(thrown2.getMessage.contains(kryoBufferMaxProperty))
val conf4 = conf.clone()
conf4.set(kryoBufferProperty, "2g")
conf4.set(kryoBufferMaxProperty, "3g")
val thrown3 = intercept[IllegalArgumentException](new KryoSerializer(conf4).newInstance())
// should throw exception when both bufferSize and maxBufferSize out of bound
// exception should only contain "spark.kryoserializer.buffer"
val thrown3 = intercept[IllegalArgumentException](newKryoInstance(conf, "2g", "3g"))
assert(thrown3.getMessage.contains(kryoBufferProperty))
assert(!thrown3.getMessage.contains(kryoBufferMaxProperty))
val conf5 = conf.clone()
conf5.set(kryoBufferProperty, "8m")
conf5.set(kryoBufferMaxProperty, "9m")
new KryoSerializer(conf5).newInstance()
// test configuration with mb is supported properly
newKryoInstance(conf, "8m", "9m")
}

test("basic types") {
Expand Down