2424
2525"""
2626from itertools import chain
27- import os
2827import time
2928import unittest
3029import operator
3433from pyspark .streaming .duration import *
3534
3635
37- SPARK_HOME = os .environ ["SPARK_HOME" ]
38-
39-
4036class PySparkStreamingTestCase (unittest .TestCase ):
4137 def setUp (self ):
4238 class_name = self .__class__ .__name__
@@ -49,7 +45,7 @@ def tearDown(self):
4945 self .ssc ._sc .stop ()
5046 # Why does it take a long time to terminate StreamingContext and SparkContext?
5147 # Should we change the sleep time if this depends on the machine spec?
52- time .sleep (5 )
48+ time .sleep (8 )
5349
5450 @classmethod
5551 def tearDownClass (cls ):
@@ -59,8 +55,17 @@ def tearDownClass(cls):
5955
6056class TestBasicOperationsSuite (PySparkStreamingTestCase ):
6157 """
62- Input and output of this TestBasicOperationsSuite is the equivalent to
63- Scala TestBasicOperationsSuite.
58+ 2 tests for each function, for the batch deserializer and the unbatch deserializer, because
59+ we cannot change the deserializer after the streaming process starts.
60+ Default numInputPartitions is 2.
61+ If the number of input elements is over 3, that DStream uses the batch deserializer.
62+ If not, that DStream uses the unbatch deserializer.
63+
64+ Most of the operations use the UTF8 deserializer to get values from Scala.
65+ I am wondering if these tests are enough or not.
66+ Every test input should be a list of lists. This represents a stream.
67+ Every batch interval, the first object of the list is chosen to make a DStream.
68+ Please see the BasicTestSuits in Scala or QueStream, which is close to this implementation.
6469 """
6570 def setUp (self ):
6671 PySparkStreamingTestCase .setUp (self )
@@ -75,8 +80,8 @@ def tearDown(self):
7580 def tearDownClass (cls ):
7681 PySparkStreamingTestCase .tearDownClass ()
7782
78- def test_map (self ):
79- """Basic operation test for DStream.map"""
83+ def test_map_batch (self ):
84+ """Basic operation test for DStream.map with batch deserializer """
8085 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
8186
8287 def test_func (dstream ):
@@ -85,8 +90,18 @@ def test_func(dstream):
8590 output = self ._run_stream (test_input , test_func , expected_output )
8691 self .assertEqual (expected_output , output )
8792
88- def test_flatMap (self ):
89- """Basic operation test for DStream.faltMap"""
93+ def test_map_unbatach (self ):
94+ """Basic operation test for DStream.map with unbatch deserializer"""
95+ test_input = [range (1 , 4 ), range (4 , 7 ), range (7 , 10 )]
96+
97+ def test_func (dstream ):
98+ return dstream .map (lambda x : str (x ))
99+ expected_output = map (lambda x : map (lambda y : str (y ), x ), test_input )
100+ output = self ._run_stream (test_input , test_func , expected_output )
101+ self .assertEqual (expected_output , output )
102+
103+ def test_flatMap_batch (self ):
104+ """Basic operation test for DStream.flatMap with batch deserializer"""
90105 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
91106
92107 def test_func (dstream ):
@@ -96,8 +111,19 @@ def test_func(dstream):
96111 output = self ._run_stream (test_input , test_func , expected_output )
97112 self .assertEqual (expected_output , output )
98113
99- def test_filter (self ):
100- """Basic operation test for DStream.filter"""
114+ def test_flatMap_unbatch (self ):
115+ """Basic operation test for DStream.flatMap with unbatch deserializer"""
116+ test_input = [range (1 , 4 ), range (4 , 7 ), range (7 , 10 )]
117+
118+ def test_func (dstream ):
119+ return dstream .flatMap (lambda x : (x , x * 2 ))
120+ expected_output = map (lambda x : list (chain .from_iterable ((map (lambda y : [y , y * 2 ], x )))),
121+ test_input )
122+ output = self ._run_stream (test_input , test_func , expected_output )
123+ self .assertEqual (expected_output , output )
124+
125+ def test_filter_batch (self ):
126+ """Basic operation test for DStream.filter with batch deserializer"""
101127 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
102128
103129 def test_func (dstream ):
@@ -106,21 +132,38 @@ def test_func(dstream):
106132 output = self ._run_stream (test_input , test_func , expected_output )
107133 self .assertEqual (expected_output , output )
108134
109- def test_count (self ):
110- """Basic operation test for DStream.count"""
111- #test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
112- test_input = [range (1 , 5 ), range (1 ,10 ), range (1 ,20 )]
135+ def test_filter_unbatch (self ):
136+ """Basic operation test for DStream.filter with unbatch deserializer"""
137+ test_input = [range (1 , 4 ), range (4 , 7 ), range (7 , 10 )]
138+
139+ def test_func (dstream ):
140+ return dstream .filter (lambda x : x % 2 == 0 )
141+ expected_output = map (lambda x : filter (lambda y : y % 2 == 0 , x ), test_input )
142+ output = self ._run_stream (test_input , test_func , expected_output )
143+ self .assertEqual (expected_output , output )
144+
145+ def test_count_batch (self ):
146+ """Basic operation test for DStream.count with batch deserializer"""
147+ test_input = [range (1 , 5 ), range (1 , 10 ), range (1 , 20 )]
113148
114149 def test_func (dstream ):
115- print "count"
116- dstream .count ().pyprint ()
117150 return dstream .count ()
118151 expected_output = map (lambda x : [len (x )], test_input )
119152 output = self ._run_stream (test_input , test_func , expected_output )
120153 self .assertEqual (expected_output , output )
121-
122- def test_reduce (self ):
123- """Basic operation test for DStream.reduce"""
154+
155+ def test_count_unbatch (self ):
156+ """Basic operation test for DStream.count with unbatch deserializer"""
157+ test_input = [[], [1 ], range (1 , 3 ), range (1 , 4 )]
158+
159+ def test_func (dstream ):
160+ return dstream .count ()
161+ expected_output = map (lambda x : [len (x )], test_input )
162+ output = self ._run_stream (test_input , test_func , expected_output )
163+ self .assertEqual (expected_output , output )
164+
165+ def test_reduce_batch (self ):
166+ """Basic operation test for DStream.reduce with batch deserializer"""
124167 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
125168
126169 def test_func (dstream ):
@@ -129,67 +172,132 @@ def test_func(dstream):
129172 output = self ._run_stream (test_input , test_func , expected_output )
130173 self .assertEqual (expected_output , output )
131174
132- def test_reduceByKey (self ):
133- """Basic operation test for DStream.reduceByKey"""
134- #test_input = [["a", "a", "b"], ["", ""], []]
135- test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ], []]
175+ def test_reduce_unbatch (self ):
176+ """Basic operation test for DStream.reduce with unbatch deserializer"""
177+ test_input = [[1 ], range (1 , 3 ), range (1 , 4 )]
178+
179+ def test_func (dstream ):
180+ return dstream .reduce (operator .add )
181+ expected_output = map (lambda x : [reduce (operator .add , x )], test_input )
182+ output = self ._run_stream (test_input , test_func , expected_output )
183+ self .assertEqual (expected_output , output )
184+
185+ def test_reduceByKey_batch (self ):
186+ """Basic operation test for DStream.reduceByKey with batch deserializer"""
187+ test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ]]
188+
189+ def test_func (dstream ):
190+ return dstream .map (lambda x : (x , 1 )).reduceByKey (operator .add )
191+ expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )]]
192+ output = self ._run_stream (test_input , test_func , expected_output )
193+ self .assertEqual (expected_output , output )
194+
195+ def test_reduceByKey_unbatch (self ):
196+ """Basic operation test for DStream.reduceByKey with unbatch deserializer"""
197+ test_input = [["a" , "a" , "b" ], ["" , "" ], []]
136198
137199 def test_func (dstream ):
138- print "reduceByKey"
139- dstream .map (lambda x : (x , 1 )).pyprint ()
140200 return dstream .map (lambda x : (x , 1 )).reduceByKey (operator .add )
141- #expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
142- expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], []]
201+ expected_output = [[("a" , 2 ), ("b" , 1 )], [("" , 2 )], []]
143202 output = self ._run_stream (test_input , test_func , expected_output )
144203 self .assertEqual (expected_output , output )
145204
146- def test_mapValues (self ):
147- """Basic operation test for DStream.mapValues"""
148- #test_input = [["a", "a", "b"], ["", ""], []]
149- test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ], []]
205+ def test_mapValues_batch (self ):
206+ """Basic operation test for DStream.mapValues with batch deserializer"""
207+ test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ]]
150208
151209 def test_func (dstream ):
152- return dstream .map (lambda x : (x , 1 )).reduceByKey (operator .add ).mapValues (lambda x : x + 10 )
153- #expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
154- expected_output = [[("a" , 12 ), ("b" , 12 )], [("" , 14 )], []]
210+ return dstream .map (lambda x : (x , 1 ))\
211+ .reduceByKey (operator .add )\
212+ .mapValues (lambda x : x + 10 )
213+ expected_output = [[("a" , 12 ), ("b" , 12 )], [("" , 14 )]]
155214 output = self ._run_stream (test_input , test_func , expected_output )
156215 self .assertEqual (expected_output , output )
157216
158- def test_flatMapValues (self ):
159- """Basic operation test for DStream.flatMapValues"""
160- #test_input = [["a", "a", "b"], ["", ""], []]
161- test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" ,"" ], []]
217+ def test_mapValues_unbatch (self ):
218+ """Basic operation test for DStream.mapValues with unbatch deserializer"""
219+ test_input = [["a" , "a" , "b" ], ["" , "" ], []]
162220
163221 def test_func (dstream ):
164- return dstream .map (lambda x : (x , 1 )).reduceByKey (operator .add ).flatMapValues (lambda x : (x , x + 10 ))
165- #expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
166- expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 )], [("" , 4 ), ("" , 14 )], []]
222+ return dstream .map (lambda x : (x , 1 ))\
223+ .reduceByKey (operator .add )\
224+ .mapValues (lambda x : x + 10 )
225+ expected_output = [[("a" , 12 ), ("b" , 11 )], [("" , 12 )], []]
167226 output = self ._run_stream (test_input , test_func , expected_output )
168227 self .assertEqual (expected_output , output )
169228
170- def test_glom (self ):
171- """Basic operation test for DStream.glom"""
229+ def test_flatMapValues_batch (self ):
230+ """Basic operation test for DStream.flatMapValues with batch deserializer"""
231+ test_input = [["a" , "a" , "b" , "b" ], ["" , "" , "" , "" ]]
232+
233+ def test_func (dstream ):
234+ return dstream .map (lambda x : (x , 1 ))\
235+ .reduceByKey (operator .add )\
236+ .flatMapValues (lambda x : (x , x + 10 ))
237+ expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 )], [("" , 4 ), ("" , 14 )]]
238+ output = self ._run_stream (test_input , test_func , expected_output )
239+ self .assertEqual (expected_output , output )
240+
241+ def test_flatMapValues_unbatch (self ):
242+ """Basic operation test for DStream.flatMapValues with unbatch deserializer"""
243+ test_input = [["a" , "a" , "b" ], ["" , "" ], []]
244+
245+ def test_func (dstream ):
246+ return dstream .map (lambda x : (x , 1 ))\
247+ .reduceByKey (operator .add )\
248+ .flatMapValues (lambda x : (x , x + 10 ))
249+ expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 1 ), ("b" , 11 )], [("" , 2 ), ("" , 12 )], []]
250+ output = self ._run_stream (test_input , test_func , expected_output )
251+ self .assertEqual (expected_output , output )
252+
253+ def test_glom_batch (self ):
254+ """Basic operation test for DStream.glom with batch deserializer"""
172255 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
173256 numSlices = 2
174257
175258 def test_func (dstream ):
176259 return dstream .glom ()
177- expected_output = [[[1 ,2 ], [3 ,4 ]], [[5 ,6 ], [7 ,8 ]], [[9 ,10 ], [11 ,12 ]]]
260+ expected_output = [[[1 , 2 ], [3 , 4 ]], [[5 , 6 ], [7 , 8 ]], [[9 , 10 ], [11 , 12 ]]]
261+ output = self ._run_stream (test_input , test_func , expected_output , numSlices )
262+ self .assertEqual (expected_output , output )
263+
264+ def test_glom_unbatach (self ):
265+ """Basic operation test for DStream.glom with unbatch deserializer"""
266+ test_input = [range (1 , 4 ), range (4 , 7 ), range (7 , 10 )]
267+ numSlices = 2
268+
269+ def test_func (dstream ):
270+ return dstream .glom ()
271+ expected_output = [[[1 ], [2 , 3 ]], [[4 ], [5 , 6 ]], [[7 ], [8 , 9 ]]]
178272 output = self ._run_stream (test_input , test_func , expected_output , numSlices )
179273 self .assertEqual (expected_output , output )
180274
181- def test_mapPartitions (self ):
182- """Basic operation test for DStream.mapPartitions"""
275+ def test_mapPartitions_batch (self ):
276+ """Basic operation test for DStream.mapPartitions with batch deserializer """
183277 test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
184278 numSlices = 2
185279
186280 def test_func (dstream ):
187- def f (iterator ): yield sum (iterator )
281+ def f (iterator ):
282+ yield sum (iterator )
188283 return dstream .mapPartitions (f )
189284 expected_output = [[3 , 7 ], [11 , 15 ], [19 , 23 ]]
190285 output = self ._run_stream (test_input , test_func , expected_output , numSlices )
191286 self .assertEqual (expected_output , output )
192287
288+ def test_mapPartitions_unbatch (self ):
289+ """Basic operation test for DStream.mapPartitions with unbatch deserializer"""
290+ test_input = [range (1 , 4 ), range (4 , 7 ), range (7 , 10 )]
291+ numSlices = 2
292+
293+ def test_func (dstream ):
294+ def f (iterator ):
295+ yield sum (iterator )
296+ return dstream .mapPartitions (f )
297+ expected_output = [[1 , 5 ], [4 , 11 ], [7 , 17 ]]
298+ output = self ._run_stream (test_input , test_func , expected_output , numSlices )
299+ self .assertEqual (expected_output , output )
300+
193301 def _run_stream (self , test_input , test_func , expected_output , numSlices = None ):
194302 """Start stream and return the output"""
195303 # Generate input stream with user-defined input
@@ -212,6 +320,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
212320 # check if the output is the same length of expexted output
213321 if len (expected_output ) == len (self .result ):
214322 break
323+
215324 return self .result
216325
217326if __name__ == "__main__" :
0 commit comments