@@ -54,7 +54,7 @@ def _test_func(self, input, func, expected, sort=False, input2=None):
5454 else :
5555 stream = func (input_stream )
5656
57- result = stream .collect ()
57+ result = stream ._collect ()
5858 self .ssc .start ()
5959
6060 start_time = time .time ()
@@ -86,12 +86,12 @@ class TestBasicOperations(PySparkStreamingTestCase):
8686 def test_take (self ):
8787 input = [range (i ) for i in range (3 )]
8888 dstream = self .ssc .queueStream (input )
89- self .assertEqual ([0 , 0 , 1 ], dstream .take (3 ))
89+ self .assertEqual ([0 , 0 , 1 ], dstream ._take (3 ))
9090
9191 def test_first (self ):
9292 input = [range (10 )]
9393 dstream = self .ssc .queueStream (input )
94- self .assertEqual (0 , dstream .first ())
94+ self .assertEqual (0 , dstream ._first ())
9595
9696 def test_map (self ):
9797 """Basic operation test for DStream.map."""
@@ -415,26 +415,31 @@ def _addInputStream(self):
415415 # Make sure each length of input is over 3
416416 inputs = map (lambda x : range (1 , x ), range (5 , 101 ))
417417 stream = self .ssc .queueStream (inputs )
418- stream .collect ()
418+ stream ._collect ()
419419
420420 def test_queueStream (self ):
421421 input = [range (i ) for i in range (3 )]
422422 dstream = self .ssc .queueStream (input )
423- result = dstream .collect ()
423+ result = dstream ._collect ()
424424 self .ssc .start ()
425425 time .sleep (1 )
426426 self .assertEqual (input , result [:3 ])
427427
428- # TODO: test textFileStream
428+ # TODO: fix this test
429429 # def test_textFileStream(self):
430430 # input = [range(i) for i in range(3)]
431431 # dstream = self.ssc.queueStream(input)
432432 # d = os.path.join(tempfile.gettempdir(), str(id(self)))
433433 # if not os.path.exists(d):
434434 # os.makedirs(d)
435435 # dstream.saveAsTextFiles(os.path.join(d, 'test'))
436+ # self.ssc.start()
437+ # time.sleep(1)
438+ # self.ssc.stop(False, True)
439+ #
440+ # self.ssc = StreamingContext(self.sc, self.batachDuration)
436441 # dstream2 = self.ssc.textFileStream(d)
437- # result = dstream2.collect ()
442+ # result = dstream2._collect ()
438443 # self.ssc.start()
439444 # time.sleep(2)
440445 # self.assertEqual(input, result[:3])
@@ -444,7 +449,7 @@ def test_union(self):
444449 dstream = self .ssc .queueStream (input )
445450 dstream2 = self .ssc .queueStream (input )
446451 dstream3 = self .ssc .union (dstream , dstream2 )
447- result = dstream3 .collect ()
452+ result = dstream3 ._collect ()
448453 self .ssc .start ()
449454 time .sleep (1 )
450455 expected = [i * 2 for i in input ]
@@ -461,7 +466,7 @@ def func(rdds):
461466
462467 dstream = self .ssc .transform ([dstream1 , dstream2 , dstream3 ], func )
463468
464- self .assertEqual ([2 , 3 , 1 ], dstream .take (3 ))
469+ self .assertEqual ([2 , 3 , 1 ], dstream ._take (3 ))
465470
466471
467472if __name__ == "__main__" :
0 commit comments