File tree Expand file tree Collapse file tree 2 files changed +8
-17
lines changed
examples/src/main/python/streaming Expand file tree Collapse file tree 2 files changed +8
-17
lines changed Original file line number Diff line number Diff line change 1414 ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
1515
1616 lines = ssc .socketTextStream (sys .argv [1 ], int (sys .argv [2 ]))
17- fm_lines = lines .flatMap (lambda x : x .split (" " ))
18- mapped_lines = fm_lines .map (lambda x : (x , 1 ))
19- reduced_lines = mapped_lines .reduceByKey (add )
17+ words = lines .flatMap (lambda line : line .split (" " ))
18+ mapped_words = words .map (lambda word : (word , 1 ))
19+ count = mapped_words .reduceByKey (add )
2020
21- reduced_lines .pyprint ()
22- count_lines = mapped_lines .count ()
23- count_lines .pyprint ()
21+ count .pyprint ()
2422 ssc .start ()
2523 ssc .awaitTermination ()
Original file line number Diff line number Diff line change 1111 exit (- 1 )
1212 conf = SparkConf ()
1313 conf .setAppName ("PythonStreamingWordCount" )
14- conf .set ("spark.default.parallelism" , 1 )
1514
16- # still has a bug
17- # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
1815 ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
1916
2017 lines = ssc .textFileStream (sys .argv [1 ])
21- fm_lines = lines .flatMap (lambda x : x .split (" " ))
22- filtered_lines = fm_lines .filter (lambda line : "Spark" in line )
23- mapped_lines = fm_lines .map (lambda x : (x , 1 ))
24- reduced_lines = mapped_lines .reduceByKey (add )
18+ words = lines .flatMap (lambda line : line .split (" " ))
19+ mapped_words = words .map (lambda x : (x , 1 ))
20+ count = mapped_words .reduceByKey (add )
2521
26- fm_lines .pyprint ()
27- filtered_lines .pyprint ()
28- mapped_lines .pyprint ()
29- reduced_lines .pyprint ()
22+ count .pyprint ()
3023 ssc .start ()
3124 ssc .awaitTermination ()
You can’t perform that action at this time.
0 commit comments