
Conversation

@aokolnychyi
Contributor

@aokolnychyi aokolnychyi commented Jul 10, 2020

What changes were proposed in this pull request?

This PR implements the core part of the design doc for SPARK-23889.

Note: This PR contains all changes in one place to simplify the review. Once we agree on the approach, I am going to split it into smaller PRs.

Why are the changes needed?

Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:

  • global sort
  • cluster data and sort within partitions
  • local sort within partitions
  • no sort

Please see the design doc above for a more detailed explanation of requirements.
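For intuition, these scenarios map to what a query author can do manually with the DataFrame API today before handing data to a writer. A minimal sketch (the column names are made up):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // one expression per scenario above
  def prepareForWrite(df: DataFrame): DataFrame = {
    // global sort: range-partition and order the whole output
    val globallySorted = df.orderBy(col("date"), col("hour"))
    // cluster data and sort within partitions
    val clusteredAndSorted = df.repartition(col("date")).sortWithinPartitions(col("date"), col("hour"))
    // local sort within partitions only
    val locallySorted = df.sortWithinPartitions(col("date"), col("hour"))
    // no sort: keep the incoming distribution and ordering as-is
    clusteredAndSorted
  }

The point of this PR is to let the data source itself request one of these layouts so that Spark inserts the corresponding nodes automatically.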

Does this PR introduce any user-facing change?

This PR introduces public changes to the DS V2 by adding a logical write abstraction as we have on the read path as well as additional interfaces to represent distribution and ordering of data (please see the doc for more info).

Important pieces:

  • Write - a logical representation of a data source write
  • RequiresDistributionAndOrdering - a write that requires a specific distribution/ordering (see the sketch after this list)
  • V2Writes - a rule that constructs a logical write and inserts repartition/sort nodes
  • WriteDistributionAndOrderingSuite - a test case with samples
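To make RequiresDistributionAndOrdering concrete, here is a minimal sketch of a connector write that asks for the "cluster data and sort within partitions" scenario. The interface names follow this proposal, while the factory helpers (Distributions.clustered, Expressions.column, Expressions.sort) and the column names are assumptions and may differ from the final API:

  import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
  import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}
  import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering}

  // hypothetical sink that wants rows clustered by `date` and sorted by `date, hour`
  // within each task before they reach its data writers
  class ClusteredSortedWrite extends RequiresDistributionAndOrdering {

    // cluster all rows with the same `date` value into the same partition
    override def requiredDistribution(): Distribution =
      Distributions.clustered(Array[Expression](Expressions.column("date")))

    // within each partition, sort rows by `date` and then `hour`
    override def requiredOrdering(): Array[SortOrder] = Array(
      Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
      Expressions.sort(Expressions.column("hour"), SortDirection.ASCENDING))

    override def toBatch: BatchWrite = ??? // the connector's actual batch write
  }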

How was this patch tested?

The patch comes with a new test case.

@SparkQA

SparkQA commented Jul 10, 2020

Test build #125632 has finished for PR 29066 at commit e5a42ab.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions

@SparkQA

SparkQA commented Jul 11, 2020

Test build #125634 has finished for PR 29066 at commit 6fea82b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jul 12, 2020

Test build #125690 has finished for PR 29066 at commit 6fea82b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions

@dongjoon-hyun
Member

Hi, @aokolnychyi .
Could you fix the following UT failure?

  • org.apache.spark.sql.execution.arrow.ArrowConvertersSuite.test Arrow Validator

cc @rdblue since this is DSv2.

@aokolnychyi
Contributor Author

The test failure in ArrowConvertersSuite is related to the optimization I did to dedup sorts. I've created another PR to address that separately. Will rebase this one afterwards.

/**
 * A rule that constructs [[Write]]s.
 */
object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
Contributor Author

This rule contains the same logic we had before except it is applied earlier now.

Member

Hmm, care to explain where these rules were before? I don't see anything moved to this new rule.

Contributor Author

I think the buildAndRun methods in the exec nodes still contain the old logic. Previously, the method was just called run.

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125856 has finished for PR 29066 at commit d75f0e4.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125857 has finished for PR 29066 at commit 5e4d304.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dbtsai dbtsai requested review from cloud-fan and viirya July 16, 2020 21:23
@SparkQA

SparkQA commented Jul 20, 2020

Test build #126190 has finished for PR 29066 at commit 98fb788.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions
  • case class WriteToDataSourceV2FallbackExec(
  • case class V2BatchWriteCommand(

@aokolnychyi
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 21, 2020

Test build #126195 has finished for PR 29066 at commit 98fb788.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions
  • case class WriteToDataSourceV2FallbackExec(
  • case class V2BatchWriteCommand(

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Aug 7, 2020

Test build #127158 has finished for PR 29066 at commit 98fb788.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions
  • case class WriteToDataSourceV2FallbackExec(
  • case class V2BatchWriteCommand(

@dongjoon-hyun
Member

dongjoon-hyun commented Aug 7, 2020

Hi, @aokolnychyi .
I know that you've been waiting for a long time in WIP status. How do you want to proceed with this proposal?

  • Are there any updates from your side?
  • Or do you want to remove WIP now?

/**
 * Returns a logical {@link Write} shared between batch and streaming.
 */
default Write build() {
Member

This API looks like it overlaps in function with buildForBatch and buildForStreaming. Which one should we use: build then toBatch/toStream, or buildForBatch/buildForStreaming?

Contributor

The buildForBatch method (and its streaming equivalent) is already released, so this generic Write implementation makes the new structure, build + toBatch, work for existing sources. It also allows sources to implement whichever version they choose. So if none of the features that require the Write are used, I guess they could avoid a mostly-boilerplate class.

Contributor Author

Correct, this method was introduced to keep compatibility.

Member

Have you considered changing the default impl for buildForBatch to:

  default BatchWrite buildForBatch() {
    return build().toBatch();
  }

and also making build() just return a simple anonymous new Write() {}?

Otherwise, I can see that we'll have the buildForBatch (and similarly buildForStreaming) logic in two different places: WriteBuilder and Write. It is easy to miss one or the other.

Contributor Author

I am not sure I understood. Could you elaborate a bit more, @sunchao?

Spark will now always call build() and work with the Write abstraction. I added the default implementation so that existing data sources that already implement the current API will continue to work as before. Spark is not supposed to call buildForBatch after this change.
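For reference, a minimal Scala sketch of what such a compatibility default can look like (the real interface is Java; names follow the DS V2 API, but treat this as an illustration rather than the exact code in this PR):

  import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriteBuilder}
  import org.apache.spark.sql.connector.write.streaming.StreamingWrite

  // the default Write simply delegates to the pre-existing methods, so sources that
  // only implement buildForBatch/buildForStreaming keep working when Spark calls build()
  trait CompatWriteBuilder extends WriteBuilder {
    override def build(): Write = new Write {
      override def toBatch: BatchWrite = buildForBatch()
      override def toStreaming: StreamingWrite = buildForStreaming()
    }
  }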

Contributor Author

New data sources should be encouraged to implement only build.

Contributor Author

We should probably deprecate the other ones.

Member

What I mean is that we can now potentially have two copies of the toBatch implementation: one in WriteBuilder.buildForBatch and one in Write.toBatch, once users start to override build, buildForBatch, and buildForStreaming. If, moving forward, we want build to be the canonical impl, perhaps we can make buildForBatch and buildForStreaming just call build().toBatch() internally so that users only need to override build.

Comment on lines 45 to 61
case OverwriteByExpression(relation: DataSourceV2Relation, deleteExpr, query, options, _) =>
  // fail if any filter cannot be converted. correctness depends on removing all matching data.
  val filters = splitConjunctivePredicates(deleteExpr).map {
    filter => DataSourceStrategy.translateFilter(deleteExpr,
      supportNestedPredicatePushdown = true).getOrElse(
        throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
  }.toArray
Member

With this change, we move the catalyst expression -> sources.Filter conversion into logical plans. So we will see both catalyst expressions and sources.Filter expressions in logical plans during optimization.

Would it be clearer if we used only catalyst expressions in logical plans, and converted to sources.Filter in physical plans when we need to interact with data sources?

Member

And later in V2WriteRequirements, we also need to convert sources.Filter back to catalyst expressions.

Contributor

@rdblue rdblue Oct 6, 2020

I don't think that it is possible to do the conversion later, because we need the write builder to be fully configured to produce a Write in this rule. That way, the Write can expose its order and distribution requirements in the other rule.

I think it is fine to convert to Filter here. That's the public API for filter expressions, so I don't think there is a requirement for it to be used only in physical plans. Filter is already used in the optimizer on the read path as well, because the Scan is similarly built and added to an optimizer plan so that the optimizer can handle stats based on the pushed filters.

And later in V2WriteRequirements, we also need to convert sources.Filter back to catalyst expressions.

I'll take a look at this as well since it sounds odd. I think we should probably keep the original expressions around instead of converting Filter back to Expression.

@HeartSaVioR
Contributor

This is definitely a major missing piece in DSv2 compared to DSv1: the DSv1 writer can deal with the DataFrame directly, and is therefore able to make arbitrary changes (including repartition/sort) before doing the actual write, like I did for the state data source - https://github.com/HeartSaVioR/spark-state-tools/blob/8a74bdb1bc7911a6f71785cf68b784b0a331a1d9/src/main/scala/net/heartsavior/spark/sql/state/StateStoreWriter.scala#L67-L74

Lots of data sources are blocked from migrating to DSv2. Shall we consider prioritizing this?

@aokolnychyi
Contributor Author

I'll go through the comments later this week and update the PR.

@aokolnychyi
Contributor Author

@dbtsai, I will rebase this one once PR #30558 is in.

@aokolnychyi
Contributor Author

aokolnychyi commented Dec 1, 2020

I know deprecating and then removing is usually a better idea, and I am okay with evolving the read and write paths separately. The only concern I have is that while we use these interfaces on the write path here, the concept isn't really write-specific. There is a chance we will have to move these interfaces out of the write package, breaking things again.

Member

Since this is a public interface, do you think we should add some documentation for the method?

Member

NVM if you think this is obvious :)

Contributor Author

Oh, yeah, docs and annotations should be added for sure. I'll fix while rebasing.

Contributor Author

I added more docs. There may be places where we would want a bit more detail, but I think we can review those parts when I split this PR into smaller chunks.

Member

nit: should we add @since 3.1.0 following other existing expressions?

Contributor Author

I am not sure whether this will be part of 3.1.0. Once we have clarity, I'll add the annotation.

Member

Similar to the Write interface, perhaps we should mention that data sources must implement this if the table returns the V1_BATCH_WRITE capability?

and also the @since tag.

Contributor Author

Added a bit of description.


Member

It seems to me that what this rule does is add the distribution/ordering info. Do we plan to add other functionality to it in the future? Is V2Writes too general as a name?

Contributor Author

This rule not only inserts shuffle/sort nodes but also builds Writes. It is only applied if the Write has not been constructed before.

Member

Should we put SupportsOverwrite before SupportsTruncate? If a builder class extends SupportsOverwrite (and isTruncate returns true), then it will be matched by the first clause and call the truncate method, but we want it to call overwrite(filters), right?

Contributor

This is existing logic, just moved.

The reason for this is that SupportsOverwrite extends SupportsTruncate and calls overwrite(true). Calling truncate ensures that the source can implement either one if it chooses. Sometimes truncate may be preferred, and it is easier for a source to receive that call directly rather than writing its own equivalent of isTruncate.
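A small sketch of the match order being discussed, with a hypothetical isTruncate helper standing for "the overwrite condition matches all rows":

  import org.apache.spark.sql.AnalysisException
  import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, WriteBuilder}
  import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

  // hypothetical helper: the overwrite condition reduces to "delete everything"
  def isTruncate(filters: Array[Filter]): Boolean =
    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]

  def configureOverwrite(builder: WriteBuilder, filters: Array[Filter]): WriteBuilder =
    builder match {
      // checked first, so a source that implements truncate gets the direct call
      case t: SupportsTruncate if isTruncate(filters) => t.truncate()
      // otherwise fall back to filter-based overwrite
      case o: SupportsOverwrite => o.overwrite(filters)
      case other =>
        throw new AnalysisException(s"Table does not support overwrite by expression: $other")
    }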

Member

Got it. I'm just not sure whether some data source would extend SupportsOverwrite and decide to implement different behavior for overwrite and truncate (so truncate no longer calls overwrite(true)); here we'd pick truncate, but what they really want is overwrite.

Contributor Author

This logic did not change and should match the previous behavior.

Member

@sunchao sunchao left a comment

Sorry for the sporadic review comments. This PR looks mostly good to me; the main thing concerning me is the newly introduced Distribution interface and how it will evolve alongside the existing one in the read package. Happy to see discussion has already happened on this, and I agree that we can start moving in parallel.

Member

why is this ignored?

Contributor Author

I am not sure whether it is safe to do so. @dongjoon-hyun @viirya, what's your take on this?

Member

Does this have to be an Option, or will it always be non-empty?

Contributor Author

I kept it as an Option just in case we don't want to apply the new logic all the time and introduce a flag to fall back to the old approach. If we are not going to have that flag, we can make this required.

Member

Cool, it would simplify the logic if we know the new code will always be applied. Perhaps worth creating a JIRA to track this.


}

private[sql] final case class ClusteredDistributionImpl(
    clusteringExprs: Seq[Expression]) extends ClusteredDistribution {
Contributor Author

I switched to using Seq in fields to avoid reasoning about equality of arrays.
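For context, the difference this avoids: in Scala, Array equality is reference-based while Seq compares elements, so case classes holding Array fields do not get structural equality for free.

  Array(1, 2) == Array(1, 2)  // false: arrays compare by reference
  Seq(1, 2) == Seq(1, 2)      // true: sequences compare element by element

  final case class WithArray(exprs: Array[Int])
  final case class WithSeq(exprs: Seq[Int])

  WithArray(Array(1, 2)) == WithArray(Array(1, 2))  // false
  WithSeq(Seq(1, 2)) == WithSeq(Seq(1, 2))          // true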

    override val write: Option[V1Write] = None) extends V1FallbackWriters {

  override protected def run(): Seq[InternalRow] = {
    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
Contributor Author

I moved refreshCache to parent.


  override protected def run(): Seq[InternalRow] = {
    val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
    refreshCache()
Contributor Author

Refresh happens in V2ExistingTableWriteExec now.

@aokolnychyi
Contributor Author

I've updated this PR and I am ready to split it into smaller mergeable parts. It would be great if everyone could take another look to make sure we are on the same page.

@aokolnychyi
Contributor Author

aokolnychyi commented Dec 7, 2020

Seems like there is consensus about evolving this API alongside the interfaces in the read package. I am not sure whether we need to move the new interfaces to write, though. This concept isn't really read- or write-specific. I'd try to make the new interfaces generic enough so that we don't have to move things again.

@SparkQA

SparkQA commented Dec 7, 2020

Test build #132373 has finished for PR 29066 at commit 32f5687.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Distributions
  • trait V2ExistingTableWriteExec extends V2TableWriteExec

}
}

private[sql] final case class SortValue(
Member

nit: SortValue sounds somewhat confusing to me. Influenced by the catalyst SortOrder, SortOrder sounds better to me. However, you already define SortOrder as an interface. Not a strong opinion; it can be ignored if you think it's okay.

Contributor Author

@aokolnychyi aokolnychyi Dec 9, 2020

I am open to alternatives here for sure.

It seems like we are already using synonyms in this file. For example, FieldReference implements NamedReference. Unfortunately, I cannot use SortOrder as it is already taken in the public expression API.

// the conversion to catalyst expressions above produces SortOrder expressions
// for OrderedDistribution and generic expressions for ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash partitioning
RepartitionByExpression(distribution, query, numShufflePartitions)
Member

Is it possible that the required distribution will be changed later by other optimizations? Is the distribution requirement from the data source a hard requirement? If the distribution is changed and no longer matches the requirement, how will the data source react to it?

Contributor Author

We are inserting repartition/sort nodes directly before writing, so my assumption is that Spark will only remove them if the incoming plan already satisfies these requirements. WriteDistributionAndOrderingSuite is kind of meant for testing that. Do you have ideas when this assumption could break, @viirya?

Member

Oh, I see. I misread this part. Looks fine. Thanks.

Comment on lines +152 to +153
// we cannot perform this step in the analyzer since we need to optimize expressions
// in nodes like OverwriteByExpression before constructing a logical write
Member

If we resolve it in the analyzer, can't we optimize the resolved expressions later in the optimizer?

Contributor Author

At this step, we construct a Write and pass the overwrite expressions to the data source. Expression optimization must have happened before.

Member

Got it. Thanks for clarifying.

@aokolnychyi
Contributor Author

It is a bit hard to keep this large PR up-to-date since it touches many places. As the approach seems reasonable, I am going to split the work and submit smaller PRs. We can then perform detailed reviews on the individual PRs.

@aokolnychyi
Contributor Author

The first PR with interfaces only is out.

rdblue pushed a commit that referenced this pull request Dec 14, 2020
…ering on write

### What changes were proposed in this pull request?

This PR adds connector interfaces proposed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

**Note**: This PR contains a subset of changes discussed in PR #29066.

### Why are the changes needed?

Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:
- global sort
- cluster data and sort within partitions
- local sort within partitions
- no sort

Please see the design doc above for a more detailed explanation of requirements.

### Does this PR introduce _any_ user-facing change?

This PR introduces public changes to the DS V2 by adding a logical write abstraction as we have on the read path as well as additional interfaces to represent distribution and ordering of data (please see the doc for more info).

The existing `Distribution` interface in `read` package is read-specific and not flexible enough like discussed in the design doc. The current proposal is to evolve these interfaces separately until they converge.

### How was this patch tested?

This patch adds only interfaces.

Closes #30706 from aokolnychyi/spark-23889-interfaces.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Ryan Blue <[email protected]>
@HyukjinKwon
Member

@aokolnychyi, so #30706 and #30577 were the splits? Yeah, I think splitting is a good approach for a big change like this. Should we close this PR BTW?

@aokolnychyi
Contributor Author

Closing this one in favor of smaller PRs.

cloud-fan pushed a commit that referenced this pull request Dec 22, 2020
### What changes were proposed in this pull request?

This PR adds logic to build logical writes introduced in SPARK-33779.

Note: This PR contains a subset of changes discussed in PR #29066.

### Why are the changes needed?

These changes are the next step as discussed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #30806 from aokolnychyi/spark-33808.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Jan 26, 2021
…red distribution and ordering

### What changes were proposed in this pull request?

This PR adds repartition and sort nodes to satisfy the required distribution and ordering introduced in SPARK-33779.

Note: This PR contains the final part of changes discussed in PR #29066.

### Why are the changes needed?

These changes are the next step as discussed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with a new test suite.

Closes #31083 from aokolnychyi/spark-34026.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>