File tree Expand file tree Collapse file tree 2 files changed +24
-1
lines changed
examples/src/main/python/streaming Expand file tree Collapse file tree 2 files changed +24
-1
lines changed Original file line number Diff line number Diff line change 1+ import sys
2+ from operator import add
3+
4+ from pyspark .conf import SparkConf
5+ from pyspark .streaming .context import StreamingContext
6+ from pyspark .streaming .duration import *
7+
8+ if __name__ == "__main__" :
9+ if len (sys .argv ) != 3 :
10+ print >> sys .stderr , "Usage: wordcount <hostname> <port>"
11+ exit (- 1 )
12+ conf = SparkConf ()
13+ conf .setAppName ("PythonStreamingNetworkWordCount" )
14+ ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
15+
16+ lines = ssc .socketTextStream (sys .argv [1 ], int (sys .argv [2 ]))
17+ words = lines .flatMap (lambda line : line .split (" " ))
18+ mapped_words = words .map (lambda word : (word , 1 ))
19+ count = mapped_words .reduceByKey (add )
20+
21+ count .pyprint ()
22+ ssc .start ()
23+ # ssc.awaitTermination()
24+ ssc .stop ()
Original file line number Diff line number Diff line change @@ -156,7 +156,6 @@ def _mergeCombiners(iterator):
156156 combiners [k ] = v
157157 else :
158158 combiners [k ] = mergeCombiners (combiners [k ], v )
159- return combiners .iteritems ()
160159
161160 return shuffled .mapPartitions (_mergeCombiners )
162161
You can’t perform that action at this time.
0 commit comments