@@ -34,38 +34,41 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
3434 conf.set(" spark.serializer" , " org.apache.spark.serializer.KryoSerializer" )
3535 conf.set(" spark.kryo.registrator" , classOf [MyRegistrator ].getName)
3636
37- test(" configuration limits" ) {
38- val conf1 = conf.clone()
37+ test(" SPARK-7392 configuration limits" ) {
3938 val kryoBufferProperty = " spark.kryoserializer.buffer"
4039 val kryoBufferMaxProperty = " spark.kryoserializer.buffer.max"
41- conf1.set(kryoBufferProperty, " 64k" )
42- conf1.set(kryoBufferMaxProperty, " 64m" )
43- new KryoSerializer (conf1).newInstance()
40+
41+ def newKryoInstance (
42+ conf : SparkConf ,
43+ bufferSize : String = " 64k" ,
44+ maxBufferSize : String = " 64m" ): SerializerInstance = {
45+ val kryoConf = conf.clone()
46+ kryoConf.set(kryoBufferProperty, bufferSize)
47+ kryoConf.set(kryoBufferMaxProperty, maxBufferSize)
48+ new KryoSerializer (kryoConf).newInstance()
49+ }
50+
51+ // test default values
52+ newKryoInstance(conf, " 64k" , " 64m" )
4453 // 2048m = 2097152k
45- conf1.set(kryoBufferProperty, " 2097151k" )
46- conf1.set(kryoBufferMaxProperty, " 64m" )
4754 // should not throw exception when kryoBufferMaxProperty < kryoBufferProperty
48- new KryoSerializer (conf1).newInstance()
49- conf1.set(kryoBufferMaxProperty, " 2097151k" )
50- new KryoSerializer (conf1).newInstance()
51- val conf2 = conf.clone()
52- conf2.set(kryoBufferProperty, " 2048m" )
53- val thrown1 = intercept[IllegalArgumentException ](new KryoSerializer (conf2).newInstance())
55+ newKryoInstance(conf, " 2097151k" , " 64m" )
56+ // test maximum size with unit of KiB
57+ newKryoInstance(conf, " 2097151k" , " 2097151k" )
58+ // should throw exception with bufferSize out of bound
59+ val thrown1 = intercept[IllegalArgumentException ](newKryoInstance(conf, " 2048m" ))
5460 assert(thrown1.getMessage.contains(kryoBufferProperty))
55- val conf3 = conf.clone()
56- conf3.set(kryoBufferMaxProperty, " 2048m " )
57- val thrown2 = intercept[ IllegalArgumentException ]( new KryoSerializer (conf3).newInstance( ))
61+ // should throw exception with maxBufferSize out of bound
62+ val thrown2 = intercept[ IllegalArgumentException ](
63+ newKryoInstance(conf, maxBufferSize = " 2048m " ))
5864 assert(thrown2.getMessage.contains(kryoBufferMaxProperty))
59- val conf4 = conf.clone()
60- conf4.set(kryoBufferProperty, " 2g" )
61- conf4.set(kryoBufferMaxProperty, " 3g" )
62- val thrown3 = intercept[IllegalArgumentException ](new KryoSerializer (conf4).newInstance())
65+ // should throw exception when both bufferSize and maxBufferSize out of bound
66+ // exception should only contain "spark.kryoserializer.buffer"
67+ val thrown3 = intercept[IllegalArgumentException ](newKryoInstance(conf, " 2g" , " 3g" ))
6368 assert(thrown3.getMessage.contains(kryoBufferProperty))
6469 assert(! thrown3.getMessage.contains(kryoBufferMaxProperty))
65- val conf5 = conf.clone()
66- conf5.set(kryoBufferProperty, " 8m" )
67- conf5.set(kryoBufferMaxProperty, " 9m" )
68- new KryoSerializer (conf5).newInstance()
70+ // test configuration with mb is supported properly
71+ newKryoInstance(conf, " 8m" , " 9m" )
6972 }
7073
7174 test(" basic types" ) {
0 commit comments