Describe the bug
I'm testing the performance of querying a number of Parquet files, about which I can make some assumptions:
- Each Parquet file is already sorted on the column "timestamp".
- The Parquet files do not overlap in their "timestamp" values: for instance, file A contains timestamps from 2022 and file B contains timestamps from 2023.
The schema of the files is:
- "timestamp": TimestampMillisecond
- "value": Float64
Consider the following query and its query plan:
SELECT timestamp, value 
FROM samples 
ORDER BY timestamp ASC
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |   ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The 572 milliseconds spent in the SortPreservingMergeExec appear to be the bottleneck of the query, so I would like to optimize it.
Given the assumptions I can make about the Parquet files, I think the SortPreservingMergeExec can be replaced by what is essentially a concatenation of the Parquet files.
What would be the best approach to remove the SortPreservingMergeExec?
My ideas:
- Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column
- Implement a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec -> ParquetExec pattern and replaces it with a concatenation instead (a rough sketch follows below).
But I would like to hear if there are any better ways.
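To make the second idea a bit more concrete, here is a minimal skeleton of such a rule, assuming a DataFusion version where the scan node is still ParquetExec (module paths shift between versions). Everything beyond the trait plumbing is a placeholder: files_known_disjoint is a flag set by whoever builds the rule rather than something the rule verifies (a real rule could check per-file min/max statistics), the ConcatExec in the comment does not exist and would have to be implemented (something in the spirit of InfluxDB's ProgressiveEval), and a real rule would walk the whole plan tree instead of only the root.
```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

/// Skeleton rule: find `SortPreservingMergeExec -> ParquetExec` and mark the
/// spot where a "concatenate the already-sorted partitions" operator could
/// replace the merge.
pub struct ConcatInsteadOfMerge {
    /// Asserted by whoever constructs the rule; the rule does not verify it.
    pub files_known_disjoint: bool,
}

impl PhysicalOptimizerRule for ConcatInsteadOfMerge {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // For brevity only the root of the plan is inspected; a complete rule
        // would recurse or use the TreeNode transform utilities.
        if self.files_known_disjoint {
            if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
                if spm.input().as_any().downcast_ref::<ParquetExec>().is_some() {
                    // This is the pattern from the query plan above. A custom
                    // operator that simply chains the input partitions in order
                    // would be returned here, e.g.:
                    //   return Ok(Arc::new(ConcatExec::new(spm.input().clone())));
                    // ConcatExec is hypothetical and not implemented in this sketch.
                }
            }
        }
        Ok(plan)
    }

    fn name(&self) -> &str {
        "concat_instead_of_merge"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
```
The rule would then be registered through the session's physical optimizer rule hooks; the remaining (and main) work is the concatenation operator itself, which is what the ProgressiveEval items below are about.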
Related
- Blog post about this optimization: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/
Infrastructure Tasks 🚧
- Support merge for Distribution #15290
- Support computing statistics for FileGroup #15432
- Refactor: add FileGroup structure for Vec<PartitionedFile> #15379
- ListingTable statistics improperly merges statistics when files have different schemas #15689
- Analysis to support SortPreservingMerge --> ProgressiveEval #15191
- Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact/recomputed #15539
Major Tasks
- Add statistics_by_partition API to ExecutionPlan #15495
- Optimized version of SortPreservingMerge that doesn't actually compare sort keys if the key ranges are ordered #10316