Queries that lost statistics or their statistics become inexact #8099

@NGA-TRAN

Is your feature request related to a problem or challenge?

These are examples and use cases for the design of #8078

InfluxDB IOx uses the Min and Max statistics to optimize some queries, and we found that for many queries these statistics are either lost (become absent) or become inexact while being propagated upwards. Our request is to keep the statistics as conservative bounds that cover the exact values rather than losing them or making them inexact.

Note that the reproducers below do not show the Min and Max stats, so I use the Rows stats instead. Opened #8110 to add the Min and Max stats to the explain output.

Table for the reproducer

set datafusion.explain.show_statistics = true;


create table t1(state string, city string, min_temp float, area int, time timestamp) as values 
    ('MA', 'Boston', 70.4, 1, 50),
    ('MA', 'Bedford', 71.59, 2, 150),
    ('CA', 'SF', 79.0, 1, 300),
    ('MA', 'Boston', 75.4, 3, 250),
    ('MA', 'Andover', 69.5, 4, 250),
    ('MA', 'Bedford', 78.2, 2, 150),
    ('MA', 'Boston', 65.0, 2, 250),
    ('CA', 'SJ', 78.4, 1, 300),
    ('MA', 'Reading', 53.0, 4, 250),
    ('CA', 'SJ', 75.4, 5, 350);

stats of timestamp filter is lost

explain select * from t1 where time <= to_timestamp(350);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.time <= TimestampNanosecond(350000000000, None)                                        |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: time@4 <= 350000000000, statistics=[Rows=Absent, Bytes=Absent]                      |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

stats of string filter is lost

explain select * from t1 where state = 'MA';
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.state = Utf8("MA")                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: state@0 = MA, statistics=[Rows=Absent, Bytes=Absent]                                |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

stats of integer filter becomes inexact

explain select * from t1 where area <= 100;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area <= Int32(100)                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]   |
|               |   FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]                   |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

stats of float filter becomes inexact

explain select * from t1 where min_temp = 10.0;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area <= Int32(100)                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]   |
|               |   FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]                   |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

stats of IN filter are lost for any data type and for subqueries

explain select * from t1 where area in (1, 2);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area = Int32(1) OR t1.area = Int32(2)                                                  |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: area@3 = 1 OR area@3 = 2, statistics=[Rows=Absent, Bytes=Absent]                    |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

explain select * from t1 where city in ('Boston', 'Reading');
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.city = Utf8("Boston") OR t1.city = Utf8("Reading")                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: city@1 = Boston OR city@1 = Reading, statistics=[Rows=Absent, Bytes=Absent]         |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

explain select * from t1 where city in (select city from t1);
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | LeftSemi Join: t1.city = __correlated_sq_1.city                                                                            |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                                             |
|               |   SubqueryAlias: __correlated_sq_1                                                                                         |
|               |     TableScan: t1 projection=[city]                                                                                        |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]                                        |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(city@1, city@0)], statistics=[Rows=Absent, Bytes=Absent]        |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+

stats of join is lost

explain select * from t1, t1 as t2 where t1.city = t2.city;
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Inner Join: t1.city = t2.city                                                                                              |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                                             |
|               |   SubqueryAlias: t2                                                                                                        |
|               |     TableScan: t1 projection=[state, city, min_temp, area, time]                                                           |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]                                        |
|               |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(city@1, city@1)], statistics=[Rows=Absent, Bytes=Absent]           |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+

stats of aggregation becomes inexact

explain select city, max(min_temp) as max_min_temp from t1 group by city order by max_min_temp DESC limit 5;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                     |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=5                                                                                                                   |
|               |   Sort: max_min_temp DESC NULLS FIRST, fetch=5                                                                                           |
|               |     Projection: t1.city, MAX(t1.min_temp) AS max_min_temp                                                                                |
|               |       Aggregate: groupBy=[[t1.city]], aggr=[[MAX(t1.min_temp)]]                                                                          |
|               |         TableScan: t1 projection=[city, min_temp]                                                                                        |
| physical_plan | GlobalLimitExec: skip=0, fetch=5, statistics=[Rows=Inexact(5), Bytes=Absent]                                                             |
|               |   SortPreservingMergeExec: [max_min_temp@1 DESC], fetch=5, statistics=[Rows=Inexact(10), Bytes=Absent]                                   |
|               |     SortExec: TopK(fetch=5), expr=[max_min_temp@1 DESC], statistics=[Rows=Inexact(10), Bytes=Absent]                                     |
|               |       ProjectionExec: expr=[city@0 as city, MAX(t1.min_temp)@1 as max_min_temp], statistics=[Rows=Inexact(10), Bytes=Absent]             |
|               |         AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent] |
|               |           CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Absent]                                       |
|               |             RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=10, statistics=[Rows=Inexact(10), Bytes=Absent]           |
|               |               RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, statistics=[Rows=Inexact(10), Bytes=Absent]         |
|               |                 AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent]  |
|               |                   MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                          |
|               |                                                                                                                                          |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+

Describe the solution you'd like

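Add another enum value, conservative, that keeps the stats as a bound guaranteed to cover the exact value instead of dropping them or marking them inexact.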

A few examples for row statistics:
- Filter: same value as the unfiltered input
- Join: Cartesian product of the two inputs
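
To make the two examples above concrete, here is a minimal sketch of how a conservative value could propagate row counts. It is only an illustration under assumptions: the `Precision` type below is a standalone toy modeled loosely on the `Exact` / `Inexact` / `Absent` values visible in the explain output, and the `Conservative` variant and the `after_filter` / `after_join` helpers are hypothetical names, not existing DataFusion API.

```rust
/// Toy model of a row-count statistic (hypothetical, not DataFusion's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Precision {
    /// The value is known exactly.
    Exact(usize),
    /// Proposed: the true value is guaranteed to be <= this bound.
    Conservative(usize),
    /// A best-effort estimate with no guarantee.
    Inexact(usize),
    /// Nothing is known.
    Absent,
}

impl Precision {
    /// The carried number, if any.
    fn value(self) -> Option<usize> {
        match self {
            Precision::Exact(n) | Precision::Conservative(n) | Precision::Inexact(n) => Some(n),
            Precision::Absent => None,
        }
    }

    /// True when the number is a guaranteed upper bound on the real value.
    fn is_bound(self) -> bool {
        matches!(self, Precision::Exact(_) | Precision::Conservative(_))
    }

    /// Filter: rows can only be dropped, so the input count stays a valid
    /// upper bound ("same value as the unfiltered input").
    fn after_filter(self) -> Precision {
        match self {
            Precision::Exact(n) | Precision::Conservative(n) => Precision::Conservative(n),
            other => other,
        }
    }

    /// Inner join: the output can never exceed the Cartesian product of the
    /// two inputs, so the product is a valid upper bound.
    fn after_join(self, other: Precision) -> Precision {
        match (self.value(), other.value()) {
            (Some(l), Some(r)) => match l.checked_mul(r) {
                Some(p) if self.is_bound() && other.is_bound() => Precision::Conservative(p),
                Some(p) => Precision::Inexact(p),
                // Overflow: give up rather than report a wrong bound.
                None => Precision::Absent,
            },
            _ => Precision::Absent,
        }
    }
}
```

With the 10-row table from the reproducers, `Precision::Exact(10).after_filter()` would yield `Conservative(10)` instead of `Absent` or `Inexact(10)`, and the self-join would yield `Conservative(100)` instead of `Absent`.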

Describe alternatives you've considered

No response

Additional context

No response

Labels: enhancement (New feature or request)