4 files changed: +41 −11

examples/src/main/python/streaming
streaming/src/main/scala/org/apache/spark/streaming/api/python

@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            # JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                          id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                          id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
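
For context on what moved where: the removed Python lines built a JavaPairDStream and shuffled it from Python with partitionBy(...).values(), while the new line hands the partitioner straight to PairwiseDStream so the same shuffle happens per batch on the JVM side. The idiom itself is plain Spark; here is a standalone Scala sketch of shuffle-then-drop-keys on an ordinary RDD (the local SparkContext and the sample pairs are assumptions for illustration, not code from this PR):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ShuffleThenDropKeys {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("shuffle-sketch"))

    // Stand-ins for the (bucket, payload) pairs PairwiseRDD exposes:
    // the Long key is the bucket index Python computed with partitionFunc.
    val pairs = sc.parallelize(Seq(0L -> "batch-a", 1L -> "batch-b", 0L -> "batch-c"))

    // partitionBy moves each pair to the partition its key maps to;
    // values() then drops the bucket keys, which is exactly the
    // partitionBy(partitioner).values() step being moved into compute().
    val shuffled = pairs.partitionBy(new HashPartitioner(2)).values

    shuffled.glom().collect().foreach(part => println(part.mkString(", ")))
    sc.stop()
  }
}

Doing this inside the DStream means the Python driver no longer has to round-trip through a JavaPairDStream wrapper for every batch.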
@@ -59,3 +59,30 @@ class PythonDStream[T: ClassTag](
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
+
+private class PairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner)
+  extends DStream[Array[Byte]](prev.ssc) {
+
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        /*
+         * This is equivalent to the following Python code:
+         *   with _JavaStackTrace(self.context) as st:
+         *       pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *       partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *                                                     id(partitionFunc))
+         *       jrdd = pairRDD.partitionBy(partitioner).values()
+         *       rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
+      case None => None
+    }
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  // val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
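
PairwiseDStream follows the standard recipe for a custom DStream: declare dependencies, report a slideDuration, and derive each batch in compute from the parent's getOrCompute. A minimal pass-through skeleton of that recipe, as a sketch (the class and package names are hypothetical; it has to sit under the org.apache.spark.streaming package because ssc and getOrCompute are private[streaming]):

package org.apache.spark.streaming.sketch

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical name; mirrors the three overrides PairwiseDStream supplies.
class PassThroughDStream[T: ClassTag](prev: DStream[T])
  extends DStream[T](prev.ssc) {

  // The scheduler walks this list to build each batch's RDD DAG.
  override def dependencies: List[DStream[_]] = List(prev)

  // Emit batches at the same cadence as the parent stream.
  override def slideDuration: Duration = prev.slideDuration

  // None means the parent produced nothing for this batch time;
  // PairwiseDStream puts its shuffle inside the Some(...) arm instead.
  override def compute(validTime: Time): Option[RDD[T]] =
    prev.getOrCompute(validTime)
}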
@@ -1,3 +1,5 @@
+/*
+
 package org.apache.spark.streaming.api.python
 
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 
 import scala.reflect.ClassTag
 
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
 
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+
+    // val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    // parents.map(_.getOrCompute(validTime).orNull).to
+    // parent = parents.head.asInstanceOf[RDD]
+    // Some()
   }
-  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+*/
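
The entire file is fenced in /* ... */ as work in progress, and the commented-out compute hints at the direction: switch from a Seq of parents to a single parent and derive each batch from it. A sketch of that single-parent shape, reusing the skeleton pattern above (the transform parameter is a hypothetical stand-in for wrapping the batch RDD in a PythonRDD; the real constructor arguments would track this class's command, envVars, and pythonIncludes fields):

package org.apache.spark.streaming.sketch

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: `transform` stands in for running the batch RDD
// through the Python worker pipeline.
class SingleParentTransformedDStream(
    parent: DStream[Array[Byte]],
    transform: RDD[Array[Byte]] => RDD[Array[Byte]])
  extends DStream[Array[Byte]](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // Skipped parent batches stay skipped; present ones are transformed.
  override def compute(validTime: Time): Option[RDD[Array[Byte]]] =
    parent.getOrCompute(validTime).map(transform)
}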