[SPARK-23889][SQL] DataSourceV2: required sorting and clustering for writes #29066
Conversation
Test build #125632 has finished for PR 29066 at commit

Test build #125634 has finished for PR 29066 at commit

Retest this please.

Test build #125690 has finished for PR 29066 at commit
Hi, @aokolnychyi.
cc @rdblue since this is DSv2.

The test failure in
/**
 * A rule that constructs [[Write]]s.
 */
object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
This rule contains the same logic we had before, except that it is applied earlier now.

Hmm, care to explain where these rules were before? I don't see anything moved to this new rule.

I think the buildAndRun methods in the exec nodes still contain the old logic. Previously, it was called run.
Test build #125856 has finished for PR 29066 at commit

Test build #125857 has finished for PR 29066 at commit

Test build #126190 has finished for PR 29066 at commit

retest this please

Test build #126195 has finished for PR 29066 at commit

Retest this please.

Test build #127158 has finished for PR 29066 at commit

Hi, @aokolnychyi.
/**
 * Returns a logical {@link Write} shared between batch and streaming.
 */
default Write build() {
This API looks like it overlaps in function with buildForBatch and buildForStreaming. Which one should we use: build then toBatch/toStream, or buildForBatch/buildForStreaming?

The buildForBatch method (and its streaming equivalent) is already released, so this generic Write implementation makes the new structure, build + toBatch, work for existing sources. It also allows sources to implement whichever version they choose. So if none of the features that require the Write are used, I guess they could avoid a mostly-boilerplate class.

Correct, this method was introduced to keep compatibility.

Have you considered changing the default impl for buildForBatch to:

default BatchWrite buildForBatch() {
  return build().toBatch();
}

and also the build() to just return a simple anonymous new Write() {}?

Otherwise, I can see that we'll have the buildForBatch (and similarly buildForStreaming) logic in two different places: WriteBuilder and Write. It is easy to miss one or the other.

I am not sure I understood. Could you elaborate a bit more, @sunchao?

Spark will now always call build() and work with the Write abstraction. I added the default implementation so that existing data sources that already implement the current API will continue to work as before. Spark is not supposed to call buildForBatch after this change.
New data sources should be encouraged to implement only build.

We should probably deprecate the other ones.

What I mean is that we can now potentially have two copies of the toBatch implementation: one in WriteBuilder.buildForBatch and one in Write.toBatch, once users start to override build, buildForBatch and buildForStreaming. If, moving forward, we want build to be the canonical impl, perhaps we can make buildForBatch and buildForStreaming just call build().toBatch() internally so that users only need to override build.
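For illustration, here is a minimal Scala sketch of the direction discussed above: a new source overrides only build() and returns a Write that can turn itself into a batch or streaming write. The connector names and factory parameters are hypothetical; the build/toBatch/toStreaming method names follow this PR's proposal rather than a released API.

```scala
import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Hypothetical connector builder: only build() is implemented; Spark derives
// the batch or streaming write via Write.toBatch / Write.toStreaming, and the
// existing buildForBatch/buildForStreaming are left untouched.
class MyWriteBuilder(
    newBatchWrite: () => BatchWrite,           // hypothetical factory supplied by the source
    newStreamingWrite: () => StreamingWrite)   // hypothetical factory supplied by the source
  extends WriteBuilder {

  override def build(): Write = new Write {
    override def toBatch(): BatchWrite = newBatchWrite()
    override def toStreaming(): StreamingWrite = newStreamingWrite()
  }
}
```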
case OverwriteByExpression(relation: DataSourceV2Relation, deleteExpr, query, options, _) =>
  // fail if any filter cannot be converted. correctness depends on removing all matching data.
  val filters = splitConjunctivePredicates(deleteExpr).map {
    filter => DataSourceStrategy.translateFilter(filter,
      supportNestedPredicatePushdown = true).getOrElse(
      throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
  }.toArray
With this change, we move the catalyst expression -> sources.Filter conversion into logical plans, so we will see both catalyst expressions and sources expressions in logical plans during optimization.

Would it be clearer to use only catalyst expressions in logical plans and convert to sources.Filter in physical plans, where we need to interact with data sources?

And later in V2WriteRequirements, we also need to convert sources.Filter back to catalyst expressions.

I don't think it is possible to do the conversion later, because we need the write builder to be fully configured to produce a Write in this rule. That way, the Write can expose its ordering and distribution requirements in the other rule.

I think it is fine to convert to Filter here. That's the public API for filter expressions, so I don't think there is a requirement for it to be used only in physical plans. Filter is already used in the optimizer on the read path as well, because the Scan is similarly built and added to an optimizer plan so that the optimizer can handle stats based on the pushed filters.

> And later in V2WriteRequirements, we also need to convert sources.Filter back to catalyst expressions.

I'll take a look at this as well since it sounds odd. I think we should probably keep the original expressions around instead of converting Filter back to Expression.
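To sketch what "keep the original expressions around" could look like (illustrative names only, not classes from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources.Filter

// Hypothetical holder: carry the original catalyst predicates alongside the
// translated public Filters, so later rules never convert Filter back to Expression.
case class OverwriteCondition(
    original: Seq[Expression],   // catalyst predicates, kept for later optimizer rules
    translated: Array[Filter])   // public Filters handed to the data source
```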
This is definitely a major missing piece in DSv2 compared to DSv1: a DSv1 writer can deal with a DataFrame directly and is therefore able to make arbitrary changes (including repartition/sort) before doing the actual write, like I did for the state data source - https://github.com/HeartSaVioR/spark-state-tools/blob/8a74bdb1bc7911a6f71785cf68b784b0a331a1d9/src/main/scala/net/heartsavior/spark/sql/state/StateStoreWriter.scala#L67-L74. Lots of data sources are blocked from migrating to DSv2. Shall we consider prioritizing this?

I'll go through the comments later this week and update the PR.
I know deprecating and then removing is usually a better idea, and I am okay with evolving the read and write paths separately. The only concern I have is that while we use these interfaces in the write path here, the concept isn't really write-specific. There is a chance we will have to move these interfaces from
Since this is a public interface, do you think we should add some documentation for the method?

NVM if you think this is obvious :)

Oh, yeah, docs and annotations should be added for sure. I'll fix this while rebasing.

I added more docs. There may be places where we would want a bit more detail, but I think we can review those parts when I split this PR into smaller chunks.

nit: should we add @since 3.1.0, following other existing expressions?

I am not sure whether this will be part of 3.1.0. Once we have clarity, I'll add the annotation.

Similar to the Write interface, perhaps we should mention that data sources must implement this if the table returns the V1_BATCH_WRITE capability? And also the @since tag.

Added a bit of description.
It seems to me that what this rule does is add the distribution/ordering info. Do we plan to add other functionality to it in the future? Is V2Writes too general a name?

This rule not only inserts shuffle/sort but also builds Writes. It is only applied if the Write has not been constructed before.

Should we put SupportsOverwrite before SupportsTruncate? If a builder class extends SupportsOverwrite (and isTruncate returns true), then it will be matched by the first clause and call the truncate method, but we want it to call overwrite(filters), right?

This is existing logic, just moved.

The reason for this is that SupportsOverwrite extends SupportsTruncate and calls overwrite(true). Calling truncate ensures that the source can implement either one if it chooses. Sometimes truncate may be preferred, and it is easier for a source to receive that call directly rather than writing its own equivalent of isTruncate.

Got it. I'm just not sure whether some data source would extend SupportsOverwrite and decide to override/implement different behavior for overwrite and truncate (so that truncate no longer calls overwrite(true)); here we'd pick truncate, but what they really want is overwrite.

If the source doesn't implement truncate, then it gets overwrite(true): https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsOverwrite.java#L46-L48

This logic did not change and should match the previous behavior.
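For readers following the thread, a minimal sketch of the match order being discussed (an assumed helper, not the exact code in this PR): truncate is tried first when the overwrite filters reduce to always-true, and SupportsOverwrite's default truncate() forwards to overwrite with an always-true filter, so a source may implement either method.

```scala
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

// Hypothetical helper mirroring the discussed behavior: prefer a direct truncate()
// call when the overwrite condition is "always true", otherwise call overwrite(filters).
def configureOverwrite(builder: WriteBuilder, filters: Array[Filter]): WriteBuilder = {
  val isTruncate = filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
  builder match {
    case t: SupportsTruncate if isTruncate => t.truncate()
    case o: SupportsOverwrite              => o.overwrite(filters)
    case _ =>
      throw new IllegalArgumentException("Table does not support overwrite by expression")
  }
}
```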
Sorry for the sporadic review comments. This PR looks mostly good to me; the main thing concerning me is the newly introduced Distribution interface and how it evolves alongside the existing one in the read package. Happy to see discussion has already happened on this, and I agree that we can start moving in parallel.

Why is this ignored?

I am not sure whether it is safe to do. @dongjoon-hyun @viirya, what's your take on this?

Does this have to be an Option, or will it always be non-empty?

I kept it as Option just in case we don't want to apply the new logic all the time and will introduce a flag to fall back to the old approach. If we are not going to have that flag, we can make this required.

Cool, it would simplify the logic if we know the new code will always be applied. Perhaps worth creating a JIRA to track this.
…writes

Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: Ryan Blue <[email protected]>
}

private[sql] final case class ClusteredDistributionImpl(
    clusteringExprs: Seq[Expression]) extends ClusteredDistribution {
I switched to using Seq in fields to avoid reasoning about equality of arrays.
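A quick illustration (plain Scala, not PR code) of why Seq in case-class fields avoids the equality problem:

```scala
// Arrays compare by reference, so case classes holding arrays do not get useful
// structural equality; Seq compares element-wise.
Array(1, 2) == Array(1, 2)  // false: reference equality
Seq(1, 2) == Seq(1, 2)      // true: element-wise equality

final case class WithArray(xs: Array[Int])
final case class WithSeq(xs: Seq[Int])
WithArray(Array(1, 2)) == WithArray(Array(1, 2))  // false
WithSeq(Seq(1, 2)) == WithSeq(Seq(1, 2))          // true
```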
    override val write: Option[V1Write] = None) extends V1FallbackWriters {

  override protected def run(): Seq[InternalRow] = {
    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
I moved refreshCache to parent.
  override protected def run(): Seq[InternalRow] = {
    val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
    refreshCache()
Refresh happens in V2ExistingTableWriteExec now.
I've updated this PR and I am ready to split it into smaller mergeable parts. It would be great if everyone could take another look to make sure we are on the same page.

Seems like there is consensus about evolving this API alongside the interfaces in

Test build #132373 has finished for PR 29066 at commit
  }
}

private[sql] final case class SortValue(
nit: SortValue sounds somewhat confusing to me. Influenced by the catalyst SortOrder, SortOrder sounds better to me. However, you already define SortOrder as an interface. Not a strong opinion; this can be ignored if you think it's okay.
I am open to alternatives here for sure.
It seems like we are giving synonyms in this file. For example, FieldReference implements NamedReference. Unfortunately, I cannot use SortOrder as it is already taken in the public expression API.
// the conversion to catalyst expressions above produces SortOrder expressions
// for OrderedDistribution and generic expressions for ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash partitioning
RepartitionByExpression(distribution, query, numShufflePartitions)
Is it possible for the required distribution to be changed later by other optimizations? Is the distribution requirement from the data source a hard requirement? If the distribution is changed and no longer matches the requirement, how will the data source react?

We are inserting repartition/sort nodes directly before writing, so my assumption is that Spark will only remove them if the incoming plan already satisfies these requirements. WriteDistributionAndOrderingSuite is meant to test exactly that. Do you have ideas about when this assumption could break, @viirya?

Oh, I see. I misread this part. Looks fine. Thanks.
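For intuition, the public DataFrame API exposes the same distinction the code comment above relies on: both calls below plan a RepartitionByExpression node, one with SortOrder expressions (range partitioning) and one with plain expressions (hash partitioning). Column names and the session setup are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("repartition-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")

df.repartitionByRange(10, $"id")  // SortOrder expressions -> range partitioning
df.repartition(10, $"id")         // plain expressions     -> hash partitioning
```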
// we cannot perform this step in the analyzer since we need to optimize expressions
// in nodes like OverwriteByExpression before constructing a logical write
If we resolve it in the analyzer, can't we optimize the resolved expressions later in the optimizer?

At this step, we construct a Write and pass the overwrite expressions to the data source. Expression optimization must have happened before this point.

Got it. Thanks for clarifying.
It is a bit hard to keep this large PR up-to-date since it touches many places. As the approach seems reasonable, I am going to split the work and submit smaller PRs. We can perform detailed reviews on individual PRs.

The first PR with interfaces only is out.
…ering on write

### What changes were proposed in this pull request?

This PR adds connector interfaces proposed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

**Note**: This PR contains a subset of changes discussed in PR #29066.

### Why are the changes needed?

Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:

- global sort
- cluster data and sort within partitions
- local sort within partitions
- no sort

Please see the design doc above for a more detailed explanation of requirements.

### Does this PR introduce _any_ user-facing change?

This PR introduces public changes to the DS V2 by adding a logical write abstraction as we have on the read path, as well as additional interfaces to represent distribution and ordering of data (please see the doc for more info).

The existing `Distribution` interface in the `read` package is read-specific and not flexible enough, as discussed in the design doc. The current proposal is to evolve these interfaces separately until they converge.

### How was this patch tested?

This patch adds only interfaces.

Closes #30706 from aokolnychyi/spark-23889-interfaces.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Ryan Blue <[email protected]>
@aokolnychyi, so #30706 and #30577 were the splits? Yeah, I think splitting is a good approach for a big change like this. Should we close this PR BTW?

Closing this one in favor of smaller PRs.
### What changes were proposed in this pull request?

This PR adds logic to build logical writes introduced in SPARK-33779.

Note: This PR contains a subset of changes discussed in PR #29066.

### Why are the changes needed?

These changes are the next step as discussed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #30806 from aokolnychyi/spark-33808.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…red distribution and ordering

### What changes were proposed in this pull request?

This PR adds repartition and sort nodes to satisfy the required distribution and ordering introduced in SPARK-33779.

Note: This PR contains the final part of changes discussed in PR #29066.

### Why are the changes needed?

These changes are the next step as discussed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with a new test suite.

Closes #31083 from aokolnychyi/spark-34026.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This PR implements the core part of the design doc for SPARK-23889.
Note: This PR contains all changes in one place to simplify the review. Once we agree on the approach, I am going to split it into smaller PRs.
Why are the changes needed?
Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:

- global sort
- cluster data and sort within partitions
- local sort within partitions
- no sort
Please see the design doc above for a more detailed explanation of requirements.
Does this PR introduce any user-facing change?
This PR introduces public changes to the DS V2 by adding a logical write abstraction as we have on the read path as well as additional interfaces to represent distribution and ordering of data (please see the doc for more info).
Important pieces:
- `Write` - a logical representation of a data source write
- `RequiresDistributionAndOrdering` - a write that requires a specific distribution/ordering
- `V2Writes` - a rule that constructs a logical write and inserts repartition/sort nodes
- `WriteDistributionAndOrderingSuite` - a test case with samples

How was this patch tested?
The patch comes with a new test case.
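To make the pieces listed above concrete, below is a hedged Scala sketch of a write requesting a clustered distribution and a within-partition sort. The class name and the Distributions/Expressions factory calls are assumptions based on the design doc and this discussion, not necessarily the exact API that was ultimately merged.

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering}

// Hypothetical write: cluster rows by "date", then sort rows within each task by "id".
class MyWrite(batchWrite: BatchWrite) extends RequiresDistributionAndOrdering {

  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.identity("date")))

  override def requiredOrdering(): Array[SortOrder] =
    Array(Expressions.sort(Expressions.identity("id"), SortDirection.ASCENDING))

  override def toBatch(): BatchWrite = batchWrite
}
```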