Skip to content

Commit bf5632e

Browse files
committed
Register OpenHashMapBasedStateMap for Kryo
1 parent cbaea95 commit bf5632e

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
2121

2222
import scala.reflect.ClassTag
2323

24+
import com.esotericsoftware.kryo.DefaultSerializer
25+
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
26+
2427
import org.apache.spark.SparkConf
2528
import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
2629
import org.apache.spark.util.collection.OpenHashMap
@@ -77,6 +80,7 @@ private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMa
7780
}
7881

7982
/** Implementation of StateMap based on Spark's [[org.apache.spark.util.collection.OpenHashMap]] */
83+
@DefaultSerializer(classOf[KryoJavaSerializer])
8084
private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
8185
@transient @volatile var parentStateMap: StateMap[K, S],
8286
initialCapacity: Int = DEFAULT_INITIAL_CAPACITY,

streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
package org.apache.spark.streaming
1919

2020
import scala.collection.{immutable, mutable, Map}
21+
import scala.reflect.ClassTag
2122
import scala.util.Random
2223

23-
import org.apache.spark.SparkFunSuite
24+
import org.apache.spark.{SparkConf, SparkFunSuite}
25+
import org.apache.spark.serializer._
2426
import org.apache.spark.streaming.util.{EmptyStateMap, OpenHashMapBasedStateMap, StateMap}
25-
import org.apache.spark.util.Utils
2627

2728
class StateMapSuite extends SparkFunSuite {
2829

30+
private val conf = new SparkConf()
31+
2932
test("EmptyStateMap") {
3033
val map = new EmptyStateMap[Int, Int]
3134
intercept[scala.NotImplementedError] {
@@ -267,12 +270,17 @@ class StateMapSuite extends SparkFunSuite {
267270
assertMap(stateMap, refMap.toMap, time, "Final state map does not match reference map")
268271
}
269272

270-
private def testSerialization[MapType <: StateMap[Int, Int]](
273+
private def testSerialization[MapType <: StateMap[Int, Int] : ClassTag](
271274
map: MapType, msg: String): MapType = {
272-
val deserMap = Utils.deserialize[MapType](
273-
Utils.serialize(map), Thread.currentThread().getContextClassLoader)
274-
assertMap(deserMap, map, 1, msg)
275-
deserMap
275+
val deserMaps = Array(new JavaSerializer(conf), new KryoSerializer(conf)).map {
276+
(serializer: Serializer) =>
277+
val serializerInstance = serializer.newInstance()
278+
val deserMap = serializerInstance.deserialize(
279+
serializerInstance.serialize(map), Thread.currentThread().getContextClassLoader)
280+
assertMap(deserMap, map, 1, msg)
281+
deserMap
282+
}
283+
deserMaps.head
276284
}
277285

278286
// Assert whether all the data and operations on a state map matches that of a reference state map

0 commit comments

Comments
 (0)