@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -32,13 +33,14 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -233,53 +235,35 @@ private[sql] class ParquetRelation2(
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputFiles: Array[FileStatus]): RDD[Row] = {
-
-    val job = new Job(SparkHadoopUtil.get.conf)
-    val conf = ContextUtil.getConfiguration(job)
-
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+    // Create the function to set variable Parquet confs at both driver and executor side.
+    val initLocalJobFuncOpt =
+      ParquetRelation2.initializeLocalJobFunc(
+        requiredColumns,
+        filters,
+        dataSchema,
+        useMetadataCache,
+        parquetFilterPushDown) _
+    // Create the function to set input paths at the driver side.
+    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
     // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
     // footers. Especially when a global arbitrative schema (either from metastore or data source
     // DDL) is available.
-    new NewHadoopRDD(
-      sqlContext.sparkContext,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row],
-      conf) {
+    new SqlNewHadoopRDD(
+      sc = sqlContext.sparkContext,
+      broadcastedConf = broadcastedConf,
+      initDriverSideJobFuncOpt = Some(setInputPaths),
+      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+      inputFormatClass = classOf[FilteringParquetRowInputFormat],
+      keyClass = classOf[Void],
+      valueClass = classOf[Row]) {
 
       val cacheMetadata = useMetadataCache
 
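Note on the new `broadcastedConf` parameter above: `buildScan` now receives the Hadoop configuration as a broadcast variable supplied by its caller, presumably so that one broadcast can be shared across scans instead of shipping a fresh `Configuration` with each RDD. Below is a minimal, illustrative sketch of how such a broadcast can be produced; the helper name and the `sc`/`conf` arguments are assumptions, not part of this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SerializableWritable, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    // Hypothetical helper: wrap the Configuration (a Hadoop Writable) in
    // SerializableWritable so it can be broadcast once and reused by tasks.
    def broadcastHadoopConf(
        sc: SparkContext,
        conf: Configuration): Broadcast[SerializableWritable[Configuration]] = {
      sc.broadcast(new SerializableWritable(conf))
    }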
@@ -311,11 +295,11 @@ private[sql] class ParquetRelation2(
           new FilteringParquetRowInputFormat
         }
 
-        val jobContext = newJobContext(getConf, jobId)
+        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
         val rawSplits = inputFormat.getSplits(jobContext)
 
         Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
         }
       }
     }.values
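The `initializeLocalJobFunc(...) _` and `initializeDriverSideJobFunc(inputFiles) _` calls in `buildScan` rely on partial application: everything except the trailing `(job: Job)` parameter list is bound up front, yielding a `Job => Unit` that closes over only small values (column names, filters, schema, flags) and can then be applied to whichever `Job` is created on the driver or on an executor. A self-contained sketch of the same pattern with a hypothetical initializer (not the real ParquetRelation2 helpers):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job

    // Hypothetical curried initializer: the first parameter list is bound on the
    // driver; the Job is supplied later, wherever the RDD materializes one.
    def initializeExampleJobFunc(requiredColumns: Array[String])(job: Job): Unit = {
      job.getConfiguration.set("example.requested.columns", requiredColumns.mkString(","))
    }

    // Partial application produces a Job => Unit closing over just the column names.
    val initFunc: Job => Unit = initializeExampleJobFunc(Array("a", "b")) _

    // Applying it to a locally created Job sets the requested configuration key.
    val job = new Job(new Configuration())
    initFunc(job)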
@@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus])(job: Job): Unit = {
+    // We set the input paths at the driver side.
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
     footers.map { footer =>
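On the filter push-down block in `initializeLocalJobFunc`: because `ParquetFilters.createFilter` returns an `Option`, the `flatMap` drops predicates that have no Parquet equivalent, and `reduceOption(FilterApi.and)` either ANDs the survivors together or yields `None` when nothing converted, in which case no predicate is set on the configuration. A plain-Scala sketch of the same combine pattern; the `toPredicate` converter below is a stand-in, not a real Parquet API.

    // Stand-in converter: only some filters can be translated.
    def toPredicate(filter: String): Option[String] =
      if (filter.startsWith("supported")) Some(s"pred($filter)") else None

    val filters = Array("supported1", "unsupported", "supported2")

    val combined: Option[String] =
      filters
        .flatMap(toPredicate)           // keep only the convertible filters
        .reduceOption(_ + " AND " + _)  // AND them together, or None if none converted

    // Mirrors the `.foreach(ParquetInputFormat.setFilterPredicate(conf, _))` step:
    // nothing happens when no filter could be converted.
    combined.foreach(p => println(s"would push down: $p"))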