Push down preferred sorts into TableScan logical plan node #17337
Conversation
    /// Optional preferred ordering for the scan
    pub preferred_ordering: Option<Vec<SortExpr>>,
@berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
cc @alamb
I also think @suremarc and @ozankabak may be interested in this
A more future-proof API (that we could change the internal representation) might be something like
/// Preferred ordering
///
/// Preferred orderings can potentially help DataFusion optimize queries, even in cases
/// when the output does not completely follow that order. This is information passed
/// to the scan about what might help.
///
/// For example, for a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
/// predicates and TopK operator will work better if the data is roughly ordered by descending
/// time (more recent data first)
struct PreferredOrdering {
exprs: Vec<SortExpr>
}
And then change this API to
-    /// Optional preferred ordering for the scan
-    pub preferred_ordering: Option<Vec<SortExpr>>,
+    /// Optional preferred ordering for the scan
+    pub preferred_ordering: Option<PreferredOrdering>,
> @berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
> cc @alamb
When we are registering the sources, we can provide multiple orderings if the table supports them. However, the requirements are singular, and I don't think there would be any meaning in ordering the table for both `col_a` and `col_b` simultaneously. So I've always thought that requirements need only one ordering, but specs should be capable of having multiple orderings. So there isn't any obvious advantage of using equivalences here, IMO.
I think this looks good but I was confused that the tests don't seem to show the preferred ordering. I think we should fix those tests before merging -- I also expect it to show that some of the pushdown isn't working quite as expected (aka pushing through a projection or filter)
I also recommend putting the preferred sort expressions in their own struct, but that is not required in my mind.
As I understand the plan, in the next few PRs, @adriangb will update the various APIs so that this preferred sort is provided to `TableProvider::scan` (really via `scan_with_args`).
I also wonder if we should wait for the DataFusion 50 release before merging this or if it is ok to merge now.
    /// Currently, we only support pushing down simple column references
    /// because table providers typically can't optimize complex expressions
    /// in sort pushdown.
is this a fundamental limitation? I ask because @pepijnve was asking about "column only" support the other day at
Just for context, we have externally prepared data files that contain filesystem paths. One column is the full parent path, another is the file name. The order of the rows in the file is `replace(concat(parent, name), '/', chr(0))` and we make extensive use of this aspect of the data.
The new implementation pushes down arbitrary sort expressions. Not only that: it's able to reverse projections out. E.g. given the projection `a, b, c+d+1 as cd1` and the sort expression `a, cd1`, it will push down `a, c+d+1` into the scan.
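The projection-reversal idea above can be sketched with a toy substitution (illustrative only, not DataFusion's actual `SortExpr`/`Expr` API): map each projection output alias back to the input expression that produced it, and give up if a sort key is not produced by the projection.

```rust
use std::collections::HashMap;

/// Toy sketch: rewrite sort keys expressed over a projection's output
/// aliases into expressions over the projection's input, so they can be
/// pushed below the projection. Returns None if any key has no source.
fn unproject_sorts(
    projection: &[(&str, &str)], // (input expression, output alias)
    sort_exprs: &[&str],         // sort keys over the projection output
) -> Option<Vec<String>> {
    // Map each output alias back to the expression that produced it.
    let alias_to_expr: HashMap<&str, &str> = projection
        .iter()
        .map(|(expr, alias)| (*alias, *expr))
        .collect();
    sort_exprs
        .iter()
        .map(|s| alias_to_expr.get(s).map(|e| e.to_string()))
        .collect() // None if any sort key is not produced by the projection
}
```

With the projection `a, b, c+d+1 as cd1` and sort keys `a, cd1`, this yields `a, c+d+1`, matching the behavior described above.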
Sort: t1.a ASC NULLS LAST
  Inner Join: t1.a = t2.a
    TableScan: t1
    TableScan: t2
these tests don't really show that the preferred ordering is pushed through. Perhaps we can update the plan to show any preferred ordering
#[derive(Default, Debug)]
pub struct PushDownSort {}

impl PushDownSort {
I think the `EnforceSorting` rule already pushes sorts down in the plan -- https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_sorting/struct.EnforceSorting.html
Do you think we will need more sort pushdown? Or will this always just be "pass down preferred sorts" to LogicalPlans?
Nice. I was hoping another optimizer rule does the "hard work" so we can do just the simple thing here (only a subset of node types we need to support).
Hmm, that's at the physical optimizer layer though. We need to do this optimization at the logical layer.
@alamb do you have any guidance on pushing down sorts in logical plans? I don't see anything which is a bit surprising, I thought it would basically already be implemented.
I think it is all done in the physical plans
Okay bummer. It seems to rely on methods on the traits, etc. I guess we need to figure this all out from scratch here...
I did implement it from scratch, but luckily, since we are just pushing down a preferred ordering and not eliminating sorts altogether, and because this operates on LogicalPlan, it is relatively simple and not a big deal to get wrong (slower queries, not incorrect results).
Here is a PR that avoids some clones, which might improve performance.
🤔 this seems to have caused a massive slowdown in the sql planner benchmark somehow:

Benchmarking physical_sorted_union_order_by_300: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 3942.3s, or reduce sample count to 10.
physical_sorted_union_order_by_300
                        time:   [38.914 s 38.997 s 39.079 s]
Benchmarking logical_plan_optimize: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 17189.6s, or reduce sample count to 10.

It is still running...
I'm sure it's just a dumb mistake on my end. Let me do a round of looking at your comments and investigating, thank you for your patience 🙏🏻
I think we should wait until after v50.
@alamb sorry for never responding to your comments and the delay here. I realized that the approach was fundamentally flawed and needed some time to think and loop back. I've now rewritten it from scratch using the TreeNode API, added better tests, etc. I still need to do some cleanup (I see MSRV is failing, I want to update the PR description, go through your feedback and pull out the still relevant parts, etc.) but this is looking much better now.
Okay I've updated doc strings, the PR description, combed through the feedback... I think this is ready for review!
/// A node context object beneficial for writing optimizer rules.
/// This context encapsulates a [`LogicalPlan`] node with a payload.
///
/// Since each wrapped node has its children both within [`LogicalPlanContext.plan.inputs()`]
/// and separately within [`LogicalPlanContext.children`] (the child nodes wrapped in the context),
/// it's important to keep these child plans in sync when performing mutations.
///
/// Since there are two ways to access child plans directly, it's recommended
/// to perform mutable operations via [`Self::update_plan_from_children`].
/// After mutating `LogicalPlanContext.children`, or after creating the `LogicalPlanContext`,
/// call `update_plan_from_children` to sync.
#[derive(Debug, Clone)]
Happy to break this out into its own PR.
let mut replace_map = HashMap::new();
for (i, (qualifier, field)) in
    subquery_alias.input.schema().iter().enumerate()
{
    let (sub_qualifier, sub_field) =
        subquery_alias.schema.qualified_field(i);
    replace_map.insert(
        qualified_name(sub_qualifier, sub_field.name()),
        Expr::Column(Column::new(qualifier.cloned(), field.name())),
    );
}
Unified into helper function in utils.rs
}

/// Replaces columns by name in the projection.
pub fn replace_cols_by_name(
Moved to utils.rs so we can re-use it in sort pushdown.
/// - Input: "test.a" (from input_schema)
/// - Output: "subquery.a" (from output_schema)
/// - Map: {"subquery.a" -> Column("test", "a")}
pub(crate) fn build_schema_remapping(
Avoiding a public symbol until someone asks for this
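The remapping sketched in the doc comment above can be illustrated with a toy version (hypothetical helper name and string-based types, not the actual `build_schema_remapping` signature): each qualified output column of the alias is mapped back to the corresponding column of the input schema.

```rust
use std::collections::HashMap;

/// Toy sketch of the schema remapping described above: for a SubqueryAlias,
/// map each qualified output column name back to the qualified input column.
fn build_remap(
    alias: &str,           // e.g. "subquery"
    input_qualifier: &str, // e.g. "test"
    fields: &[&str],       // shared field names, e.g. ["a"]
) -> HashMap<String, String> {
    fields
        .iter()
        .map(|f| (format!("{alias}.{f}"), format!("{input_qualifier}.{f}")))
        .collect()
}
```

So `build_remap("subquery", "test", &["a"])` produces the `{"subquery.a" -> "test.a"}` mapping shown in the example, which the rule then uses to rewrite sort expressions as they cross the alias boundary.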
Pull Request Overview
This PR implements a new logical optimization rule called "PushDownSort" that propagates sort expressions down to TableScan nodes to enable sort-aware optimizations by table providers. The optimization pushes preferred ordering information through transparent nodes (like Projection, Filter, Limit) while preserving the original Sort nodes to ensure correctness.
Key changes include:
- Addition of a new `PushDownSort` optimizer rule that pushes sort expressions to table scans
- Extension of `TableScan` nodes with optional preferred ordering information via `ScanOrdering`
- Updates to test expectations to reflect the new preferred_ordering display in table scans
Reviewed Changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description
---|---
datafusion/optimizer/src/push_down_sort.rs | New optimizer rule implementing sort pushdown logic with expression rewriting
datafusion/expr/src/logical_plan/plan.rs | Added ScanOrdering struct and integrated it into TableScan nodes
datafusion/optimizer/src/optimizer.rs | Integrated PushDownSort rule into the default optimization pipeline
datafusion/optimizer/src/utils.rs | Added utility functions for column name replacement and schema remapping
datafusion/sqllogictest/test_files/*.slt | Updated test expectations to show preferred_ordering in TableScan display
Comments suppressed due to low confidence (1)
datafusion/optimizer/src/push_down_sort.rs:1
- Corrected spelling of 'eliminiate' to 'eliminate'.
// Licensed to the Apache Software Foundation (ASF) under one
    /// Ordering for the scan
    pub ordering: Option<ScanOrdering>,
Should this just be `ScanOrdering` instead of `Option<ScanOrdering>`?
    /// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away.
    /// Otherwise the sorts may remain in place, but partial ordering may be exploited, e.g. to do early stopping or to reduce the complexity of the sort.
    /// Thus it is recommended for the scan to also make a best effort to produce partially sorted data if possible.
    pub preferred_ordering: Option<Vec<SortExpr>>,
Should this be `preferred_ordering: Vec<SortExpr>`?
It comes down to what a `Vec` of zero length means. That would imply to me there is no preferred ordering, so an `Option<Vec>` is redundant.
I would personally recommend making this non-`pub`, adding an accessor like `fn preferred_ordering(&self) -> Option<&Vec<...>> {}`, and documenting in doc comments what the invariants are.
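The suggested encapsulation might look like the following sketch (simplified: `String` stands in for `SortExpr`, and the constructor name is assumed, not part of the PR): the field stays private, and the accessor collapses "empty" and "absent" into one case so the redundant `Option<Vec>` cannot diverge from the invariant.

```rust
/// Sketch of the suggested encapsulation: an empty Vec means
/// "no preference", and the accessor enforces that invariant.
#[derive(Debug, Default, Clone)]
pub struct ScanOrdering {
    // Invariant: an empty Vec is treated as "no preferred ordering".
    preferred_ordering: Vec<String>, // stand-in for Vec<SortExpr>
}

impl ScanOrdering {
    /// Hypothetical constructor for illustration.
    pub fn with_preferred_ordering(exprs: Vec<String>) -> Self {
        Self { preferred_ordering: exprs }
    }

    /// Returns None when there is no preferred ordering, so callers
    /// never observe the redundant Some(vec![]) state.
    pub fn preferred_ordering(&self) -> Option<&[String]> {
        if self.preferred_ordering.is_empty() {
            None
        } else {
            Some(&self.preferred_ordering)
        }
    }
}
```

With this shape, `ScanOrdering::default()` naturally means "no preference", which also answers the earlier question of whether the outer field needs to be `Option<ScanOrdering>` at all.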
-    Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
-    Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
-    SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
+    Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort,
Addition of the `ScanOrdering` import results in reformatting of the rest.
This commit adds a new optional field `preferred_ordering` to the `TableScan` logical plan node to support sort pushdown optimizations. Changes include:

- Add `preferred_ordering: Option<Vec<SortExpr>>` field to `TableScan` struct
- Add `try_new_with_preferred_ordering` constructor method
- Update all `TableScan` constructors throughout the codebase to include the new field
- Update `Debug`, `PartialEq`, `Hash`, and `PartialOrd` implementations
- Update pattern matching in optimizer and other modules

The preferred_ordering field is currently not used by any optimization rules but provides the foundation for future sort pushdown implementations. This is part 2 of 2 PRs split from apache#17273 as requested in apache#17273 (comment)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
I hope to review this more carefully later today.
Thank you @adriangb -- I just reviewed this PR again but I am confused.
Why are we introducing preferred sorts into the LogicalPlan?
I thought the eventual idea was to pass the preferred sorts into the DataSource exec so it could order files within a partition appropriately. However, I don't see any code to actually reorder files and I don't understand how it would work with information on the logical plan
It seems like the code that does reorder files is here in the ExecutionPlan (datafusion/datafusion/datasource/src/file.rs, lines 93 to 108 in d19bf52):
fn repartitioned(
    &self,
    target_partitions: usize,
    repartition_file_min_size: usize,
    output_ordering: Option<LexOrdering>,
    config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
    if config.file_compression_type.is_compressed() || config.new_lines_in_values {
        return Ok(None);
    }
    let repartitioned_file_groups_option = FileGroupPartitioner::new()
        .with_target_partitions(target_partitions)
        .with_repartition_file_min_size(repartition_file_min_size)
        .with_preserve_order_within_groups(output_ordering.is_some())
        .repartition_file_groups(&config.file_groups);
Won't we have to thread the preferred sort to there?
    /// when the output does not completely follow that order. This is information passed
    /// to the scan about what might help.
    ///
    /// For example, a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
How about also linking to the blog that explains this in more detail: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
    }
}

/// Communicates the desired ordering of the output of a scan operation.
Most of this text is about the preferred_ordering field, so maybe it would be best moved closer to there.
I can imagine potentially adding other fields like `required_ordering` in the future, which could communicate if the scan ordering was required (if/when we extend the ExecutionPlan API to communicate what type of sort pushdowns are supported 🤔).
    Arc::new(EliminateOuterJoin::new()),
    // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
    Arc::new(PushDownLimit::new()),
    // Sort pushdown should happen before filter pushdown to maximize optimization opportunities
Most of the other sort operations happen at the ExecutionPlan layer. Can you remind me why this pushdown is happening at the LogicalLevel?
As per #17337 (comment), I'm doing it at the logical layer because that's where the sort order of scans is currently handled. But maybe this is different enough from the current handling of sort orders to warrant doing it at the physical layer?
That is the end goal. The ordering of files currently happens here; some code pointers:

- datafusion/datafusion/catalog-listing/src/table.rs, lines 450 to 476 in b5b7f9b
- datafusion/datafusion/datasource/src/file_scan_config.rs, lines 851 to 872 in b5b7f9b

My previous PR that started down this path but got too big and made me think that we should break it up: #17273

The execution plan does then try to repartition again as you say, but it passes in any existing ordering and tries to preserve it: datafusion/datafusion/datasource/src/file.rs, line 107 in b5b7f9b

I'm guessing those two things must be compatible already - otherwise the existing implementation would be totally broken, right?
`TableProvider::scan_with_args` to support pushdown sorting #17273 (comment).

This will enable TableProviders to produce files in an order and partitioning that optimizes query execution, e.g. to make a TopK operator stop earlier via dynamic filters or to completely optimize away a sort if the files can be ordered to do so. I think this will also unlock a lot of cool physical optimizer tricks (e.g. pick `SortMergeJoin(Scan[order_by=a], Scan[order_by=a])` instead of `SortMergeJoin(SortExec(HashJoinExec(Scan, Scan)))` for a query like `select * from t1 join t2 using (a) order by a`).

This does not actually remove sort nodes. That is still done only at the physical optimizer level, as some logical operators interact differently with sorts depending on which physical implementation is chosen (e.g. HashJoin vs. SortMergeJoin). It is up to the TableProvider to produce an ExecutionPlan that has ordering properties.
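The TopK benefit mentioned above can be illustrated with a toy sketch (not DataFusion code; the function and flag are invented for illustration): when the scan is known to produce rows in descending sort order, a top-k operator can stop after k rows instead of consuming the whole input.

```rust
/// Toy sketch of why a preferred ordering helps TopK: if the input is
/// already sorted descending on the sort key, the first k rows are the
/// answer and the operator can stop early; otherwise it must scan and
/// sort everything.
fn top_k_desc(
    input: impl IntoIterator<Item = i64>,
    k: usize,
    input_sorted_desc: bool, // property the scan would advertise
) -> Vec<i64> {
    if input_sorted_desc {
        // Early stop: take the first k rows and ignore the rest.
        input.into_iter().take(k).collect()
    } else {
        // Fallback: full scan, sort descending, then truncate to k.
        let mut v: Vec<i64> = input.into_iter().collect();
        v.sort_unstable_by(|a, b| b.cmp(a));
        v.truncate(k);
        v
    }
}
```

Both branches return the same result; the sorted branch simply never materializes the full input, which is the "early stopping" a pushed-down preferred ordering makes possible even when the data is only roughly ordered (via dynamic filters rather than this hard cutoff).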