113 changes: 70 additions & 43 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
import scala.util.Try

import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -32,13 +33,14 @@ import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}

private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -233,53 +235,35 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus]): RDD[Row] = {

val job = new Job(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)

ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

// Try to push down filters when filter push-down is enabled.
if (sqlContext.conf.parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(dataSchema, _))
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
})

conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(dataSchema.toAttributes))

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function that sets various Parquet configurations on both the driver and executor side.
val initLocalJobFuncOpt =
ParquetRelation2.initializeLocalJobFunc(
requiredColumns,
filters,
dataSchema,
useMetadataCache,
parquetFilterPushDown) _
// Create the function to set input paths at the driver side.
val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartitions`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
// footers. Especially when a global authoritative schema (either from metastore or data source
// DDL) is available.
new NewHadoopRDD(
sqlContext.sparkContext,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row],
conf) {
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[FilteringParquetRowInputFormat],
keyClass = classOf[Void],
valueClass = classOf[Row]) {

val cacheMetadata = useMetadataCache

@@ -311,11 +295,11 @@ private[sql] class ParquetRelation2(
new FilteringParquetRowInputFormat
}

val jobContext = newJobContext(getConf, jobId)
val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)

Array.tabulate[SparkPartition](rawSplits.size) { i =>
new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}.values
@@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"

/** This closure sets various Parquet configurations at both the driver and executor side. */
private[parquet] def initializeLocalJobFunc(
requiredColumns: Array[String],
filters: Array[Filter],
dataSchema: StructType,
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())

// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(dataSchema, _))
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
})

conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(dataSchema.toAttributes))

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
}
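
The comment in `initializeLocalJobFunc` above explains why `flatMap` is used: `ParquetFilters.createFilter` returns an `Option`, so filters with no Parquet equivalent are dropped, and `reduceOption(FilterApi.and)` only builds a conjunction when at least one predicate survived. Below is a minimal, self-contained model of that combination pattern; `SimpleFilter` and `Pred` are hypothetical stand-ins for Spark's `sources.Filter` and Parquet's `FilterPredicate`, not the real classes.

```scala
// A self-contained model of the flatMap + reduceOption pattern above. SimpleFilter and
// Pred are hypothetical stand-ins for Spark's sources.Filter and Parquet's FilterPredicate;
// only the combination logic is the point.
object FilterPushDownSketch {
  sealed trait SimpleFilter
  case class Gt(column: String, value: Int) extends SimpleFilter
  case class Unsupported(description: String) extends SimpleFilter

  sealed trait Pred
  case class GtPred(column: String, value: Int) extends Pred
  case class AndPred(left: Pred, right: Pred) extends Pred

  // Not every source filter has a Parquet equivalent, hence the Option result.
  def createPred(filter: SimpleFilter): Option[Pred] = filter match {
    case Gt(c, v)       => Some(GtPred(c, v))
    case Unsupported(_) => None
  }

  def main(args: Array[String]): Unit = {
    val filters: Seq[SimpleFilter] = Seq(Gt("a", 1), Unsupported("udf(a) = 1"), Gt("b", 2))

    // flatMap drops inconvertible filters, reduceOption builds a conjunction only if at
    // least one predicate survived, and foreach makes the final step a no-op otherwise.
    filters
      .flatMap(createPred)
      .reduceOption[Pred](AndPred(_, _))
      .foreach(pred => println(s"would push down: $pred"))
  }
}
```

As in the real code, nothing is pushed down when none of the given filters can be converted.
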

/** This closure sets input paths at the driver side. */
private[parquet] def initializeDriverSideJobFunc(
inputFiles: Array[FileStatus])(job: Job): Unit = {
// We set the input paths only at the driver side.
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
}

private[parquet] def readSchema(
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
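
The two new helpers are written in curried form so that partially applying the first parameter list yields a plain `Job => Unit` closure: `initializeLocalJobFunc(...) _` captures only small, serializable values (column names, filters, the schema, two flags) and can be shipped to executors, while `initializeDriverSideJobFunc(inputFiles) _` runs once on the driver, where the `FileStatus` objects live. The sketch below illustrates that partial-application wiring without Hadoop; `FakeJob` and `FakeHadoopRDD` are hypothetical stand-ins for `Job` and `SqlNewHadoopRDD`, not their actual implementations.

```scala
// A minimal sketch of the curried-initializer pattern used by initializeLocalJobFunc and
// initializeDriverSideJobFunc. FakeJob and FakeHadoopRDD are hypothetical stand-ins for
// Hadoop's Job and Spark's SqlNewHadoopRDD; only the partial-application wiring matters.
object CurriedInitializerSketch {
  class FakeJob {
    val settings = scala.collection.mutable.Map.empty[String, String]
    def set(key: String, value: String): Unit = { settings(key) = value }
  }

  // Curried: the first parameter list carries small, serializable values that the
  // resulting FakeJob => Unit closure captures.
  def initializeLocalJobFunc(
      requiredColumns: Array[String],
      useMetadataCache: Boolean)(job: FakeJob): Unit = {
    job.set("requested.columns", requiredColumns.mkString(","))
    job.set("cache.metadata", useMetadataCache.toString)
  }

  // Driver-side only: input paths come from FileStatus objects held by the driver.
  def initializeDriverSideJobFunc(inputPaths: Array[String])(job: FakeJob): Unit = {
    job.set("input.paths", inputPaths.mkString(","))
  }

  class FakeHadoopRDD(
      initDriverSideJobFuncOpt: Option[FakeJob => Unit],
      initLocalJobFuncOpt: Option[FakeJob => Unit]) {
    // On the driver (e.g. when computing splits) both initializers run.
    def driverSideJob(): FakeJob = {
      val job = new FakeJob
      initDriverSideJobFuncOpt.foreach(f => f(job))
      initLocalJobFuncOpt.foreach(f => f(job))
      job
    }
    // On an executor (e.g. when creating a record reader) only the local one runs.
    def executorSideJob(): FakeJob = {
      val job = new FakeJob
      initLocalJobFuncOpt.foreach(f => f(job))
      job
    }
  }

  def main(args: Array[String]): Unit = {
    val initLocal = initializeLocalJobFunc(Array("a", "b"), useMetadataCache = true) _
    val setInputPaths = initializeDriverSideJobFunc(Array("/data/part-00000.parquet")) _
    val rdd = new FakeHadoopRDD(Some(setInputPaths), Some(initLocal))
    println(rdd.driverSideJob().settings)   // input paths + requested columns + cache flag
    println(rdd.executorSideJob().settings) // only the locally needed settings
  }
}
```

The `initDriverSideJobFuncOpt` / `initLocalJobFuncOpt` arguments passed to `SqlNewHadoopRDD` in the diff suggest exactly this driver/executor split; the sketch only mimics the mechanics, not `SqlNewHadoopRDD`'s actual implementation.
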
sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,7 +17,8 @@

package org.apache.spark.sql.sources

import org.apache.spark.Logging
import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
@@ -84,11 +85,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
// See buildPartitionedTableScan for the reason why we need to create a shared,
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
pruneFilterProject(
l,
projectList,
filters,
(a, f) => t.buildScan(a, f, t.paths)) :: Nil
(a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil

case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -115,6 +121,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val output = projections.map(_.toAttribute)
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

// Because we create one RDD per partition, we need a shared HadoopConf; otherwise the
// cost of broadcasting a HadoopConf for every RDD would be high.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
@@ -132,7 +144,8 @@
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
val dataRows =
relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast)

// Merges data values with partition values.
mergeWithPartitionValues(
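
Both scan paths in `DataSourceStrategy` now broadcast a single `SerializableWritable`-wrapped Hadoop `Configuration` and hand it to every `buildScan` call, so a table with many partitions ships the configuration to the cluster once instead of serializing it into each per-partition RDD. Below is a minimal runnable sketch of that idea under local mode; `SparkContext`, `Broadcast`, `SerializableWritable`, and `Configuration` are the real classes (as used in the diff), while the partition directories and the string "rows" are made up for illustration.

```scala
// Sketch: broadcast one Hadoop Configuration and reuse it across many per-partition RDDs.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object SharedConfBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("conf-broadcast").setMaster("local[2]"))

    // Wrap once on the driver; Configuration is Writable but not java.io.Serializable.
    val sharedHadoopConf = new Configuration()
    sharedHadoopConf.set("example.key", "example.value")
    val confBroadcast: Broadcast[SerializableWritable[Configuration]] =
      sc.broadcast(new SerializableWritable(sharedHadoopConf))

    // One RDD per (made-up) partition directory, all reusing the same broadcast value
    // instead of serializing a fresh Configuration into every RDD's closure.
    val partitionDirs = Seq("/data/p=0", "/data/p=1", "/data/p=2")
    val perPartitionRows: Seq[RDD[String]] = partitionDirs.map { dir =>
      sc.parallelize(Seq(dir)).map { d =>
        val conf = confBroadcast.value.value // executor-side access to the shared conf
        s"$d -> ${conf.get("example.key")}"
      }
    }

    perPartitionRows.reduce(_ union _).collect().foreach(println)
    sc.stop()
  }
}
```

This mirrors the comment in `buildPartitionedTableScan`: wrapping the configuration in `SerializableWritable` and broadcasting it once avoids re-shipping it for every per-partition RDD.
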