@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
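The definition of `RDDScope` itself is not part of this diff; only the import and the annotated call sites appear. Purely as an illustrative sketch, and not taken from the Spark source, a marker annotation with the same shape as the existing `DeveloperApi` and `Experimental` markers could look roughly like this:

    // Hypothetical sketch only: the real RDDScope definition is not shown in this change.
    package org.apache.spark.annotation

    import scala.annotation.StaticAnnotation

    /** Marks a SparkContext method whose resulting RDDs are grouped into one named scope. */
    private[spark] class RDDScope extends StaticAnnotation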
@@ -641,6 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
    * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
+  @RDDScope
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
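The annotation is transparent to callers; the entry point is used exactly as before. A minimal usage sketch, assuming a local master (the setup below is illustrative and not part of this diff):

    import org.apache.spark.{SparkConf, SparkContext}

    // Assumed setup: a local SparkContext named sc, reused by the later sketches.
    val conf = new SparkConf().setAppName("rdd-scope-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // parallelize distributes a local collection into numSlices partitions.
    val nums = sc.parallelize(1 to 1000, numSlices = 8)
    println(nums.sum())  // 500500.0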
@@ -650,13 +651,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * This method is identical to `parallelize`.
    */
+  @RDDScope
   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     parallelize(seq, numSlices)
   }
 
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
     * Create a new partition for each collection item. */
+  @RDDScope
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
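A short sketch of the second overload, which pairs each element with preferred hosts and creates one partition per element (hostnames are placeholders; `sc` as in the earlier sketch):

    // One partition per element, each pinned to the given preferred hosts.
    val placed = sc.makeRDD(Seq(
      ("block-a", Seq("host1.example.com")),
      ("block-b", Seq("host2.example.com"))))
    assert(placed.partitions.length == 2)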
@@ -667,10 +670,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
+  @RDDScope
   def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     assertNotStopped()
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString).setName(path)
+      minPartitions).map(pair => pair._2.toString)
   }
 
   /**
@@ -700,6 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
+  @RDDScope
   def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
     RDD[(String, String)] = {
     assertNotStopped()
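For comparison, a usage sketch of the two text-reading entry points annotated above (paths are placeholders; `sc` as above):

    // textFile yields one String per line, across all files matching the path.
    val lines = sc.textFile("data/logs/*.txt", minPartitions = 4)

    // wholeTextFiles yields (fileName, fileContent) pairs, one per file.
    val files = sc.wholeTextFiles("data/logs")
    files.map { case (name, content) => (name, content.length) }.collect()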
@@ -746,6 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Small files are preferred; very large files may cause bad performance.
    */
   @Experimental
+  @RDDScope
   def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
     RDD[(String, PortableDataStream)] = {
     assertNotStopped()
@@ -774,6 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return An RDD of data with values, represented as byte arrays
    */
   @Experimental
+  @RDDScope
   def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
       : RDD[Array[Byte]] = {
     assertNotStopped()
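A sketch of how the two experimental binary readers are typically called (the paths and the 512-byte record length are placeholders; `sc` as above):

    // binaryFiles: one (fileName, PortableDataStream) pair per file; bytes are read on demand.
    val images = sc.binaryFiles("data/images")
    val sizes = images.map { case (name, stream) => (name, stream.toArray().length) }

    // binaryRecords: fixed-length records, here assumed to be 512 bytes each.
    val records = sc.binaryRecords("data/records.bin", recordLength = 512)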
@@ -811,6 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -832,6 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -850,7 +859,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions).setName(path)
+      minPartitions).setName(s"HadoopRDD[$path]")
   }
 
   /**
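For reference, a call against the old `org.apache.hadoop.mapred` API matching the `hadoopFile` signature above (placeholder path; `sc` as above). With the change in this hunk, the returned RDD is named `HadoopRDD[<path>]` instead of the bare path:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Old-API read: (byte offset, line) pairs.
    val raw = sc.hadoopFile("data/logs/*.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 4)
    val lines = raw.map { case (_, text) => text.toString }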
@@ -867,6 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V, F <: InputFormat[K, V]]
       (path: String, minPartitions: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -891,11 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
     hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
+  @RDDScope
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
       (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
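The overloads above take the key, value, and input-format classes as type parameters rather than `classOf` arguments; a usage sketch (placeholder path, `sc` as above):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{TextInputFormat => OldTextInputFormat}
    import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

    // Old-API form: classes supplied purely as type parameters.
    val oldApi = sc.hadoopFile[LongWritable, Text, OldTextInputFormat]("data/logs/*.txt")

    // New-API (org.apache.hadoop.mapreduce) equivalent.
    val newApi = sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat]("data/logs/*.txt")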
@@ -916,6 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
@@ -949,6 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
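The explicit-class variants take a Hadoop `Configuration` directly, which is how job-specific input settings are passed; a sketch, with the configuration key and path as placeholders (`sc` as above):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Illustrative configuration: input directory set on a copy of the context's Hadoop conf.
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("mapreduce.input.fileinputformat.inputdir", "data/logs")

    // New-API read driven entirely by the Configuration rather than a path argument.
    val viaConf = sc.newAPIHadoopRDD(hadoopConf,
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])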
@@ -969,6 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
@@ -987,6 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    * */
+  @RDDScope
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
     sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
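A sketch of the explicit-class form of `sequenceFile`, plus the implicit-converter form annotated in the following hunk (placeholder path, `sc` as above):

    import org.apache.hadoop.io.{IntWritable, Text}

    // Explicit Writable classes, as in the overloads above.
    val writables = sc.sequenceFile("data/counts.seq", classOf[Text], classOf[IntWritable])
    val counts = writables.map { case (k, v) => (k.toString, v.get) }

    // Implicit WritableConverter form (the overload annotated in the next hunk).
    val native = sc.sequenceFile[String, Int]("data/counts.seq")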
@@ -1014,6 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def sequenceFile[K, V]
       (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
@@ -1037,6 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * though the nice thing about it is that there's very little effort required to save arbitrary
    * objects.
    */
+  @RDDScope
   def objectFile[T: ClassTag](
       path: String,
       minPartitions: Int = defaultMinPartitions
@@ -1046,13 +1064,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
   }
 
+  @RDDScope
   protected[spark] def checkpointFile[T: ClassTag](
       path: String
     ): RDD[T] = {
     new CheckpointRDD[T](this, path)
   }
 
   /** Build the union of a list of RDDs. */
+  @RDDScope
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
     if (partitioners.size == 1) {
@@ -1063,6 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /** Build the union of a list of RDDs passed as variable-length arguments. */
+  @RDDScope
   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
     union(Seq(first) ++ rest)
 
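Finally, sketches for `objectFile` and `union`, the remaining annotated entry points (placeholder path, `sc` as above). `objectFile` is the read side of `RDD.saveAsObjectFile`, and `union` merges RDDs without removing duplicates:

    // Round-trip through serialization: write with saveAsObjectFile, read back with objectFile.
    val original = sc.parallelize(Seq("a" -> 1, "b" -> 2))
    original.saveAsObjectFile("tmp/pairs.obj")
    val restored = sc.objectFile[(String, Int)]("tmp/pairs.obj")

    // union keeps duplicates; the Seq and varargs overloads behave the same.
    val merged = sc.union(original, restored)
    assert(merged.count() == original.count() + restored.count())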