|  | 
| 35 | 35 | import time | 
| 36 | 36 | import unittest | 
| 37 | 37 | import zipfile | 
|  | 38 | +import operator | 
| 38 | 39 | 
 | 
|  | 40 | +from pyspark.context import SparkContext | 
| 39 | 41 | from pyspark.streaming.context import StreamingContext | 
| 40 | 42 | from pyspark.streaming.duration import * | 
| 41 | 43 | 
 | 
| 42 | 44 | 
 | 
| 43 | 45 | SPARK_HOME = os.environ["SPARK_HOME"] | 
| 44 | 46 | 
 | 
| 45 |  | -class buff: | 
|  | 47 | +class StreamOutput: | 
| 46 | 48 |     """ | 
| 47 |  | -    Buffer for store the output from stream | 
|  | 49 | +    A class to store the output from the stream | 
| 48 | 50 |     """ | 
| 49 |  | -    result = None | 
|  | 51 | +    result = list() | 
| 50 | 52 | 
 | 
| 51 | 53 | class PySparkStreamingTestCase(unittest.TestCase): | 
| 52 | 54 |     def setUp(self): | 
| 53 |  | -        print "set up" | 
| 54 | 55 |         class_name = self.__class__.__name__ | 
| 55 | 56 |         self.ssc = StreamingContext(appName=class_name, duration=Seconds(1)) | 
| 56 | 57 | 
 | 
| 57 | 58 |     def tearDown(self): | 
| 58 |  | -        print "tear donw" | 
| 59 |  | -        self.ssc.stop() | 
| 60 |  | -        time.sleep(10) | 
|  | 59 | +        # Do not call StreamingContext.stop directly because we do not want to | 
|  | 60 | +        # shut down the callback server and Py4J client here | 
|  | 61 | +        self.ssc._jssc.stop() | 
|  | 62 | +        self.ssc._sc.stop() | 
|  | 63 | +        # Why does it take so long to terminate StreamingContext and SparkContext? | 
|  | 64 | +        # Should the sleep time be changed if this depends on the machine spec? | 
|  | 65 | +        time.sleep(5) | 
|  | 66 | + | 
|  | 67 | +    @classmethod | 
|  | 68 | +    def tearDownClass(cls): | 
|  | 69 | +        time.sleep(5) | 
|  | 70 | +        SparkContext._gateway._shutdown_callback_server() | 
| 61 | 71 | 
 | 
| 62 | 72 | class TestBasicOperationsSuite(PySparkStreamingTestCase): | 
|  | 73 | +    """ | 
|  | 74 | +    The input and output of this TestBasicOperationsSuite are equivalent to | 
|  | 75 | +    those of the Scala TestBasicOperationsSuite. | 
|  | 76 | +    """ | 
| 63 | 77 |     def setUp(self): | 
| 64 | 78 |         PySparkStreamingTestCase.setUp(self) | 
| 65 |  | -        buff.result = None | 
|  | 79 | +        StreamOutput.result = list() | 
| 66 | 80 |         self.timeout = 10 # seconds | 
| 67 | 81 | 
 | 
| 68 | 82 |     def tearDown(self): | 
| 69 | 83 |         PySparkStreamingTestCase.tearDown(self) | 
| 70 | 84 | 
 | 
|  | 85 | +    @classmethod | 
|  | 86 | +    def tearDownClass(cls): | 
|  | 87 | +        PySparkStreamingTestCase.tearDownClass() | 
|  | 88 | + | 
| 71 | 89 |     def test_map(self): | 
|  | 90 | +        """Basic operation test for DStream.map""" | 
| 72 | 91 |         test_input = [range(1,5), range(5,9), range(9, 13)] | 
| 73 | 92 |         def test_func(dstream): | 
| 74 | 93 |             return dstream.map(lambda x: str(x)) | 
| 75 |  | -        expected = map(str, test_input) | 
| 76 |  | -        output = self.run_stream(test_input, test_func) | 
| 77 |  | -        self.assertEqual(output, expected) | 
|  | 94 | +        expected_output = map(lambda x: map(lambda y: str(y), x), test_input) | 
|  | 95 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 96 | +        self.assertEqual(expected_output, output) | 
| 78 | 97 | 
 | 
| 79 | 98 |     def test_flatMap(self): | 
|  | 99 | +        """Basic operation test for DStream.flatMap""" | 
| 80 | 100 |         test_input = [range(1,5), range(5,9), range(9, 13)] | 
| 81 | 101 |         def test_func(dstream): | 
| 82 | 102 |             return dstream.flatMap(lambda x: (x, x * 2)) | 
| 83 |  | -        # Maybe there be good way to create flatmap | 
| 84 |  | -        excepted = map(lambda x: list(chain.from_iterable((map(lambda y:[y, y*2], x)))),  | 
|  | 103 | +        expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),  | 
| 85 | 104 |                        test_input) | 
| 86 |  | -        output = self.run_stream(test_input, test_func) | 
|  | 105 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 106 | +        self.assertEqual(expected_output, output) | 
|  | 107 | + | 
|  | 108 | +    def test_filter(self): | 
|  | 109 | +        """Basic operation test for DStream.filter""" | 
|  | 110 | +        test_input = [range(1,5), range(5,9), range(9, 13)] | 
|  | 111 | +        def test_func(dstream): | 
|  | 112 | +            return dstream.filter(lambda x: x % 2 == 0) | 
|  | 113 | +        expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input) | 
|  | 114 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 115 | +        self.assertEqual(expected_output, output) | 
|  | 116 | + | 
|  | 117 | +    def test_count(self): | 
|  | 118 | +        """Basic operation test for DStream.count""" | 
|  | 119 | +        test_input = [[], [1], range(1, 3), range(1,4), range(1,5)] | 
|  | 120 | +        def test_func(dstream): | 
|  | 121 | +            return dstream.count() | 
|  | 122 | +        expected_output = map(lambda x: [len(x)], test_input) | 
|  | 123 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 124 | +        self.assertEqual(expected_output, output) | 
|  | 125 | +         | 
|  | 126 | +    def test_reduce(self): | 
|  | 127 | +        """Basic operation test for DStream.reduce""" | 
|  | 128 | +        test_input = [range(1,5), range(5,9), range(9, 13)] | 
|  | 129 | +        def test_func(dstream): | 
|  | 130 | +            return dstream.reduce(operator.add) | 
|  | 131 | +        expected_output = map(lambda x: [reduce(operator.add, x)], test_input) | 
|  | 132 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 133 | +        self.assertEqual(expected_output, output) | 
|  | 134 | + | 
|  | 135 | +    def test_reduceByKey(self): | 
|  | 136 | +        """Basic operation test for DStream.reduceByKey""" | 
|  | 137 | +        test_input = [["a", "a", "b"], ["", ""], []] | 
|  | 138 | +        def test_func(dstream): | 
|  | 139 | +            return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add) | 
|  | 140 | +        expected_output = [[("a", 2), ("b", 1)],[("", 2)], []] | 
|  | 141 | +        output = self._run_stream(test_input, test_func, expected_output) | 
|  | 142 | +        self.assertEqual(expected_output, output) | 
| 87 | 143 | 
 | 
| 88 |  | -    def run_stream(self, test_input, test_func): | 
|  | 144 | +    def _run_stream(self, test_input, test_func, expected_output): | 
|  | 145 | +        """Start stream and return the output""" | 
| 89 | 146 |         # Generate input stream with user-defined input | 
| 90 | 147 |         test_input_stream = self.ssc._testInputStream(test_input) | 
| 91 | 148 |         # Apply the test function to the stream | 
| 92 | 149 |         test_stream = test_func(test_input_stream) | 
| 93 | 150 |         # Add a job to get output from the stream | 
| 94 |  | -        test_stream._test_output(buff) | 
|  | 151 | +        test_stream._test_output(StreamOutput.result) | 
| 95 | 152 |         self.ssc.start() | 
| 96 | 153 | 
 | 
| 97 | 154 |         start_time = time.time() | 
|  | 155 | +        # loop until the result is retrieved from the stream | 
| 98 | 156 |         while True: | 
| 99 | 157 |             current_time = time.time() | 
| 100 | 158 |             # check time out | 
| 101 | 159 |             if (current_time - start_time) > self.timeout: | 
| 102 |  | -                self.ssc.stop() | 
| 103 | 160 |                 break | 
| 104 | 161 |             self.ssc.awaitTermination(50) | 
| 105 |  | -            if buff.result is not None: | 
|  | 162 | +            if len(expected_output) == len(StreamOutput.result): | 
| 106 | 163 |                 break | 
| 107 |  | -        return buff.result | 
|  | 164 | +        return StreamOutput.result | 
| 108 | 165 | 
 | 
| 109 | 166 | if __name__ == "__main__": | 
| 110 | 167 |     unittest.main() | 
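
All of the tests above follow the same pattern: build the expected per-batch output with plain Python, feed the batches through _testInputStream, and compare against what _run_stream collects. A minimal sketch of the expected-output construction used in test_map (plain Python 2, no Spark required; names copied from the diff above):

    test_input = [range(1, 5), range(5, 9), range(9, 13)]
    # each batch is mapped element-wise to strings, mirroring what DStream.map produces per batch
    expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
    print expected_output
    # [['1', '2', '3', '4'], ['5', '6', '7', '8'], ['9', '10', '11', '12']]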