@@ -22,6 +22,11 @@
/**
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report statistics to Spark.
*
* Statistics are reported to the optimizer before a projection or any filters are pushed to the
* DataSourceReader. Implementations that return more accurate statistics based on projection and
* filters will not improve query performance until the planner can push operators before getting
* stats.
*/
@InterfaceStability.Evolving
public interface SupportsReportStatistics extends DataSourceReader {
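For context, a minimal sketch of a reader that uses this mix-in (the ExampleReader name and the numbers are hypothetical, and the scan-planning methods are omitted by leaving the class abstract). Because statistics are requested before any pushdown in this design, getStatistics can only describe the whole table:

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsReportStatistics}

// Hypothetical reader: only the statistics reporting is shown.
abstract class ExampleReader extends DataSourceReader with SupportsReportStatistics {
  // Called before any projection or filters are pushed, so these numbers
  // describe the full table rather than the pruned scan.
  override def getStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(64L * 1024 * 1024)
    override def numRows(): OptionalLong = OptionalLong.of(1000000L)
  }
}
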
@@ -21,7 +21,6 @@ import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.v2.PushDownOperatorsToDataSource
import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate

class SparkOptimizer(
@@ -32,8 +31,7 @@ class SparkOptimizer(
override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Push down operators to data source scan", Once, PushDownOperatorsToDataSource)) ++
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
postHocOptimizationBatches :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)

@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
@@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType

case class DataSourceV2Relation(
source: DataSourceV2,
output: Seq[AttributeReference],
options: Map[String, String],
projection: Seq[AttributeReference],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None)
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._

override def simpleString: String = "RelationV2 " + metadataString

override lazy val schema: StructType = reader.readSchema()

override lazy val output: Seq[AttributeReference] = {
// use the projection attributes to avoid assigning new ids. fields that are not projected
// will be assigned new ids, which is okay because they are not projected.
val attrMap = projection.map(a => a.name -> a).toMap
schema.map(f => attrMap.getOrElse(f.name,
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
}

private lazy val v2Options: DataSourceOptions = makeV2Options(options)
override def pushedFilters: Seq[Expression] = Seq.empty

// postScanFilters: filters that need to be evaluated after the scan.
// pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
// Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
lazy val (
reader: DataSourceReader,
postScanFilters: Seq[Expression],
pushedFilters: Seq[Expression]) = {
val newReader = userSpecifiedSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
}

DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)

val (postScanFilters, pushedFilters) = filters match {
case Some(filterSeq) =>
DataSourceV2Relation.pushFilters(newReader, filterSeq)
case _ =>
(Nil, Nil)
}
logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")

(newReader, postScanFilters, pushedFilters)
}

override def doCanonicalize(): LogicalPlan = {
val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
override def simpleString: String = "RelationV2 " + metadataString

// override output with canonicalized output to avoid attempting to configure a reader
val canonicalOutput: Seq[AttributeReference] = this.output
.map(a => QueryPlan.normalizeExprId(a, projection))
lazy val v2Options: DataSourceOptions = makeV2Options(options)

new DataSourceV2Relation(c.source, c.options, c.projection) {
override lazy val output: Seq[AttributeReference] = canonicalOutput
}
def newReader: DataSourceReader = userSpecifiedSchema match {
case Some(userSchema) =>
source.asReadSupportWithSchema.createReader(userSchema, v2Options)
case None =>
source.asReadSupport.createReader(v2Options)
}

override def computeStats(): Statistics = reader match {
override def computeStats(): Statistics = newReader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}

override def newInstance(): DataSourceV2Relation = {

Contributor: I think we don't need to override newInstance now.

Contributor Author: I thought that initially, but the canonicalization test was failing without this.

// projection is used to maintain id assignment.
// if projection is not set, use output so the copy is not equal to the original
copy(projection = projection.map(_.newInstance()))
copy(output = output.map(_.newInstance()))
}
}

@@ -206,21 +161,27 @@ object DataSourceV2Relation {
def create(
source: DataSourceV2,
options: Map[String, String],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema)
val output = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, output, options, userSpecifiedSchema)
}

private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
def pushRequiredColumns(
relation: DataSourceV2Relation,
reader: DataSourceReader,
struct: StructType): Seq[AttributeReference] = {
reader match {
case projectionSupport: SupportsPushDownRequiredColumns =>
projectionSupport.pruneColumns(struct)
// return the output columns from the relation that were projected
val attrMap = relation.output.map(a => a.name -> a).toMap
projectionSupport.readSchema().map(f => attrMap(f.name))
case _ =>
relation.output
}
}

private def pushFilters(
def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
@@ -248,7 +209,7 @@ object DataSourceV2Relation {
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return it so Spark can plan an extra filter operator.
val postScanFilters =
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)

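For reference, a rough sketch (with a hypothetical PushdownReader; scan planning is omitted by leaving the class abstract) of the two mix-ins that pushRequiredColumns and pushFilters above drive. pruneColumns receives the requested columns and readSchema() is read back to compute the relation's output; pushFilters returns the filters that must still be evaluated after the scan, and pushedFilters reports what the source accepted:

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical reader showing the pushdown hooks used by the helpers above.
abstract class PushdownReader extends DataSourceReader
  with SupportsPushDownRequiredColumns with SupportsPushDownFilters {

  private var requiredSchema = new StructType().add("i", "int").add("j", "int")
  private var pushed: Array[Filter] = Array.empty

  override def readSchema(): StructType = requiredSchema

  // pushRequiredColumns calls this, then reads readSchema() back to build the output.
  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  // Accept every filter; return the ones that must still be evaluated after the
  // scan (none here, assuming this source can fully apply them).
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    Array.empty
  }

  override def pushedFilters(): Array[Filter] = pushed
}
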
@@ -17,15 +17,56 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.{execution, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: DataSourceV2Relation =>
DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val projectSet = AttributeSet(project.flatMap(_.references))
val filterSet = AttributeSet(filters.flatMap(_.references))

val projection = if (filterSet.subsetOf(projectSet) &&
AttributeSet(relation.output) == projectSet) {
// When the required projection contains all of the filter columns and column pruning alone
// can produce the required projection, push the required projection.
// A final projection may still be needed if the data source produces a different column
// order or if it cannot prune all of the nested columns.
relation.output
} else {
// When there are filter columns not already in the required projection or when the required
// projection is more complicated than column pruning, base column pruning on the set of
// all columns needed by both.
(projectSet ++ filterSet).toSeq
}

val reader = relation.newReader

Contributor: To confirm: do we have to do operator pushdown twice now? Once in the plan visitor to calculate statistics, and once here to build the physical plan, right?

Contributor Author: It will configure two readers: one for the pushdown when converting to a physical plan, and one for stats. The stats one should be temporary, though, since we want to address that problem. Configuring two readers instead of one allows us to decouple the problems so we can move forward with pushdown that works like the other data sources.

Contributor: It's nice to decouple the problem and do pushdown during planning, but I feel the cost is too high in this approach. For file-based data sources, we need to query the Hive metastore to apply partition pruning during filter pushdown, and this can be very expensive. Doing it twice looks scary to me.

cc @gatorsmile @dongjoon-hyun @mallman, please correct me if I have a wrong understanding.

Also cc @wzhfy: do you have an estimate of how long it will take to move statistics to the physical plan?

Contributor Author: @cloud-fan, there's nothing forcing other data sources to implement the new trait. Other sources can continue to report stats for the entire table and not account for filters (the code assumes that row counts don't change). This just opens the option of reporting more accurate stats using the filters and projection that will be pushed.

Ideally, I think that stats-based decisions would happen after pushdown so we get data that is as accurate as possible. But for now, this fixes the regression for v2 sources that happens because we move pushdown to a later step (conversion to physical plan, like the other sources).

Contributor (@cloud-fan, Jun 12, 2018), quoting "there's nothing forcing other data sources to implement the new trait": Hmm, I'm a little confused here. All v2 data sources (which will be DataSourceV2Relation) would have to apply pushdown twice, right? Or are you suggesting we should not migrate the file-based data sources to data source v2?

Contributor Author (@rdblue, Jun 12, 2018): I don't mind either option #1 or #2. #2 is basically what happens for non-v2 data sources right now. Plus, both should be temporary.

I think it is a bad idea to continue with hacky code that uses the reader in the logical plan. It is much cleaner otherwise, and we've spent too much time making sure that everything still works. The main example that comes to mind is setting the requested projection and finding out what the output is by using pushdown. I think hacks are slowing progress on the v2 sources.

Contributor: Yeah, the second proposal is what happens for the v1 data sources. For file-based data sources we kind of picked the third proposal and added an optimizer rule, PruneFileSourcePartitions, to push down some of the filters to the data source at the logical phase, to get precise stats.

I'd like to pick from the 2nd and 3rd proposals (the 3rd proposal is also temporary, until we move stats to the physical plan). Applying pushdown twice is hard to work around (we would need to cache), while we can keep the PruneFileSourcePartitions rule to work around the issue with the 2nd proposal for file-based data sources.

Let's also get more input from other people.

Contributor (@jose-torres, Jun 13, 2018): I'm not strongly opposed to any of the options, but based on the description above, 2 would be my choice if I had to pick one. A temporary state where functionality is missing is easier to reason about than temporary states where we deliberately impose a fuzzy lifecycle.

Contributor: @rdblue, do you have time to prepare a PR for the 2nd proposal? I can do that too if you are busy with other stuff.

Contributor Author: Yeah, no problem. I can just remove the stats commit from this one.

val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
projection.asInstanceOf[Seq[AttributeReference]].toStructType)

val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)

logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")

val scan = DataSourceV2ScanExec(
output, relation.source, relation.options, pushedFilters, reader)

val filter = postScanFilters.reduceLeftOption(And)
val withFilter = filter.map(execution.FilterExec(_, scan)).getOrElse(scan)

val withProjection = if (withFilter.output != project) {
execution.ProjectExec(project, withFilter)
} else {
withFilter
}

withProjection :: Nil

case r: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
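A usage sketch of how this strategy is exercised (for example from spark-shell, where `spark` is predefined, with Spark's sql test classes on the classpath; AdvancedDataSourceV2 is the test source used in DataSourceV2Suite below, and the package in the import is an assumption). With this change, column pruning and filter pushdown happen while the physical plan is built rather than in a separate optimizer batch:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.v2.AdvancedDataSourceV2  // assumed test package

val df = spark.read
  .format(classOf[AdvancedDataSourceV2].getName)
  .load()
  .filter(col("i") > 3)
  .select(col("i"))

// The pushed filters and pruned schema appear in the DataSourceV2ScanExec node.
df.explain(true)
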

This file was deleted.

@@ -323,21 +323,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}

test("SPARK-23315: get output from canonicalized data source v2 related plans") {
def checkCanonicalizedOutput(df: DataFrame, numOutput: Int): Unit = {
def checkCanonicalizedOutput(
df: DataFrame, logicalNumOutput: Int, physicalNumOutput: Int): Unit = {
val logical = df.queryExecution.optimizedPlan.collect {
case d: DataSourceV2Relation => d
}.head
assert(logical.canonicalized.output.length == numOutput)
assert(logical.canonicalized.output.length == logicalNumOutput)

val physical = df.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec => d
}.head
assert(physical.canonicalized.output.length == numOutput)
assert(physical.canonicalized.output.length == physicalNumOutput)
}

val df = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
checkCanonicalizedOutput(df, 2)
checkCanonicalizedOutput(df.select('i), 1)
checkCanonicalizedOutput(df, 2, 2)
checkCanonicalizedOutput(df.select('i), 2, 1)
}
}
