@@ -50,7 +50,6 @@ private[spark] class SqlNewHadoopPartition(
 }
 
 /**
- * :: DeveloperApi ::
  * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
  * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
  * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
@@ -60,13 +59,10 @@ private[spark] class SqlNewHadoopPartition(
  * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at both the driver side
  *    and the executor side on the shared Hadoop Configuration.
  *
- * @param sc The SparkContext to associate the RDD with.
- * @param inputFormatClass Storage format of the data to be read.
- * @param keyClass Class of the key associated with the inputFormatClass.
- * @param valueClass Class of the value associated with the inputFormatClass.
- * @param conf The Hadoop configuration.
+ * Note: This RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In the future, this functionality will be
+ * folded into core.
  */
-@DeveloperApi
 private[sql] class SqlNewHadoopRDD[K, V](
     @transient sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
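
The constructor above takes the Hadoop configuration as a `Broadcast[SerializableWritable[Configuration]]` rather than a plain `Configuration`. Below is a minimal sketch of how a caller might prepare that argument together with an executor-side init closure; the helper name and the property being set are illustrative assumptions, not part of this patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SerializableWritable, SparkContext}

    // Hypothetical helper showing the broadcast pattern this RDD expects.
    def prepareConfArgs(sc: SparkContext, hadoopConf: Configuration) = {
      // Broadcast the conf once so every task reads a shared, deserialized copy
      // instead of shipping the full Configuration inside each task closure.
      val broadcastedConf = sc.broadcast(new SerializableWritable(hadoopConf))

      // An executor-side closure; it runs against the Job built by getJob(),
      // which already holds a private copy of the conf, so mutation is safe.
      val initLocalJobFuncOpt: Option[Job => Unit] =
        Some(job => job.getConfiguration.set("mapreduce.task.io.sort.mb", "64"))

      (broadcastedConf, initLocalJobFuncOpt)
    }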
@@ -85,11 +81,22 @@ private[sql] class SqlNewHadoopRDD[K, V](
 
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
     val newJob = new Job(conf)
     initLocalJobFuncOpt.map(f => f(newJob))
     newJob
   }
 
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      initDriverSideJobFuncOpt.map(f => f(job))
+    }
+    job.getConfiguration
+  }
+
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
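
Since `getConf` always goes through `getJob()`, and `new Job(conf)` copies the broadcast conf, driver-side and executor-side callers never mutate each other's properties. A short sketch of the intended call pattern, assuming an already-constructed `rdd: SqlNewHadoopRDD[_, _]`:

    // Driver side: initLocalJobFuncOpt plus initDriverSideJobFuncOpt are applied.
    val driverConf = rdd.getConf(isDriverSide = true)
    // Executor side: only initLocalJobFuncOpt is applied.
    val executorConf = rdd.getConf(isDriverSide = false)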
@@ -235,14 +242,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
     }
     super.persist(storageLevel)
   }
-
-  def getConf(isDriverSide: Boolean): Configuration = {
-    val job = getJob()
-    if (isDriverSide) {
-      initDriverSideJobFuncOpt.map(f => f(job))
-    }
-    job.getConfiguration
-  }
 }
 
 private[spark] object SqlNewHadoopRDD {