@@ -20,21 +20,14 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
2020        self .ctx  =  ssc ._sc 
2121        self ._jrdd_deserializer  =  jrdd_deserializer 
2222
23-     def  generatedRDDs (self ):
24-         """ 
25-          // RDDs generated, marked as private[streaming] so that testsuites can access it 
26-          @transient 
27-         """ 
28-         pass 
29- 
3023    def  count (self ):
3124        """ 
3225
3326        """ 
3427        #TODO make sure count implementation, this is different from what pyspark does 
35-         return  self .mapPartitions (lambda  i : [sum (1  for  _  in  i )]).sum (). map (lambda  x : x [ 1 ] )
28+         return  self .mapPartitions (lambda  i : [sum (1  for  _  in  i )]).map (lambda  x : ( None ,  1 ) )
3629
37-     def  sum (self ):
30+     def  _sum (self ):
3831        """ 
3932        """ 
4033        return  self .mapPartitions (lambda  x : [sum (x )]).reduce (operator .add )
@@ -65,15 +58,22 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
6558    def  map (self , f , preservesPartitioning = False ):
6659        """ 
6760        """ 
68-         def  func (split , iterator ): return  imap (f , iterator )
69-         return  PipelinedDStream (self , func , preservesPartitioning )
61+         def  func (iterator ): return  imap (f , iterator )
62+         return  self .mapPartitions (func )
63+         #return PipelinedDStream(self, func, preservesPartitioning) 
7064
7165    def  mapPartitions (self , f ):
7266        """ 
7367        """ 
7468        def  func (s , iterator ): return  f (iterator )
7569        return  self .mapPartitionsWithIndex (func )
7670
71+     def  mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
72+         """ 
73+ 
74+         """ 
75+         return  PipelinedDStream (self , f , preservesPartitioning )
76+ 
7777    def  reduce (self , func , numPartitions = None ):
7878        """ 
7979
@@ -92,8 +92,8 @@ def combineLocally(iterator):
9292
9393                #TODO for count operation make sure count implementation 
9494                # This is different from what pyspark does 
95-                 if  isinstance (x , int ):
96-                     x  =  ("" , x )
95+                 # if isinstance(x, int):
96+                 #     x = ("", x)
9797
9898                (k , v ) =  x 
9999                if  k  not  in   combiners :
@@ -166,12 +166,6 @@ def _defaultReducePartitions(self):
166166
167167        return  self ._jdstream .partitions ().size ()
168168
169-     def  mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
170-         """ 
171- 
172-         """ 
173-         return  PipelinedDStream (self , f , preservesPartitioning )
174- 
175169    def  _defaultReducePartitions (self ):
176170        """ 
177171
0 commit comments