Support Parallel Processing of Window Functions. #1216
-
With large amounts of data, window function performance is poor. If window functions could be executed in parallel, performance would improve greatly.
-
Leveraging the MPP parallel approach to implement executor parallelism seems to be a viable path. In fact, because Cloudberry data has distribution information, could we implement executor parallelism more easily than Postgres?
-
During development, I found that the result of a WindowAgg without an ORDER BY clause is unstable. According to the SQL:2011 standard, if ORDER BY is omitted, the order of rows within a partition is undefined. The following case is from window.sql in the regression tests:

    SELECT sum(unique1) OVER (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING),
           unique1, four
    FROM tenk1
    WHERE unique1 < 10;

With index scans enabled, the plan and result are:

    explain(costs off) SELECT sum(unique1) over (rows between current row and unbounded following),
           unique1, four
    FROM tenk1 WHERE unique1 < 10;
                                 QUERY PLAN
    --------------------------------------------------------------------
     WindowAgg
       Window: w1 AS (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
       ->  Index Scan using tenk1_unique1 on tenk1
             Index Cond: (unique1 < 10)
    (4 rows)

    regression=# SELECT sum(unique1) over (rows between current row and unbounded following),
           unique1, four
    FROM tenk1 WHERE unique1 < 10;
     sum | unique1 | four
    -----+---------+------
      45 |       0 |    0
      45 |       1 |    1
      44 |       2 |    2
      42 |       3 |    3
      39 |       4 |    0
      35 |       5 |    1
      30 |       6 |    2
      24 |       7 |    3
      17 |       8 |    0
       9 |       9 |    1
    (10 rows)

However, after setting enable_indexscan = off, the results changed:

    regression=# set enable_indexscan = off;
    SET
    regression=# SELECT sum(unique1) over (rows between current row and unbounded following),
           unique1, four
    FROM tenk1 WHERE unique1 < 10;
     sum | unique1 | four
    -----+---------+------
      45 |       4 |    0
      41 |       2 |    2
      39 |       1 |    1
      38 |       6 |    2
      32 |       9 |    1
      23 |       8 |    0
      15 |       5 |    1
      10 |       3 |    3
       7 |       7 |    3
       0 |       0 |    0
    (10 rows)

    regression=# explain(costs off) SELECT sum(unique1) over (rows between current row and unbounded following),
           unique1, four
    FROM tenk1 WHERE unique1 < 10;
                                 QUERY PLAN
    --------------------------------------------------------------------
     WindowAgg
       Window: w1 AS (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
       ->  Seq Scan on tenk1
             Filter: (unique1 < 10)
    (4 rows)

Parallel execution of the window function makes this even worse. To pass the parallel test cases, we need to modify the SQL in that case.
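A minimal Python model (an illustration, not Cloudberry code) of why the two runs differ: without ORDER BY, the frame ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING means "this row and every row emitted after it". The sum is therefore a suffix sum over whatever order the scan happens to produce. The two arrival orders below are copied from the index-scan and seq-scan outputs above. The function name `suffix_window_sum` is hypothetical.

```python
from typing import List, Tuple


def suffix_window_sum(rows: List[int]) -> List[Tuple[int, int]]:
    """Model sum(x) OVER (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
    with no ORDER BY: each row's frame is itself plus every row that the
    child node emits after it, so the answer depends on arrival order."""
    result: List[Tuple[int, int]] = []
    running = 0
    # Walk the rows backwards so every frame sum is an O(1) update.
    for x in reversed(rows):
        running += x
        result.append((running, x))
    result.reverse()  # restore the original emission order
    return result


# Arrival orders of unique1 taken from the two psql transcripts.
index_scan_order = list(range(10))
seq_scan_order = [4, 2, 1, 6, 9, 8, 5, 3, 7, 0]

print([s for s, _ in suffix_window_sum(index_scan_order)])
# [45, 45, 44, 42, 39, 35, 30, 24, 17, 9]
print([s for s, _ in suffix_window_sum(seq_scan_order)])
# [45, 41, 39, 38, 32, 23, 15, 10, 7, 0]

# Sorting first (what ORDER BY unique1 in the window clause would do)
# makes the result independent of the scan order.
assert suffix_window_sum(sorted(seq_scan_order)) == suffix_window_sum(index_scan_order)
```

Both printed lists match the `sum` columns of the two transcripts. Once the input order is pinned down, the result no longer depends on which scan or how many parallel workers produced the rows.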
-
Description
Building upon PostgreSQL and Greenplum, Cloudberry has implemented numerous parallel processing capabilities that significantly enhance query performance. Based on real customer requirements, there is an urgent need to support parallel processing of window functions, as most production environments contain SQL statements utilizing window functions. Window functions are critically important and widely used in complex production scenarios.
In PostgreSQL, window functions cannot be parallelized. They can only be executed in the leader process of a parallel plan. This limitation exists because PostgreSQL, as a single-machine database, lacks the concept of data distribution: when processing window functions, child nodes cannot guarantee that the rows they pass to upper nodes are divided according to the PARTITION BY clause.
In our distributed environment, parallel scan operators in different processes compete at run time for the data they read. Therefore, even when a table has distribution keys, the data appears randomly distributed from the perspective of upper nodes.
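A small Python sketch shows what would go wrong if each parallel worker evaluated `sum(value) OVER (PARTITION BY key)` over only the rows it happened to scan. It is illustrative only: the row values, the two-way split, and the function names are hypothetical. A partition whose rows are spread over several workers gets a different, wrong total from each worker.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (partition key, value)


def partition_sums(rows: List[Row]) -> Dict[str, int]:
    """sum(value) OVER (PARTITION BY key), reported once per key."""
    sums: Dict[str, int] = defaultdict(int)
    for key, value in rows:
        sums[key] += value
    return dict(sums)


def per_worker_partition_sums(shares: List[List[Row]]) -> List[Dict[str, int]]:
    """Each worker evaluates the window function only over the rows it scanned."""
    return [partition_sums(share) for share in shares]


rows = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)]

# Hypothetical split: with a parallel scan, which worker reads which rows is
# decided at run time, so rows of the same key can land on any worker.
shares = [rows[0:3], rows[3:6]]

print(partition_sums(rows))               # {'a': 9, 'b': 12}
print(per_worker_partition_sums(shares))  # [{'a': 4, 'b': 2}, {'b': 10, 'a': 5}]
```

Neither worker reports the correct total of 9 for key `a`, which is why the window function has to stay above the point where the parallel streams are merged.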
Looking at MPP distribution characteristics, a window function with a PARTITION BY clause has semantics very similar to GROUP BY: all rows with the same PARTITION BY key must be processed together. We can therefore leverage the redistribution feature of Motion, redistributing rows on the PARTITION BY keys, to enable parallel processing of window functions according to their PARTITION BY keys.
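The redistribution idea can be sketched as a Python simulation, under stated assumptions. `redistribute` is a hypothetical stand-in for a Redistribute Motion, and `zlib.crc32` is used only as a stable hash; it is not Cloudberry's distribution hash. Every row with a given PARTITION BY key lands on the same worker, so each worker sees complete partitions and can evaluate `row_number() OVER (PARTITION BY key ORDER BY value)` independently. The union of the workers' outputs then matches a serial evaluation up to row order.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int]       # (partition key, value)
Out = Tuple[str, int, int]  # (partition key, value, row_number)


def window_row_number(rows: List[Row]) -> List[Out]:
    """row_number() OVER (PARTITION BY key ORDER BY value) in one process."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    out: List[Out] = []
    for key, values in groups.items():
        for rn, value in enumerate(sorted(values), start=1):
            out.append((key, value, rn))
    return out


def redistribute(rows: List[Row], nworkers: int) -> List[List[Row]]:
    """Hypothetical Redistribute Motion: hash each row on its PARTITION BY key
    so that all rows of one key are sent to the same worker."""
    buckets: List[List[Row]] = [[] for _ in range(nworkers)]
    for key, value in rows:
        # crc32 is stable across runs, unlike Python's salted str hash().
        buckets[zlib.crc32(key.encode()) % nworkers].append((key, value))
    return buckets


def parallel_window_row_number(rows: List[Row], nworkers: int) -> List[Out]:
    out: List[Out] = []
    for share in redistribute(rows, nworkers):
        out.extend(window_row_number(share))  # each worker runs on its own share
    return out


rows = [("a", 5), ("b", 2), ("c", 7), ("a", 1), ("b", 9), ("c", 3), ("a", 4)]
serial = sorted(window_row_number(rows))
parallel = sorted(parallel_window_row_number(rows, nworkers=3))
assert serial == parallel  # same rows; only the output order differs
```

The same reasoning explains why the approach depends on PARTITION BY. Without it, all rows form one partition and there is no key to redistribute on.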
Furthermore, by implementing parallel processing of window functions, we can integrate their parallel paths with other operators, enabling end-to-end parallelization of entire SQL queries. Combined with our existing support for parallel Join, Union, and DISTINCT operations, this would further optimize complex queries like TPC-DS Q98.
Use case/motivation
No response
Related issues
No response
Are you willing to submit a PR?