@@ -20,21 +20,14 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
         self.ctx = ssc._sc
         self._jrdd_deserializer = jrdd_deserializer
 
-    def generatedRDDs(self):
-        """
-        // RDDs generated, marked as private[streaming] so that testsuites can access it
-        @transient
-        """
-        pass
-
     def count(self):
         """
 
         """
         #TODO: make sure the count implementation is correct; this differs from what pyspark does
-        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum().map(lambda x: x[1])
+        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
 
-    def sum(self):
+    def _sum(self):
         """
         """
         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
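
For reference, PySpark's RDD API builds count from exactly the two primitives this hunk touches: a per-partition local count followed by a sum of the partial counts. A minimal sketch of that composition, assuming the renamed _sum helper above (illustrative only, not part of this diff):

    # Hypothetical sketch: how count could be composed once _sum exists,
    # mirroring PySpark's RDD.count(). Not part of the diff above.
    def count(self):
        # count each partition locally, then add the per-partition counts
        return self.mapPartitions(lambda i: [sum(1 for _ in i)])._sum()

The interim version committed here instead maps every partial count to the pair (None, 1), which the TODO flags as still diverging from PySpark's behaviour.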
@@ -65,15 +58,22 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
     def map(self, f, preservesPartitioning=False):
         """
         """
-        def func(split, iterator): return imap(f, iterator)
-        return PipelinedDStream(self, func, preservesPartitioning)
+        def func(iterator): return imap(f, iterator)
+        return self.mapPartitions(func)
+        #return PipelinedDStream(self, func, preservesPartitioning)
 
     def mapPartitions(self, f):
         """
         """
         def func(s, iterator): return f(iterator)
         return self.mapPartitionsWithIndex(func)
 
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+
+        """
+        return PipelinedDStream(self, f, preservesPartitioning)
+
     def reduce(self, func, numPartitions=None):
         """
 
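
After this refactor the element-wise operators form a single chain: map delegates to mapPartitions, which delegates to the new mapPartitionsWithIndex, which is now the only place a PipelinedDStream is constructed. A sketch of how a call unfolds (the names stream, f, elementwise, and ignore_index are assumed for illustration; not part of this diff):

    from itertools import imap

    # stream.map(f) after the refactor, step by step (hypothetical names):
    def elementwise(iterator):          # built inside map()
        return imap(f, iterator)
    def ignore_index(s, iterator):      # built inside mapPartitions()
        return elementwise(iterator)
    # mapPartitionsWithIndex() finally wraps the DStream:
    result = PipelinedDStream(stream, ignore_index, False)

Funnelling everything through one constructor keeps the pipelining logic in a single place, the same layering PySpark's RDD class uses.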
@@ -92,8 +92,8 @@ def combineLocally(iterator):
 
             #TODO: for the count operation, make sure the count implementation is correct.
            # This is different from what pyspark does
-            if isinstance(x, int):
-                x = ("", x)
+            # if isinstance(x, int):
+            #     x = ("", x)
 
             (k, v) = x
             if k not in combiners:
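
For context, this hunk sits inside the local-combine step of a combineByKey-style operation; the now commented-out branch was coercing bare integers (as produced by the interim count) into key-value pairs. The enclosing function follows PySpark's usual pattern, roughly as below (createCombiner and mergeValue are assumed from that pattern; this reconstruction is not part of the diff):

    # Hypothetical sketch of the enclosing local combiner, per PySpark's
    # combineByKey pattern (Python 2 era, hence iteritems).
    def combineLocally(iterator):
        combiners = {}
        for x in iterator:
            (k, v) = x
            if k not in combiners:
                combiners[k] = createCombiner(v)    # first value seen for key k
            else:
                combiners[k] = mergeValue(combiners[k], v)
        return combiners.iteritems()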