Add support for multiple partitions with SortExec (#362) #378
Conversation
Currently sort needs a single partition, as otherwise the output partitions are not sorted with respect to each other. A MergeExec is currently added based on this requirement. So I think this won't work until we have the implementation to merge the sorted partitions, which you are working on in #379.
As @Dandandan pointed out, at the moment there is an assumption in the planner that a SortExec produces a single sorted partition as its output. When SortExec reports that it requires a single stream for its input, part of the planner puts a MergeExec (confusing name, I know) before it to get a single stream. This is why the unit tests are failing on this PR (because the output order is wrong in some queries).
Perhaps one way to allow a SortExec to run on multiple partitions would be to pass the desired output partitioning to the SortExec constructor, and then use that request to decide what the input to the SortExec should be.
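A minimal sketch of that suggestion: the operator records whether it should preserve its input partitioning, and reports its input requirement accordingly. All names below (`Distribution`, `new_with_partitioning`, etc.) are illustrative, not the actual DataFusion API.

```rust
// Hypothetical sketch: SortExec records the desired partitioning behaviour
// at construction time, and the planner consults the reported input
// distribution to decide whether to insert a MergeExec.
#[derive(Debug, PartialEq)]
enum Distribution {
    SinglePartition,          // input must be merged into one stream first
    UnspecifiedDistribution,  // any partitioning is acceptable
}

struct SortExec {
    preserve_partitioning: bool,
}

impl SortExec {
    /// Original behaviour: requires a single input partition.
    fn new() -> Self {
        SortExec { preserve_partitioning: false }
    }

    /// Opt-in behaviour: sort each input partition independently.
    fn new_with_partitioning(preserve_partitioning: bool) -> Self {
        SortExec { preserve_partitioning }
    }

    /// What this operator requires of its input to be correct.
    fn required_input_distribution(&self) -> Distribution {
        if self.preserve_partitioning {
            Distribution::UnspecifiedDistribution
        } else {
            Distribution::SinglePartition
        }
    }
}

fn main() {
    // The default constructor still demands a single partition...
    assert_eq!(
        SortExec::new().required_input_distribution(),
        Distribution::SinglePartition
    );
    // ...while the opt-in constructor accepts any input partitioning.
    assert_eq!(
        SortExec::new_with_partitioning(true).required_input_distribution(),
        Distribution::UnspecifiedDistribution
    );
}
```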
I've added a new constructor that allows opting into the new behaviour. I wasn't aware of the way that MergeExec is plumbed into the plans and that this change would break it. I do wonder if, instead of relying on an optimizer pass to insert the MergeExec, the planner should produce a correct plan directly.
I think you are right about that; we should not rely on the optimizer to make the execution plan correct. I think it would be better if the planner adds the MergeExec itself.
I did some more digging into this and created #412 to track the fact that PhysicalPlanner currently creates plans that are incorrect, and that only become correct after the optimizer passes have run. However, I think the issue is actually a bit more subtle than I first realised. The operators inserted by the PhysicalPlanner must somehow remember the partitioning they need in order to be correct, so that the optimiser cannot break them; simply adding a MergeExec when generating the initial plan is insufficient. There are a couple of ways I can see this being handled:
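The hazard described above can be sketched as follows: an optimizer pass that repartitions inputs for parallelism must consult each operator's declared requirement, otherwise it silently breaks correctness. The names here are illustrative, not the real DataFusion APIs.

```rust
// Hypothetical sketch: operators declare the input distribution they need,
// and a repartitioning optimizer pass must respect that declaration.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Distribution {
    SinglePartition,
    Unspecified,
}

trait ExecPlan {
    /// What the operator needs from its input to produce correct results.
    fn required_input_distribution(&self) -> Distribution;
}

/// A sort that produces one globally sorted stream needs a single partition.
struct Sort;
impl ExecPlan for Sort {
    fn required_input_distribution(&self) -> Distribution {
        Distribution::SinglePartition
    }
}

/// A filter is correct under any partitioning.
struct Filter;
impl ExecPlan for Filter {
    fn required_input_distribution(&self) -> Distribution {
        Distribution::Unspecified
    }
}

/// A well-behaved repartitioning pass only adds parallelism below operators
/// that do not demand a single partition.
fn may_repartition(op: &dyn ExecPlan) -> bool {
    op.required_input_distribution() != Distribution::SinglePartition
}

fn main() {
    assert!(!may_repartition(&Sort));
    assert!(may_repartition(&Filter));
}
```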
I therefore think the addition of a new constructor is a step in the right direction. However, it is unfortunately insufficient, as nothing prevents the optimiser from subsequently altering the partitioning. Going to mark this as a draft for now, as the above will have implications for the best way forward here.
I agree with @tustvold and @Dandandan on this -- I think the plan should generate correct results without requiring optimizer passes being run. The optimizer passes should just (potentially) make the plans faster.
I agree
Is there any reason we can't call …
I think this is "as correct as current master" and therefore marking this as ready for review. It is impacted by #423 (the issue alluded to above regarding the optimiser passes).
alamb
left a comment
The only thing I think would make this PR better is tests, but I believe tests are added in #362, so I think we should merge this PR as is.
@Dandandan any thoughts?
Dandandan
left a comment
I think it's OK for the tests to follow later 👍 thanks @tustvold
…e#378)
* Add support for multiple partitions with SortExec
* make SortExec partitioning optional
Which issue does this PR close?
re #362
Rationale for this change
Once an order-preserving merge operator is added as part of #362, it will be possible to combine multiple sorted partitions into a single sorted partition, effectively yielding a partitioned sort. Loosening the restriction of SortExec to a single partition allows it to form the sort part of this.
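The merge step described above can be illustrated with a simplified in-memory k-way merge: several individually sorted partitions are combined into one sorted sequence by repeatedly taking the smallest head element. This is only a sketch of the idea behind the #362 operator, not the actual streaming implementation.

```rust
// Simplified k-way merge of sorted partitions using a min-heap
// (std::cmp::Reverse turns Rust's max-heap into a min-heap).
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn merge_sorted_partitions(partitions: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (next value, partition index, offset in partition);
    // the tuple ordering means the smallest pending value is popped first.
    let mut heap = BinaryHeap::new();
    for (i, p) in partitions.iter().enumerate() {
        if let Some(&v) = p.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((v, i, off))) = heap.pop() {
        out.push(v);
        // Advance within the partition the value came from.
        if let Some(&next) = partitions[i].get(off + 1) {
            heap.push(Reverse((next, i, off + 1)));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted_partitions(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7]);
}
```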
What changes are included in this PR?
SortExec is no longer restricted to a single partition; instead it preserves the partitioning of its inputs.
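Concretely, the partition-preserving behaviour means each input partition is sorted independently, so the output has the same number of partitions as the input, each individually sorted. A minimal in-memory sketch (not the real operator):

```rust
// Sketch of partition-preserving sort: sort each partition on its own,
// leaving the partition count and membership unchanged.
fn sort_each_partition(mut partitions: Vec<Vec<i32>>) -> Vec<Vec<i32>> {
    for p in partitions.iter_mut() {
        p.sort();
    }
    partitions
}

fn main() {
    let out = sort_each_partition(vec![vec![3, 1, 2], vec![9, 7]]);
    // Two partitions in, two (sorted) partitions out.
    assert_eq!(out, vec![vec![1, 2, 3], vec![7, 9]]);
}
```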