@@ -52,12 +52,13 @@ class PySparkTestCase(unittest.TestCase):
5252 def setUp (self ):
5353 self ._old_sys_path = list (sys .path )
5454 class_name = self .__class__ .__name__
55- self .sc = SparkContext ('local[4]' , class_name , batchSize = 2 )
55+ self .sc = SparkContext ('local[4]' , class_name , batchSize = 2 )
5656
5757 def tearDown (self ):
5858 self .sc .stop ()
5959 sys .path = self ._old_sys_path
6060
61+
6162class TestCheckpoint (PySparkTestCase ):
6263
6364 def setUp (self ):
@@ -190,24 +191,27 @@ def test_deleting_input_files(self):
190191
191192 def testAggregateByKey (self ):
192193 data = self .sc .parallelize ([(1 , 1 ), (1 , 1 ), (3 , 2 ), (5 , 1 ), (5 , 3 )], 2 )
194+
193195 def seqOp (x , y ):
194196 x .add (y )
195197 return x
196198
197199 def combOp (x , y ):
198200 x |= y
199201 return x
200-
202+
201203 sets = dict (data .aggregateByKey (set (), seqOp , combOp ).collect ())
202204 self .assertEqual (3 , len (sets ))
203205 self .assertEqual (set ([1 ]), sets [1 ])
204206 self .assertEqual (set ([2 ]), sets [3 ])
205207 self .assertEqual (set ([1 , 3 ]), sets [5 ])
206208
209+
207210class TestIO (PySparkTestCase ):
208211
209212 def test_stdout_redirection (self ):
210213 import subprocess
214+
211215 def func (x ):
212216 subprocess .check_call ('ls' , shell = True )
213217 self .sc .parallelize ([1 ]).foreach (func )
@@ -479,7 +483,7 @@ def test_module_dependency(self):
479483 | return x + 1
480484 """ )
481485 proc = subprocess .Popen ([self .sparkSubmit , "--py-files" , zip , script ],
482- stdout = subprocess .PIPE )
486+ stdout = subprocess .PIPE )
483487 out , err = proc .communicate ()
484488 self .assertEqual (0 , proc .returncode )
485489 self .assertIn ("[2, 3, 4]" , out )
0 commit comments