@@ -39,6 +39,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
3939 def setUp (self ):
4040 class_name = self .__class__ .__name__
4141 self .ssc = StreamingContext (appName = class_name , duration = Seconds (1 ))
42+ time .sleep (1 )
4243
4344 def tearDown (self ):
4445 # Do not call pyspark.streaming.context.StreamingContext.stop directly because
@@ -186,68 +187,73 @@ def test_func(dstream):
186187
187188 def test_reduceByKey_batch (self ):
188189 """Basic operation test for DStream.reduceByKey with batch deserializer"""
189- test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ]]
190+ test_input = [[("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("b" , 1 )],
191+ [("" , 1 ),("" , 1 ), ("" , 1 ), ("" , 1 )],
192+ [(1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )]]
190193
191194 def test_func (dstream ):
192- return dstream .map ( lambda x : ( x , 1 )). reduceByKey (operator .add )
193- expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )]]
195+ return dstream .reduceByKey (operator .add )
196+ expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], [( 1 , 2 ), ( 2 , 2 ), ( 3 , 1 )] ]
194197 output = self ._run_stream (test_input , test_func , expected_output )
198+ for result in (output , expected_output ):
199+ self ._sort_result_based_on_key (result )
195200 self .assertEqual (expected_output , output )
196201
197202 def test_reduceByKey_unbatch (self ):
198203 """Basic operation test for DStream.reduceByKey with unbatch deserilizer"""
199- test_input = [["a" , "a" , "b" ], ["" , "" ], []]
204+ test_input = [[( "a" , 1 ), ( "a" , 1 ), ( "b" , 1 ) ], [( "" , 1 ), ( "" , 1 ) ], []]
200205
201206 def test_func (dstream ):
202- return dstream .map ( lambda x : ( x , 1 )). reduceByKey (operator .add )
207+ return dstream .reduceByKey (operator .add )
203208 expected_output = [[("a" , 2 ), ("b" , 1 )], [("" , 2 )], []]
204209 output = self ._run_stream (test_input , test_func , expected_output )
210+ for result in (output , expected_output ):
211+ self ._sort_result_based_on_key (result )
205212 self .assertEqual (expected_output , output )
206213
207214 def test_mapValues_batch (self ):
208215 """Basic operation test for DStream.mapValues with batch deserializer"""
209- test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ]]
216+ test_input = [[("a" , 2 ), ("b" , 2 ), ("c" , 1 ), ("d" , 1 )],
217+ [("" , 4 ), (1 , 1 ), (2 , 2 ), (3 , 3 )]]
210218
211219 def test_func (dstream ):
212- return dstream .map (lambda x : (x , 1 ))\
213- .reduceByKey (operator .add )\
214- .mapValues (lambda x : x + 10 )
215- expected_output = [[("a" , 12 ), ("b" , 12 )], [("" , 14 )]]
220+ return dstream .mapValues (lambda x : x + 10 )
221+ expected_output = [[("a" , 12 ), ("b" , 12 ), ("c" , 11 ), ("d" , 11 )],
222+ [("" , 14 ), (1 , 11 ), (2 , 12 ), (3 , 13 )]]
216223 output = self ._run_stream (test_input , test_func , expected_output )
224+ for result in (output , expected_output ):
225+ self ._sort_result_based_on_key (result )
217226 self .assertEqual (expected_output , output )
218227
219228 def test_mapValues_unbatch (self ):
220229 """Basic operation test for DStream.mapValues with unbatch deserializer"""
221- test_input = [["a" , "a" , "b" ], ["" , "" ], []]
230+ test_input = [[( "a" , 2 ), ( "b" , 1 ) ], [( "" , 2 ) ], []]
222231
223232 def test_func (dstream ):
224- return dstream .map (lambda x : (x , 1 ))\
225- .reduceByKey (operator .add )\
226- .mapValues (lambda x : x + 10 )
233+ return dstream .mapValues (lambda x : x + 10 )
227234 expected_output = [[("a" , 12 ), ("b" , 11 )], [("" , 12 )], []]
228235 output = self ._run_stream (test_input , test_func , expected_output )
229236 self .assertEqual (expected_output , output )
230237
231238 def test_flatMapValues_batch (self ):
232239 """Basic operation test for DStream.flatMapValues with batch deserializer"""
233- test_input = [["a" , "a " , "b " , "b" ], ["" , "" , "" , "" ]]
240+ test_input = [[( "a" , 2 ), ( "b " , 2 ), ( "c " , 1 ), ( "d" , 1 ) ], [( "" , 4 ), ( 1 , 1 ), ( 2 , 1 ), ( 3 , 1 ) ]]
234241
235242 def test_func (dstream ):
236- return dstream .map (lambda x : (x , 1 ))\
237- .reduceByKey (operator .add )\
238- .flatMapValues (lambda x : (x , x + 10 ))
239- expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 )], [("" , 4 ), ("" , 14 )]]
243+ return dstream .flatMapValues (lambda x : (x , x + 10 ))
244+ expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 ),
245+ ("c" , 1 ), ("c" , 11 ), ("d" , 1 ), ("d" , 11 )],
246+ [("" , 4 ), ("" , 14 ), (1 , 1 ), (1 , 11 ),
247+ (2 , 1 ), (2 , 11 ), (3 , 1 ), (3 , 11 )]]
240248 output = self ._run_stream (test_input , test_func , expected_output )
241249 self .assertEqual (expected_output , output )
242250
243251 def test_flatMapValues_unbatch (self ):
244252 """Basic operation test for DStream.flatMapValues with unbatch deserializer"""
245- test_input = [["a" , "a" , "b" ], ["" , "" ], []]
253+ test_input = [[( "a" , 2 ), ( "b" , 1 ) ], [( "" , 2 ) ], []]
246254
247255 def test_func (dstream ):
248- return dstream .map (lambda x : (x , 1 ))\
249- .reduceByKey (operator .add )\
250- .flatMapValues (lambda x : (x , x + 10 ))
256+ return dstream .flatMapValues (lambda x : (x , x + 10 ))
251257 expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 1 ), ("b" , 11 )], [("" , 2 ), ("" , 12 )], []]
252258 output = self ._run_stream (test_input , test_func , expected_output )
253259 self .assertEqual (expected_output , output )
@@ -302,7 +308,7 @@ def f(iterator):
302308
303309 def test_countByValue_batch (self ):
304310 """Basic operation test for DStream.countByValue with batch deserializer."""
305- test_input = [range (1 , 5 ) + range ( 1 , 5 ) , range (5 , 7 ) + range (5 , 9 ), ["a" , "a" , "b" , "" ]]
311+ test_input = [range (1 , 5 ) * 2 , range (5 , 7 ) + range (5 , 9 ), ["a" , "a" , "b" , "" ]]
306312
307313 def test_func (dstream ):
308314 return dstream .countByValue ()
@@ -330,9 +336,12 @@ def test_func(dstream):
330336
331337 def test_groupByKey_batch (self ):
332338 """Basic operation test for DStream.groupByKey with batch deserializer."""
333- test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
339+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
340+ [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
341+ [("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
342+
334343 def test_func (dstream ):
335- return dstream .map ( lambda x : ( x , 1 )). groupByKey ()
344+ return dstream .groupByKey ()
336345 expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ]), (4 , [1 ])],
337346 [(1 , [1 , 1 , 1 ]), (2 , [1 , 1 ]), (3 , [1 ])],
338347 [("a" , [1 , 1 ]), ("b" , [1 ]), ("" , [1 , 1 , 1 ])]]
@@ -344,10 +353,12 @@ def test_func(dstream):
344353
345354 def test_groupByKey_unbatch (self ):
346355 """Basic operation test for DStream.groupByKey with unbatch deserializer."""
347- test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
356+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
357+ [(1 , 1 ), (1 , 1 ), ("" , 1 )],
358+ [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
348359
349360 def test_func (dstream ):
350- return dstream .map ( lambda x : ( x , 1 )). groupByKey ()
361+ return dstream .groupByKey ()
351362 expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ])],
352363 [(1 , [1 , 1 ]), ("" , [1 ])],
353364 [("a" , [1 , 1 ]), ("b" , [1 ])]]
@@ -359,11 +370,13 @@ def test_func(dstream):
359370
360371 def test_combineByKey_batch (self ):
361372 """Basic operation test for DStream.combineByKey with batch deserializer."""
362- test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
373+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
374+ [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
375+ [("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
363376
364377 def test_func (dstream ):
365378 def add (a , b ): return a + str (b )
366- return dstream .map ( lambda x : ( x , 1 )). combineByKey (str , add , add )
379+ return dstream .combineByKey (str , add , add )
367380 expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
368381 [(1 , "111" ), (2 , "11" ), (3 , "1" )],
369382 [("a" , "11" ), ("b" , "1" ), ("" , "111" )]]
@@ -374,11 +387,11 @@ def add(a, b): return a + str(b)
374387
375388 def test_combineByKey_unbatch (self ):
376389 """Basic operation test for DStream.combineByKey with unbatch deserializer."""
377- test_input = [range (1 , 4 ), [ 1 , 1 , "" ], ["a" , "a" , "b" ]]
390+ test_input = [[ (1 , 1 ), ( 2 , 1 ), ( 3 , 1 )], [( 1 , 1 ), ( 1 , 1 ), ( "" , 1 ) ], [( "a" , 1 ), ( "a" , 1 ), ( "b" , 1 ) ]]
378391
379392 def test_func (dstream ):
380393 def add (a , b ): return a + str (b )
381- return dstream .map ( lambda x : ( x , 1 )). combineByKey (str , add , add )
394+ return dstream .combineByKey (str , add , add )
382395 expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
383396 [(1 , "11" ), ("" , "1" )],
384397 [("a" , "11" ), ("b" , "1" )]]
@@ -446,4 +459,4 @@ def tearDownClass(cls):
446459
447460
448461if __name__ == "__main__" :
449- unittest .main ()
462+ unittest .main (verbosity = 2 )
0 commit comments