Description
As identified in #423 and #378 (and #412) there needs to be a mechanism for physical operators to express their behaviour with respect to sort order, so that optimisation passes can handle it correctly.
It is assumed that the initial physical plan created from the logical plan is valid, and that the requirement is for the optimisation phase to not alter the plan in a way that violates its implicit ordering requirements. I think it is therefore sufficient to encode some notion of sort sensitivity, as opposed to what the sort order necessarily is. I believe any optimisations related to the specific sort orders being utilised would take place at the LogicalPlan level, and avoiding this at the physical layer sidesteps issues around equality for PhysicalExpressions, etc...
The proposal would be to introduce a new member function to ExecutionPlan called partition_order() that returns a variant of a new enum PartitionOrder. This would have three variants:
- PartitionOrder::Preserving - operators that preserve the ordering of their input partition(s) - e.g. FilterExec, CoalesceBatchesExec
- PartitionOrder::Sensitive - operators which rely on the order of their input partition(s) - e.g. GlobalLimitExec, LocalLimitExec
- PartitionOrder::Agnostic - operators which do not rely on, nor preserve the order of their input partition(s) - e.g. HashAggregateExec, MergeExec, RepartitionExec
Note that the formulation does not distinguish between 1 or many partitions, as this is a detail already encapsulated by required_child_distribution (although I do wonder if this should be a property of the plan and not the operators themselves). There is no mechanism to express an ordering requirement across partitions, and I'm not sure that one would be useful.
The default implementation of partition_order() would return PartitionOrder::Sensitive. Or to put it another way, unless explicitly told otherwise the optimiser cannot assume that an operator isn't sensitive to the ordering of its inputs.
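To make the shape of the proposal concrete, here is a minimal sketch of the enum and the defaulted trait method. It is not the real DataFusion code: the ExecutionPlan trait below is a stripped-down stand-in with only the proposed method, the operator structs are empty placeholders, and CustomExec is a hypothetical third-party operator that does not override the default.

```rust
// Sketch only: simplified stand-ins for the proposal, not the actual
// DataFusion `ExecutionPlan` trait or operators.

/// How an operator relates to the ordering of its input partition(s).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionOrder {
    /// Output order follows input order (e.g. a filter).
    Preserving,
    /// Correctness depends on input order (e.g. a limit).
    Sensitive,
    /// Neither relies on nor preserves input order (e.g. a hash aggregate).
    Agnostic,
}

/// Stand-in trait carrying only the proposed method.
pub trait ExecutionPlan {
    /// Conservative default: assume the operator is order sensitive
    /// unless it explicitly says otherwise.
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Sensitive
    }
}

// Placeholder operators named after the examples in the proposal.
pub struct FilterExec;
pub struct HashAggregateExec;
// Hypothetical operator that does not override the default.
pub struct CustomExec;

impl ExecutionPlan for FilterExec {
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Preserving
    }
}

impl ExecutionPlan for HashAggregateExec {
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Agnostic
    }
}

// Falls back to the default, so the optimiser treats it as Sensitive.
impl ExecutionPlan for CustomExec {}
```

With this default, an operator written before the method existed is never reordered by accident; the cost is that it also blocks optimisations until its author opts in.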
The Repartition pass would then be modified to only insert a RepartitionExec on branches of the graph that have no PartitionOrder::Sensitive operators without an intervening PartitionOrder::Agnostic operator. This would fix #423. AddMergeExec could additionally be modified to error if it finds itself needing to insert a MergeExec on an order-sensitive branch.
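One way to read the rule above is as a walk up from the candidate insertion point towards the root of the plan. The sketch below is an illustration under assumptions, not the actual Repartition pass: can_insert_repartition is a hypothetical helper, the ancestors are passed in as a plain slice rather than a real plan tree, and reaching the root without meeting a Sensitive operator is treated as safe, which is my literal reading of the rule rather than something stated explicitly.

```rust
// Sketch only: a hypothetical helper illustrating the proposed rule,
// operating on a flat list of ancestors instead of a real plan tree.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionOrder {
    Preserving,
    Sensitive,
    Agnostic,
}

/// `ancestors` holds the `partition_order()` of every operator above the
/// candidate insertion point, nearest ancestor first, root last.
///
/// Returns true if a RepartitionExec may be inserted at that point.
pub fn can_insert_repartition(ancestors: &[PartitionOrder]) -> bool {
    for order in ancestors {
        match order {
            // Order is discarded here anyway, so nothing further up can
            // observe the reordering introduced below this operator.
            PartitionOrder::Agnostic => return true,
            // An order-sensitive operator would see the reordered input.
            PartitionOrder::Sensitive => return false,
            // Order flows through unchanged; keep looking further up.
            PartitionOrder::Preserving => continue,
        }
    }
    // Assumption: no Sensitive operator on the path to the root means
    // the insertion is allowed.
    true
}
```

For example, a path of Preserving then Sensitive rejects the insertion, while Preserving, Agnostic, Sensitive allows it because the Agnostic operator shields the Sensitive one.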
Eventually as certain forms of RepartitionExec are order preserving, e.g. splitting a single partition into multiple, this could be codified and combined with a modified version of AddMergeExec that inserts an order preserving merge. This would naturally fit into the proposed framework.
I'm not sure how ordering is typically handled in query engines, so if there is a standard solution I'd be happy to go with that instead, but I thought I'd write up the simplest solution I can see to the issue in #423.