@@ -332,7 +332,7 @@ def test_groupByKey_batch(self):
332332        """Basic operation test for DStream.groupByKey with batch deserializer.""" 
333333        test_input  =  [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
334334        def  test_func (dstream ):
335-             return  dstream .map (lambda  x : (x ,1 )).groupByKey ()
335+             return  dstream .map (lambda  x : (x ,  1 )).groupByKey ()
336336        expected_output  =  [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ]), (4 , [1 ])],
337337                           [(1 , [1 , 1 , 1 ]), (2 , [1 , 1 ]), (3 , [1 ])],
338338                           [("a" , [1 , 1 ]), ("b" , [1 ]), ("" , [1 , 1 , 1 ])]]
@@ -345,8 +345,9 @@ def test_func(dstream):
345345    def  test_groupByKey_unbatch (self ):
346346        """Basic operation test for DStream.groupByKey with unbatch deserializer.""" 
347347        test_input  =  [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
348+ 
348349        def  test_func (dstream ):
349-             return  dstream .map (lambda  x : (x ,1 )).groupByKey ()
350+             return  dstream .map (lambda  x : (x ,  1 )).groupByKey ()
350351        expected_output  =  [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ])],
351352                           [(1 , [1 , 1 ]), ("" , [1 ])],
352353                           [("a" , [1 , 1 ]), ("b" , [1 ])]]
@@ -356,6 +357,36 @@ def test_func(dstream):
356357            self ._sort_result_based_on_key (result )
357358        self .assertEqual (expected_output , output )
358359
360+     def  test_combineByKey_batch (self ):
361+         """Basic operation test for DStream.combineByKey with batch deserializer.""" 
362+         test_input  =  [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
363+ 
364+         def  test_func (dstream ):
365+             def  add (a , b ): return  a  +  str (b )
366+             return  dstream .map (lambda  x : (x , 1 )).combineByKey (str , add , add )
367+         expected_output  =  [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
368+                            [(1 , "111" ), (2 , "11" ), (3 , "1" )],
369+                            [("a" , "11" ), ("b" , "1" ), ("" , "111" )]]
370+         output  =  self ._run_stream (test_input , test_func , expected_output )
371+         for  result  in  (output , expected_output ):
372+             self ._sort_result_based_on_key (result )
373+         self .assertEqual (expected_output , output )
374+ 
375+     def  test_combineByKey_unbatch (self ):
376+         """Basic operation test for DStream.combineByKey with unbatch deserializer.""" 
377+         test_input  =  [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
378+ 
379+         def  test_func (dstream ):
380+             def  add (a , b ): return  a  +  str (b )
381+             return  dstream .map (lambda  x : (x , 1 )).combineByKey (str , add , add )
382+         expected_output  =  [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
383+                            [(1 , "11" ), ("" , "1" )],
384+                            [("a" , "11" ), ("b" , "1" )]]
385+         output  =  self ._run_stream (test_input , test_func , expected_output )
386+         for  result  in  (output , expected_output ):
387+             self ._sort_result_based_on_key (result )
388+         self .assertEqual (expected_output , output )
389+ 
359390    def  _convert_iter_value_to_list (self , outputs ):
360391        """Return key value pair list. Value is converted to iterator to list.""" 
361392        result  =  list ()
0 commit comments