@@ -59,8 +59,7 @@ class SparkContext(object):
     _writeToFile = None
     _next_accum_id = 0
     _active_spark_context = None
-    _lock = Lock()
-    # zip and egg files that need to be added to PYTHONPATH
+    _lock = Lock()  # zip and egg files that need to be added to PYTHONPATH
     _python_includes = None
     _default_batch_size_for_serialized_input = 10

@@ -101,15 +100,13 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             self._callsite = rdd._extract_concise_traceback()
         else:
             tempNamedTuple = namedtuple("Callsite", "function file linenum")
-            self._callsite = tempNamedTuple(
-                function=None, file=None, linenum=None)
+            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                           conf)
         except:
-            # If an error occurs, clean up in order to allow future
-            # SparkContext creation:
+            # If an error occurs, clean up in order to allow future SparkContext creation:
             self.stop()
             raise

@@ -142,8 +139,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         if not self._conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self._conf.contains("spark.app.name"):
-            raise Exception(
-                "An application name must be set in your configuration")
+            raise Exception("An application name must be set in your configuration")

         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
@@ -184,8 +180,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self.addPyFile(path)

         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the
-        # PYTHONPATH
+        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
@@ -195,11 +190,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                 sys.path.append(dirname)

         # Create a temporary directory inside spark.local.dir:
-        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(
-            self._jsc.sc().conf())
+        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
-            self._jvm.org.apache.spark.util.Utils.createTempDir(
-                local_dir).getAbsolutePath()
+            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

     def _initialize_context(self, jconf):
         """
@@ -292,8 +285,7 @@ def parallelize(self, c, numSlices=None):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        # Make sure we distribute data evenly if it's smaller than
-        # self.batchSize
+        # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)  # Make it a list so we can compute its length
         batchSize = min(len(c) // numSlices, self._batchSize)
@@ -412,10 +404,8 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                 keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)
@@ -445,13 +435,11 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
-            self._jsc, path, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                                    valueClass, keyConverter, valueConverter,
+                                                    jconf, batchSize)
         return RDD(jrdd, self, ser)

     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -476,13 +464,11 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(
-            self._jsc, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                                   valueClass, keyConverter, valueConverter,
+                                                   jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -510,13 +496,11 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopFile(
-            self._jsc, path, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                              valueClass, keyConverter, valueConverter,
+                                              jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -541,12 +525,11 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                             keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                             valueClass, keyConverter, valueConverter,
+                                             jconf, batchSize)
         return RDD(jrdd, self, ser)

     def _checkpointFile(self, name, input_deserializer):
@@ -577,8 +560,7 @@ def union(self, rdds):
         first = rdds[0]._jrdd
         rest = [x._jrdd for x in rdds[1:]]
         rest = ListConverter().convert(rest, self._gateway._gateway_client)
-        return RDD(self._jsc.union(first, rest), self,
-                   rdds[0]._jrdd_deserializer)
+        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

     def broadcast(self, value):
         """
@@ -590,8 +572,7 @@ def broadcast(self, value):
         pickleSer = PickleSerializer()
         pickled = pickleSer.dumps(value)
         jbroadcast = self._jsc.broadcast(bytearray(pickled))
-        return Broadcast(jbroadcast.id(), value, jbroadcast,
-                         self._pickled_broadcast_vars)
+        return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)

     def accumulator(self, value, accum_param=None):
         """
@@ -609,8 +590,7 @@ def accumulator(self, value, accum_param=None):
         elif isinstance(value, complex):
             accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
         else:
-            raise Exception(
-                "No default accumulator param for type %s" % type(value))
+            raise Exception("No default accumulator param for type %s" % type(value))
         SparkContext._next_accum_id += 1
         return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

@@ -655,14 +635,12 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        # dirname may be directory or HDFS/S3 prefix
-        (dirname, filename) = os.path.split(path)
+        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
             # for tests in local mode
-            sys.path.append(
-                os.path.join(SparkFiles.getRootDirectory(), filename))
+            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

     def setCheckpointDir(self, dirName):
         """
@@ -676,8 +654,7 @@ def _getJavaStorageLevel(self, storageLevel):
         Returns a Java StorageLevel based on a pyspark.StorageLevel.
         """
         if not isinstance(storageLevel, StorageLevel):
-            raise Exception(
-                "storageLevel must be of type pyspark.StorageLevel")
+            raise Exception("storageLevel must be of type pyspark.StorageLevel")

         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
         return newStorageLevel(storageLevel.useDisk,
@@ -780,15 +757,13 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """
         if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
-        javaPartitions = ListConverter().convert(
-            partitions, self._gateway._gateway_client)
+        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

         # Implementation note: This is implemented as a mapPartitions followed
         # by runJob() in order to avoid having to pass a Python lambda into
         # SparkContext#runJob.
         mappedRDD = rdd.mapPartitions(partitionFunc)
-        it = self._jvm.PythonRDD.runJob(
-            self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
+        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))


@@ -800,8 +775,7 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(
-        globs=globs, optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
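
Note on the recurring pattern above: sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile and hadoopRDD all unwrap the same batch-size and serializer selection onto single lines. The snippet below is only an illustrative sketch of that idiom for readers of the diff, not code from this commit; the helper name pick_batched_serializer and its default argument are hypothetical, and it assumes BatchedSerializer and PickleSerializer from pyspark.serializers, as used in the surrounding file.

from pyspark.serializers import BatchedSerializer, PickleSerializer

def pick_batched_serializer(batchSize=None, default_batch_size=10):
    # Hypothetical helper, not part of this commit: clamp the requested batch
    # size to at least 1, falling back to a default that mirrors
    # _default_batch_size_for_serialized_input in the diff above.
    batchSize = max(1, batchSize or default_batch_size)
    # Pickle objects in batches only when the batch size exceeds 1; otherwise
    # pickle one object per record, exactly as the unwrapped lines do.
    ser = BatchedSerializer(PickleSerializer()) if batchSize > 1 else PickleSerializer()
    return batchSize, ser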