1717
1818package org .apache .spark .streaming .api .python
1919
20+ import java .io .{ObjectInputStream , ObjectOutputStream }
21+ import java .lang .reflect .Proxy
2022import java .util .{ArrayList => JArrayList , List => JList }
2123import scala .collection .JavaConversions ._
2224import scala .collection .JavaConverters ._
23- import scala .collection .mutable
2425
2526import org .apache .spark .api .java ._
2627import org .apache .spark .api .python ._
@@ -35,14 +36,14 @@ import org.apache.spark.streaming.api.java._
3536 * Interface for Python callback function with three arguments
3637 */
3738private [python] trait PythonRDDFunction {
38- // callback in Python
 // Callback entry point. The RDDFunction wrapper in this file invokes it as
 // `call(time.milliseconds, rdds)`, so `time` is a Time value expressed in milliseconds and
 // `rdds` is the Java list of RDD handles handed to the callback.
 // The result is a JavaRDD whose elements are byte arrays.
3939 def call (time : Long , rdds : JList [_]): JavaRDD [Array [Byte ]]
4040}
4141
4242/**
4343 * Wrapper for PythonRDDFunction
44+ * TODO: support checkpoint
4445 */
45- private [python] class RDDFunction (pfunc : PythonRDDFunction )
46+ private [python] class RDDFunction (@ transient var pfunc : PythonRDDFunction )
4647 extends function.Function2 [JList [JavaRDD [_]], Time , JavaRDD [Array [Byte ]]] with Serializable {
4748
4849 def apply (rdd : Option [RDD [_]], time : Time ): Option [RDD [Array [Byte ]]] = {
@@ -58,30 +59,62 @@ private[python] class RDDFunction(pfunc: PythonRDDFunction)
5859 def call (rdds : JList [JavaRDD [_]], time : Time ): JavaRDD [Array [Byte ]] = {
5960 pfunc.call(time.milliseconds, rdds)
6061 }
61- }
6262
/**
 * Custom Java serialization hook: `pfunc` is transient, so it is written as a
 * length-prefixed byte array produced by the serializer registered in PythonDStream.
 *
 * @param out stream the enclosing object is being serialized to
 * @throws IllegalArgumentException if no serializer has been registered yet
 */
private def writeObject(out: ObjectOutputStream): Unit = {
  // Read the mutable global once so the null check and the use see the same instance.
  val registered = PythonDStream.serializer
  // `require` instead of `assert`: Predef.assert is elidable and may be compiled out,
  // which would turn a missing registration into an unexplained NullPointerException.
  require(registered != null, " Serializer has not been registered!")
  val bytes = registered.serialize(pfunc)
  out.writeInt(bytes.length)
  out.write(bytes)
}
69+
/**
 * Custom Java deserialization hook, mirror of writeObject: reads a length-prefixed byte
 * array and rebuilds the transient `pfunc` through the serializer registered in PythonDStream.
 *
 * @param in stream the enclosing object is being deserialized from
 * @throws IllegalArgumentException if no serializer has been registered yet
 * @throws java.io.StreamCorruptedException if the recorded length is negative
 */
private def readObject(in: ObjectInputStream): Unit = {
  // Read the mutable global once so the null check and the use see the same instance.
  val registered = PythonDStream.serializer
  // `require` instead of `assert`: Predef.assert is elidable and may be compiled out.
  require(registered != null, " Serializer has not been registered!")
  val length = in.readInt()
  if (length < 0) {
    // Report a corrupt stream as an I/O problem rather than a NegativeArraySizeException.
    throw new java.io.StreamCorruptedException(
      "Invalid length for serialized PythonRDDFunction: " + length)
  }
  val bytes = new Array[Byte](length)
  in.readFully(bytes)
  pfunc = registered.deserialize(bytes)
}
77+ }
6378
6479/**
65- * Base class for PythonDStream with some common methods
80+ * Interface for Python Serializer to serialize PythonRDDFunction
6681 */
67- private [python]
68- abstract class PythonDStream (parent : DStream [_], pfunc : PythonRDDFunction )
69- extends DStream [Array [Byte ]] (parent.ssc) {
70-
71- val func = new RDDFunction (pfunc)
72-
73- override def dependencies = List (parent)
82+ private [python] trait PythonRDDFunctionSerializer {
 // Returns the serialized bytes for the function identified by `id`.
 // RDDFunctionSerializer.serialize passes the `id` it reads from the function's py4j proxy handler.
83+ def dumps (id : String ): Array [Byte ]
 // Rebuilds a PythonRDDFunction from `bytes`
 // (presumably bytes previously produced by `dumps` -- confirm on the Python side).
84+ def loads (bytes : Array [Byte ]): PythonRDDFunction
85+ }
7486
75- override def slideDuration : Duration = parent.slideDuration
87+ /**
88+ * Wrapper for PythonRDDFunctionSerializer
89+ */
private[python] class RDDFunctionSerializer(pser: PythonRDDFunctionSerializer) {

  /**
   * Serializes `func` by extracting the id of its py4j proxy and asking the Python-side
   * serializer to dump the function registered under that id.
   *
   * @param func the Python callback; must be a java.lang.reflect.Proxy instance
   * @return the bytes returned by the Python-side `dumps`
   */
  def serialize(func: PythonRDDFunction): Array[Byte] = {
    // get the id of PythonRDDFunction in py4j
    val handler = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
    // The handler does not expose the id publicly, so it is read reflectively.
    // The field name must be exactly "id"; a name with surrounding whitespace never matches.
    val idField = handler.getClass.getDeclaredField("id")
    idField.setAccessible(true)
    val id = idField.get(handler).asInstanceOf[String]
    pser.dumps(id)
  }

  /**
   * Rebuilds a PythonRDDFunction from `bytes` by delegating to the Python-side `loads`.
   *
   * @param bytes serialized form of the function
   * @return the function object returned by the Python-side serializer
   */
  def deserialize(bytes: Array[Byte]): PythonRDDFunction = {
    pser.loads(bytes)
  }
}
79104
80105/**
81106 * Helper functions
82107 */
83108private [python] object PythonDStream {
84109
// A serializer in Python, used to serialize PythonRDDFunction
var serializer: RDDFunctionSerializer = _

/**
 * Register a serializer from Python, should be called during initialization.
 *
 * @param ser Python-side serializer to wrap; must not be null
 * @throws IllegalArgumentException if `ser` is null
 */
def registerSerializer(ser: PythonRDDFunctionSerializer): Unit = {
  // A null `ser` would still yield a non-null wrapper, so the "has been registered" checks
  // would pass and the failure would only show up later as a NullPointerException.
  require(ser != null, "Cannot register a null PythonRDDFunctionSerializer")
  serializer = new RDDFunctionSerializer(ser)
}
117+
85118 // convert Option[RDD[_]] to JavaRDD, handle null gracefully
86119 def wrapRDD (rdd : Option [RDD [_]]): JavaRDD [_] = {
87120 if (rdd.isDefined) {
@@ -123,14 +156,30 @@ private[python] object PythonDStream {
123156 }
124157}
125158
159+ /**
160+ * Base class for PythonDStream with some common methods
161+ */
162+ private [python]
163+ abstract class PythonDStream (parent : DStream [_], @ transient pfunc : PythonRDDFunction )
164+ extends DStream [Array [Byte ]] (parent.ssc) {
165+
 // Wrap the raw Python callback in RDDFunction, which defines its own writeObject/readObject
 // for the (transient) callback.
166+ val func = new RDDFunction (pfunc)
167+
 // This stream depends only on its single parent.
168+ override def dependencies = List (parent)
169+
 // The slide interval is taken unchanged from the parent.
170+ override def slideDuration : Duration = parent.slideDuration
171+
 // Java-API wrapper around this stream.
 // NOTE(review): `this` is passed out while the constructor is still running -- confirm that
 // fromDStream does not touch members initialized later in subclasses.
172+ val asJavaDStream = JavaDStream .fromDStream(this )
173+ }
174+
126175/**
127176 * Transformed DStream in Python.
128177 *
129178 * If `reuse` is true and the result of the `func` is a PythonRDD, then it will cache it
130179 * as a template for future use; this can reduce the Python callbacks.
131180 */
132181private [python]
133- class PythonTransformedDStream (parent : DStream [_], pfunc : PythonRDDFunction ,
182+ class PythonTransformedDStream (parent : DStream [_], @ transient pfunc : PythonRDDFunction ,
134183 var reuse : Boolean = false )
135184 extends PythonDStream (parent, pfunc) {
136185
@@ -170,7 +219,7 @@ class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
170219 */
171220private [python]
172221class PythonTransformed2DStream (parent : DStream [_], parent2 : DStream [_],
173- pfunc : PythonRDDFunction )
222+ @ transient pfunc : PythonRDDFunction )
174223 extends DStream [Array [Byte ]] (parent.ssc) {
175224
176225 val func = new RDDFunction (pfunc)
@@ -190,7 +239,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
190239 * similar to StateDStream
191240 */
192241private [python]
193- class PythonStateDStream (parent : DStream [Array [Byte ]], reduceFunc : PythonRDDFunction )
242+ class PythonStateDStream (parent : DStream [Array [Byte ]], @ transient reduceFunc : PythonRDDFunction )
194243 extends PythonDStream (parent, reduceFunc) {
195244
196245 super .persist(StorageLevel .MEMORY_ONLY )
@@ -212,8 +261,8 @@ class PythonStateDStream(parent: DStream[Array[Byte]], reduceFunc: PythonRDDFunc
212261 */
213262private [python]
214263class PythonReducedWindowedDStream (parent : DStream [Array [Byte ]],
215- preduceFunc : PythonRDDFunction ,
216- pinvReduceFunc : PythonRDDFunction ,
264+ @ transient preduceFunc : PythonRDDFunction ,
265+ @ transient pinvReduceFunc : PythonRDDFunction ,
217266 _windowDuration : Duration ,
218267 _slideDuration : Duration
219268 ) extends PythonStateDStream (parent, preduceFunc) {
0 commit comments