Skip to content

Conversation

ghost
Copy link

@ghost ghost commented Jun 5, 2014

spark-submit and run-example are in the same location.
if the code is "./bin/spark-submit ", following appears
"./run-example: line 55: ./bin/spark-submit: No such file or directory"
So modify it as "./spark-submit "

spark-submit and run-example are in the same location.
if the code is "./bin/spark-submit ", following appears 
"./run-example: line 55: ./bin/spark-submit: No such file or directory"
So modify it as "./spark-submit \"
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

sorry, this is not a bug
@ghost
Copy link
Author

ghost commented Jun 5, 2014

something wrong with my patch

@ghost ghost closed this Jun 5, 2014
cloud-fan pushed a commit that referenced this pull request Jun 21, 2021
…subquery reuse

### What changes were proposed in this pull request?
This PR:
1. Fixes an issue in `ReuseExchange` rule that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate traversals in `ReuseExchange` when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   )
   SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
   :           :                 +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :              +- *(1) Project [k#17L]
   :                 +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
   :           :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   :           :                    +- *(1) Project [k#17L]
   :           :                       +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :           :                          +- *(1) ColumnarToRow
   :           :                             +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :           +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   ```
2. Fixes an issue with separate consecutive `ReuseExchange` and `ReuseSubquery` rules that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate rules when `ReuseSubquery` rule modifies an exchange that has already been referenced (reused) in `ReuseExchange` rule.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   ),
   t2 AS (
     SELECT * FROM t
     UNION
     SELECT * FROM t
   )
   SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :        +- *(1) Project [k#49L]
   :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :              +- *(1) ColumnarToRow
   :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                 :     :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 :     :                    +- *(1) Project [k#49L]
   :                 :     :                       +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :     :                          +- *(1) ColumnarToRow
   :                 :     :                             +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 :     +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   ```
   (This example contains issue 1 as well.)

3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means that the new combined rule utilizes the reuse opportunities between parent and subqueries by traversing the whole plan. The traversal is started on the top level query only.

4. Due to the order of traversal this PR does while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from execution perspective, but this also means "forward references" in explain formatted output where parent queries come first. The changes I made to `ExplainUtils` are to handle these references properly.

This PR fixes the above 3 issues by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a 1 pass, whole-plan, bottom-up traversal.

### Why are the changes needed?
Performance improvement.

### How was this patch tested?
- New UTs in `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- New UTs in `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- New `ReuseMapSuite` to test `ReuseMap`.
- Checked new golden files of `PlanStabilitySuite`s for invalid reuse references.
- TPCDS benchmarks.

Closes #28885 from peter-toth/SPARK-29375-SPARK-28940-whole-plan-reuse.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants