Status: Open
Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
These are examples and use cases for the design in #8078.
InfluxDB IOx uses Min and Max statistics to optimize some queries, and we found that for many queries these statistics are either lost (become Absent) or become Inexact as they are propagated up the plan. Our request is to keep the statistics conservative, that is, bounds that are guaranteed to cover the exact values, rather than dropping them or marking them Inexact.
Note that the reproducers below do not show the Min and Max statistics, so I use the Rows statistic instead. I opened #8110 to add Min and Max statistics to the EXPLAIN output.
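To illustrate why conservative bounds are still useful, here is a minimal Rust sketch of the kind of check that min/max-based pruning performs. It is not DataFusion's or IOx's API: `ColumnBounds` and `can_skip_gt` are hypothetical names. If a range is guaranteed to contain every actual value, a predicate such as `col > literal` can be proven to match no rows even when the range is wider than the true min and max; an Inexact estimate or an Absent value cannot support that proof.

```rust
/// Hypothetical conservative bounds for one integer column: every actual
/// value is guaranteed to lie in `[min, max]`, though the true range may be narrower.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ColumnBounds {
    pub min: i64,
    pub max: i64,
}

/// Returns true when no row can satisfy `col > literal`, so the data can be skipped.
pub fn can_skip_gt(bounds: Option<ColumnBounds>, literal: i64) -> bool {
    match bounds {
        // Sound for any conservative bounds: if even the upper bound is
        // <= literal, every actual value is too.
        Some(b) => b.max <= literal,
        // Lost (absent) statistics: nothing can be proven, so the data must be read.
        None => false,
    }
}
```

For example, a range of `[0, 400]` that over-approximates actual values in `[50, 350]` still lets the check skip `col > 500`, while lost statistics (`None`) never allow skipping.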
Table for the reproducers
set datafusion.explain.show_statistics = true;
create table t1(state string, city string, min_temp float, area int, time timestamp) as values
('MA', 'Boston', 70.4, 1, 50),
('MA', 'Bedford', 71.59, 2, 150),
('CA', 'SF', 79.0, 1, 300),
('MA', 'Boston', 75.4, 3, 250),
('MA', 'Andover', 69.5, 4, 250),
('MA', 'Bedford', 78.2, 2, 150),
('MA', 'Boston', 65.0, 2, 250),
('CA', 'SJ', 78.4, 1, 300),
('MA', 'Reading', 53.0, 4, 250),
('CA', 'SJ', 75.4, 5, 350);
stats of timestamp filter are lost
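For a range filter like `time <= to_timestamp(350)` in the reproducer that follows, a conservative rule exists: the output's upper bound is the smaller of the input maximum and the literal, the lower bound is unchanged, and the row count is at most the input row count. The sketch below uses hypothetical names (`Bounds`, `filter_le`) and assumes timestamps are plain `i64` nanoseconds, matching the literal `350000000000` in the `FilterExec` line.

```rust
/// Hypothetical conservative statistics: every actual value lies in
/// `[min, max]` and the actual row count is at most `max_rows`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
    pub max_rows: usize,
}

/// Conservative output statistics for `col <= literal`.
/// Returns `None` when the filter provably removes every row.
pub fn filter_le(input: Bounds, literal: i64) -> Option<Bounds> {
    if input.min > literal {
        // Every value exceeds the literal: the output is empty.
        return None;
    }
    Some(Bounds {
        // Surviving values are still >= the old minimum.
        min: input.min,
        // Surviving values are <= the literal and <= the old maximum.
        max: input.max.min(literal),
        // A filter never adds rows.
        max_rows: input.max_rows,
    })
}
```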
explain select * from t1 where time <= to_timestamp(350);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.time <= TimestampNanosecond(350000000000, None) |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | FilterExec: time@4 <= 350000000000, statistics=[Rows=Absent, Bytes=Absent] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------+
stats of string filter are lost
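For an equality filter such as `state = 'MA'`, conservative min/max are easy to keep: every surviving value equals the literal, so both bounds collapse to `'MA'`, and if the literal lies outside the input range no row can survive. A minimal sketch with hypothetical names (`StrBounds`, `filter_eq`), assuming strings are ordered byte-wise as Rust's `str` comparison does:

```rust
/// Hypothetical conservative statistics for a string column.
#[derive(Debug, Clone, PartialEq)]
pub struct StrBounds {
    pub min: String,
    pub max: String,
    /// The actual row count is at most this value.
    pub max_rows: usize,
}

/// Conservative output statistics for `col = literal`.
/// Returns `None` when the literal lies outside `[min, max]`, i.e. no row can match.
pub fn filter_eq(input: &StrBounds, literal: &str) -> Option<StrBounds> {
    // Byte-wise lexicographic comparison (an assumption about the engine's ordering).
    if literal < input.min.as_str() || literal > input.max.as_str() {
        return None;
    }
    Some(StrBounds {
        // Every surviving value equals the literal.
        min: literal.to_string(),
        max: literal.to_string(),
        // A filter never adds rows.
        max_rows: input.max_rows,
    })
}
```

With the table above, `state` ranges over `['CA', 'MA']`, so `state = 'MA'` keeps bounds `['MA', 'MA']` with at most 10 rows, while `state = 'NY'` is provably empty.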
explain select * from t1 where state = 'MA';
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.state = Utf8("MA") |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | FilterExec: state@0 = MA, statistics=[Rows=Absent, Bytes=Absent] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------+
stats of integer filter become inexact
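In the table above, `area` only takes values from 1 to 5. If the scan reported those values as conservative bounds (an assumption: the `MemoryExec` line only shows Rows and Bytes), then `area <= 100` is always true and the row count could stay `Exact(10)` instead of becoming `Inexact(10)`. The sketch below is hypothetical: `RowCount`, its `Conservative` variant (standing in for the proposed enum value), and `rows_after_le` are illustrative names, not DataFusion's API.

```rust
/// Hypothetical row-count statistic.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowCount {
    Exact(usize),
    /// Conservative: the actual count is guaranteed to be <= this value.
    Conservative(usize),
}

impl RowCount {
    fn upper(self) -> usize {
        match self {
            RowCount::Exact(n) | RowCount::Conservative(n) => n,
        }
    }
}

/// Conservative row count after `col <= literal`, given conservative
/// bounds `[col_min, col_max]` of the input column.
pub fn rows_after_le(input: RowCount, col_min: i32, col_max: i32, literal: i32) -> RowCount {
    if col_max <= literal {
        // Every row satisfies the predicate: the input count carries over unchanged.
        input
    } else if col_min > literal {
        // No row can satisfy the predicate.
        RowCount::Exact(0)
    } else {
        // Some rows may be removed: only an upper bound is known.
        RowCount::Conservative(input.upper())
    }
}
```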
explain select * from t1 where area <= 100;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.area <= Int32(100) |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)] |
| | FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
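+---------------+---------------------------------------------------------------------------------------------------+
stats of float filter become inexact
Note: the plan below shows the filter t1.area <= Int32(100), so it appears to be the output of the previous query pasted again rather than the output of the min_temp = 10.0 query.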
explain select * from t1 where min_temp = 10.0;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.area <= Int32(100) |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)] |
| | FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------+
stats of IN filter are lost for any data type and for subqueries
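For an `IN` list such as `area in (1, 2)`, a surviving value must be both in the list and inside the input range, so the output bounds can be computed from the list values that fall within the input bounds, and the row count is at most the input row count. A minimal sketch with hypothetical names (`Bounds`, `filter_in`); it works on the `IN` list directly, although the logical plans that follow show DataFusion rewriting it into `OR`ed equalities:

```rust
/// Hypothetical conservative bounds: every actual value lies in `[min, max]`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Bounds {
    pub min: i32,
    pub max: i32,
}

/// Conservative bounds of `col` after `col IN (list)`.
/// Returns `None` when no list value can occur in the input (the output is empty).
pub fn filter_in(input: Bounds, list: &[i32]) -> Option<Bounds> {
    // Only list values inside the input range can survive the filter.
    let mut survivors = list
        .iter()
        .copied()
        .filter(|v| *v >= input.min && *v <= input.max);
    let first = survivors.next()?;
    // The output range spans the smallest and largest surviving list values.
    let (min, max) = survivors.fold((first, first), |(lo, hi), v| (lo.min(v), hi.max(v)));
    Some(Bounds { min, max })
}
```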
explain select * from t1 where area in (1, 2);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.area = Int32(1) OR t1.area = Int32(2) |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | FilterExec: area@3 = 1 OR area@3 = 2, statistics=[Rows=Absent, Bytes=Absent] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------+
explain select * from t1 where city in ('Boston', 'Reading');
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan | Filter: t1.city = Utf8("Boston") OR t1.city = Utf8("Reading") |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | FilterExec: city@1 = Boston OR city@1 = Reading, statistics=[Rows=Absent, Bytes=Absent] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------+
explain select * from t1 where city in (select city from t1);
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan | LeftSemi Join: t1.city = __correlated_sq_1.city |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| | SubqueryAlias: __correlated_sq_1 |
| | TableScan: t1 projection=[city] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(city@1, city@0)], statistics=[Rows=Absent, Bytes=Absent] |
| | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
stats of join are lost
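For an inner equi-join such as `t1.city = t2.city`, the join key also keeps conservative bounds: a key that appears in the output appears on both sides, so it lies in the intersection of the two input ranges. For the self-join in the reproducer that follows, both sides have the same `city` range, so the key bounds would carry over unchanged. A minimal generic sketch; `Bounds` and `inner_join_key_bounds` are hypothetical names:

```rust
/// Hypothetical conservative bounds: every actual value lies in `[min, max]`.
#[derive(Debug, Clone, PartialEq)]
pub struct Bounds<T> {
    pub min: T,
    pub max: T,
}

/// Conservative bounds of the join key after an inner equi-join `left.k = right.k`.
/// Returns `None` when the ranges do not overlap, i.e. the join output is provably empty.
pub fn inner_join_key_bounds<T: Ord + Clone>(left: &Bounds<T>, right: &Bounds<T>) -> Option<Bounds<T>> {
    // Intersect the two ranges: the larger minimum and the smaller maximum.
    let min = std::cmp::max(&left.min, &right.min).clone();
    let max = std::cmp::min(&left.max, &right.max).clone();
    if min > max {
        return None;
    }
    Some(Bounds { min, max })
}
```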
explain select * from t1, t1 as t2 where t1.city = t2.city;
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Inner Join: t1.city = t2.city |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| | SubqueryAlias: t2 |
| | TableScan: t1 projection=[state, city, min_temp, area, time] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent] |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(city@1, city@1)], statistics=[Rows=Absent, Bytes=Absent] |
| | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
stats of aggregation become inexact
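The aggregation reproducer that follows also admits conservative rules: each group's `MAX(min_temp)` is one of the input values, so it stays within the input range of `min_temp`; there are at most as many groups as input rows; and a `LIMIT` caps the row count at `fetch`. A minimal sketch with hypothetical names (`Stats`, `group_max`, `limit`), using `f64` for the `float` column as a simplifying assumption:

```rust
/// Hypothetical conservative statistics for a float column.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Stats {
    /// Every actual value lies in `[min, max]`.
    pub min: f64,
    pub max: f64,
    /// The actual row count is at most this value.
    pub max_rows: usize,
}

/// Conservative stats of `MAX(col)` grouped by some key: each group's maximum
/// is an input value (same range), and there are at most as many groups as rows.
pub fn group_max(input: Stats) -> Stats {
    input
}

/// Conservative stats after `LIMIT fetch`: values are a subset, rows are capped.
pub fn limit(input: Stats, fetch: usize) -> Stats {
    Stats { max_rows: input.max_rows.min(fetch), ..input }
}
```

With the table above, `min_temp` ranges over `[53.0, 79.0]` in 10 rows, so the final `GlobalLimitExec` could report at most 5 rows with values in `[53.0, 79.0]`.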
explain select city, max(min_temp) as max_min_temp from t1 group by city order by max_min_temp DESC limit 5;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=5 |
| | Sort: max_min_temp DESC NULLS FIRST, fetch=5 |
| | Projection: t1.city, MAX(t1.min_temp) AS max_min_temp |
| | Aggregate: groupBy=[[t1.city]], aggr=[[MAX(t1.min_temp)]] |
| | TableScan: t1 projection=[city, min_temp] |
| physical_plan | GlobalLimitExec: skip=0, fetch=5, statistics=[Rows=Inexact(5), Bytes=Absent] |
| | SortPreservingMergeExec: [max_min_temp@1 DESC], fetch=5, statistics=[Rows=Inexact(10), Bytes=Absent] |
| | SortExec: TopK(fetch=5), expr=[max_min_temp@1 DESC], statistics=[Rows=Inexact(10), Bytes=Absent] |
| | ProjectionExec: expr=[city@0 as city, MAX(t1.min_temp)@1 as max_min_temp], statistics=[Rows=Inexact(10), Bytes=Absent] |
| | AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent] |
| | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Absent] |
| | RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=10, statistics=[Rows=Inexact(10), Bytes=Absent] |
| | RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, statistics=[Rows=Inexact(10), Bytes=Absent] |
| | AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent] |
| | MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
Describe the solution you'd like
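Add another enum value for conservative statistics, so that the stats are kept as guaranteed bounds instead of becoming Absent or Inexact.
A few examples of conservative row statistics:
- Filter: the same value as the filter's input (a filter can only remove rows)
- Join: the Cartesian product of the two inputs' row counts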
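The two row-count examples above can be sketched as follows. This is a minimal Rust sketch, not DataFusion's actual statistics type: the `Rows` enum, its `Conservative` variant (the proposed extra enum value, meaning "the actual count is at most n"), and the functions `filter_rows` and `join_rows` are hypothetical names chosen for illustration.

```rust
/// Hypothetical row-count statistic: Exact / Inexact / Absent as shown in the
/// EXPLAIN output, plus the proposed conservative variant.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Rows {
    /// The exact row count.
    Exact(usize),
    /// Guaranteed upper bound: the actual count is <= this value.
    Conservative(usize),
    /// An estimate with no guarantee in either direction.
    Inexact(usize),
    /// Nothing is known.
    Absent,
}

impl Rows {
    /// The guaranteed upper bound, if any.
    pub fn upper_bound(self) -> Option<usize> {
        match self {
            Rows::Exact(n) | Rows::Conservative(n) => Some(n),
            Rows::Inexact(_) | Rows::Absent => None,
        }
    }
}

/// Filter: the output never has more rows than the input.
pub fn filter_rows(input: Rows) -> Rows {
    match input {
        Rows::Exact(n) | Rows::Conservative(n) => Rows::Conservative(n),
        // Estimates stay estimates; unknown stays unknown.
        other => other,
    }
}

/// Join: the output has at most left * right rows (the Cartesian product).
pub fn join_rows(left: Rows, right: Rows) -> Rows {
    match (left.upper_bound(), right.upper_bound()) {
        // If the product overflows `usize`, no bound can be represented.
        (Some(l), Some(r)) => l.checked_mul(r).map_or(Rows::Absent, Rows::Conservative),
        // One side has no guaranteed bound; this sketch gives up.
        _ => Rows::Absent,
    }
}
```

Applied to the reproducers, the `FilterExec` over `Exact(10)` would report `Conservative(10)` instead of `Absent`, and the self-join would report `Conservative(100)`.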
Describe alternatives you've considered
No response
Additional context
No response
alamb