Skip to content

Conversation

huaxingao
Copy link
Contributor

What changes were proposed in this pull request?

Remove fullOutput from RowDataSourceScanExec

Why are the changes needed?

RowDataSourceScanExec requires the full output instead of the scan output after column pruning. However, in v2 code path, we don't have the full output anymore so we just pass the pruned output. RowDataSourceScanExec.fullOutput is actually meaningless so we should remove it.

Does this PR introduce any user-facing change?

No

How was this patch tested?

existing tests

@SparkQA
Copy link

SparkQA commented Aug 12, 2020

Test build #127371 has finished for PR 29415 at commit 9558823.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao
Copy link
Contributor Author

cc @cloud-fan @MaxGekk @viirya


/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
fullOutput: Seq[Attribute],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you find out the PR that added it? I can't quite remember why we have it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was introduced in #18600 for plan equality comparison.
I manually print out the two canonicalized plans for df1 and df2 in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68 to check my change.
Before my change:

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: structnone:int,none:int

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#2] PushedFilters: [], ReadSchema: structnone:int,none:int

After my change :

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,C#2] PushedFilters: [], ReadSchema: struct<A:int,C:int>

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fullOutput seems having no actual usage except for plan comparison. If we can make sure we don't break it, looks ok to remove fullOutput.

// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
override def doCanonicalize(): SparkPlan =
copy(
fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to normalize output now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileSourceScanExec does it as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to add requiredSchema to RowDataSourceScanExec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't know that we need to use the normalized exprId in canonicalized plan. If we do, then probably we can't remove fullOutput from RowDataSourceScanExec, because using the normalized pruned output would cause problem. For example, in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68, normalized the pruned output will give none#0,none#1 for both df1 and df2, and then both of them have exactly the same plan

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#110]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: struct<none:int,none:int>

Then in df1.union(df2), it takes ReusedExchange code path since both plans are equal

== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[a#0], functions=[min(b#1)], output=[a#0, min(b)#12])
:  +- Exchange hashpartitioning(a#0, 5), true, [id=#34]
:     +- *(1) HashAggregate(keys=[a#0], functions=[partial_min(b#1)], output=[a#0, min#28])
:        +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>
+- *(4) HashAggregate(keys=[a#0], functions=[min(c#2)], output=[a#0, min(c)#24])
   +- ReusedExchange [a#0, min#30], Exchange hashpartitioning(a#0, 5), true, [id=#34]

The union result will be

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     2|
+---+------+

instead of

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     3|
+---+------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea that's why I propose to add requiredSchema, like what FileSourceScanExec does. But I'm not sure how hard it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I added requiredSchema, could you please take a look to see if that's what you want?

@SparkQA
Copy link

SparkQA commented Aug 13, 2020

Test build #127416 has finished for PR 29415 at commit bd58665.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 14003d4 Aug 14, 2020
@huaxingao
Copy link
Contributor Author

Thanks! @cloud-fan @viirya

@huaxingao huaxingao deleted the rm_full_output branch August 14, 2020 15:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants