Push Down Aggregation Operations
Aggregation pushdown is an optimization technique that moves the aggregation operation closer to the data source. SynxDB supports pushing down aggregation operations, which means computing the aggregation operator before the join operator.
In suitable scenarios, aggregation pushdown can significantly reduce the size of the input set for the join or aggregation operator, thereby improving the operator’s execution performance.
Note
In the optimizer logic of the native PostgreSQL kernel, aggregation operations in a query are always performed after all join operations are completed (excluding subqueries). Therefore, SynxDB introduces the aggregation pushdown feature, which lets the optimizer perform aggregation ahead of joins in suitable scenarios.
To determine whether the execution plan chosen by the optimizer has applied aggregation pushdown, observe the positional relationship between Aggregation and Join in the plan tree. If an execution plan first performs a Partial Aggregation, then a Join, and finally a Final Aggregation, the optimizer has applied aggregation pushdown.
Usage example
Before using aggregation pushdown optimization, you need to manually enable the GUC parameter gp_enable_agg_pushdown. In addition, you need to manually set optimizer=off to disable the GPORCA optimizer, because this optimization currently takes effect only in the PostgreSQL optimizer.
The following is a usage example of aggregation pushdown optimization.
-- Create two tables, t1 and t2
CREATE TABLE t1(id INT, val1 INT);
CREATE TABLE t2(id INT, val2 INT);
SET OPTIMIZER=OFF; -- Disable the GPORCA optimizer
SET gp_enable_agg_pushdown=ON; -- Enable the GUC parameter
-- Execute a query with aggregation and join operations
EXPLAIN (COSTS OFF) SELECT id, SUM(val1) FROM t1 NATURAL JOIN t2 GROUP BY id;
QUERY PLAN
-----------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Finalize GroupAggregate
Group Key: t1.id
-> Sort
Sort Key: t1.id
-> Hash Join
Hash Cond: (t2.id = t1.id)
-> Seq Scan on t2
-> Hash
-> Partial HashAggregate
Group Key: t1.id
-> Seq Scan on t1
Optimizer: Postgres query optimizer
(13 rows)
From the execution plan in the example above, you can see that before performing the Hash Join, SynxDB aggregates the t1 table by the id column in advance. This early aggregation does not compromise the correctness of the statement’s result, and it is likely to reduce the amount of data entering the Hash Join, thereby improving execution efficiency.
Use cases
A significant query performance improvement is expected when using aggregation pushdown in the following scenarios.
Scenario one
Scenario description: Each row in one table corresponds to multiple rows in another table, and the two tables need to be joined and then grouped for aggregation.
For example, you need to join the order table (order_tbl) and the order line item table (order_line_tbl) and sum the prices of the corresponding order items for each order, that is, calculate the total amount SUM(price) for each order:
SELECT o.order_id, SUM(price)
FROM order_tbl o, order_line_tbl ol
WHERE o.order_id = ol.order_id
GROUP BY o.order_id;
Execution method in the native PostgreSQL optimizer: The native PostgreSQL optimizer can only perform the aggregation operation after joining the two tables. Because each order item in the order_line_tbl table must have corresponding order information in the order_tbl table, this Join operator does not filter out any data.

Execution method in SynxDB: Assuming each order contains an average of 10 order items, the data volume is expected to decrease by a factor of 10 after the Aggregation operator. With aggregation pushdown enabled, the database first aggregates the data in order_line_tbl by order_id. This reduces the amount of data passed to the Join operator by a factor of 10, significantly reducing the overhead of the Join operator. The corresponding execution plan is as follows:

EXPLAIN SELECT o.order_id, SUM(price)
FROM order_tbl o, order_line_tbl ol
WHERE o.order_id = ol.order_id
GROUP BY o.order_id;
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=712.89..879.56 rows=10000 width=12)
   ->  Finalize HashAggregate  (cost=712.89..746.23 rows=3333 width=12)
         Group Key: o.order_id
         ->  Hash Join  (cost=617.00..696.23 rows=3333 width=12)
               Hash Cond: (ol.order_id = o.order_id)
               ->  Partial HashAggregate  (cost=538.00..571.38 rows=3338 width=12)
                     Group Key: ol.order_id
                     ->  Seq Scan on order_line_tbl ol  (cost=0.00..371.33 rows=33333 width=8)
               ->  Hash  (cost=37.33..37.33 rows=3333 width=4)
                     ->  Seq Scan on order_tbl o  (cost=0.00..37.33 rows=3333 width=4)
 Optimizer: Postgres query optimizer
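The order tables are not defined in this section; hypothetical definitions such as the following (column types, distribution keys, and data shape are all assumptions for illustration) can be used to reproduce a similar plan, with roughly 10 line items per order to match the 10x reduction described above:

-- Hypothetical schemas and data for the order example (assumed, for illustration only)
CREATE TABLE order_tbl (order_id INT, customer_id INT) DISTRIBUTED BY (order_id);
CREATE TABLE order_line_tbl (line_id INT, order_id INT, price NUMERIC) DISTRIBUTED BY (order_id);

-- About 10 line items per order
INSERT INTO order_tbl SELECT g, g % 100 FROM generate_series(1, 3000) g;
INSERT INTO order_line_tbl SELECT g, (g % 3000) + 1, (g % 50) + 1 FROM generate_series(1, 30000) g;
ANALYZE order_tbl;
ANALYZE order_line_tbl;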
A similar scenario is joining the project table (project) and the experiment table (experiment) to calculate the total experiment cost for each project over the past year. The corresponding reference SQL statement is as follows:
SELECT proj_name, sum(cost)
FROM experiment e, project p
WHERE e.e_pid = p.p_pid AND e.start_time > now() - interval '1 year'
GROUP BY proj_name;
For this query, with aggregation pushdown enabled, SynxDB will pre-aggregate the experiment table by the e_pid column, gathering the information for the same project first.
However, if this query also applies many filters to the project table, the Join is likely to discard most of the pre-aggregated rows, and the work spent on the early aggregation is wasted. Therefore, aggregation pushdown is currently not suitable for this situation.
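For instance, a hypothetical variant like the following (the status column and its predicate are not part of the original example) would discard most projects at the Join, so pre-aggregating experiment by e_pid would be largely wasted work:

SELECT proj_name, sum(cost)
FROM experiment e, project p
WHERE e.e_pid = p.p_pid
  AND e.start_time > now() - interval '1 year'
  AND p.status = 'active'  -- hypothetical filter that removes most projects
GROUP BY proj_name;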
Scenario two
Scenario description: The Join operator in the query statement significantly increases the result set size, and the final result needs to be calculated by grouping.
For example, to join the person_1 table with the person_2 table and find out how many different pairs each identical name can form between the two tables, the SQL query is as follows:
SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
In this example, if a certain name appears X times in the p1 table and Y times in the p2 table, then this name appears X*Y times in the final result. If a large amount of data falls into this situation, the result set after the Join might be very large.
In the example above, if the aggregation operation is pushed down to the p1 or p2 side in advance, each name appears at most once on that side after aggregation, which effectively reduces both the overhead of the Join operator and the size of the input set that the subsequent Aggregation operator needs to process. The corresponding execution plan is as follows:
EXPLAIN SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
QUERY PLAN
-----------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3) (cost=1758.62..1925.23 rows=9997 width=12)
-> Finalize HashAggregate (cost=1758.62..1791.94 rows=3332 width=12)
Group Key: p1.name
-> Hash Join (cost=762.93..1592.17 rows=33290 width=12)
Hash Cond: (p2.name = p1.name)
-> Seq Scan on p2 (cost=0.00..371.33 rows=33333 width=4)
-> Hash (cost=637.97..637.97 rows=9997 width=12)
-> Partial HashAggregate (cost=538.00..637.97 rows=9997 width=12)
Group Key: p1.name
-> Seq Scan on p1 (cost=0.00..371.33 rows=33333 width=4)
Optimizer: Postgres query optimizer
(11 rows)
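The person tables are likewise not defined in this section; hypothetical definitions such as the following (column types and distribution keys are assumptions) would produce this kind of plan when many rows share the same name:

-- Hypothetical schemas for the name-pairing example, for illustration only
CREATE TABLE person_1 (id INT, name TEXT) DISTRIBUTED BY (id);
CREATE TABLE person_2 (id INT, name TEXT) DISTRIBUTED BY (id);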
Not recommended scenarios
It is difficult to achieve performance improvement through aggregation pushdown in the following scenarios, and it is not recommended to enable it.
Not applicable scenario one
Scenario description: Scenarios where the data volume does not change much after aggregation.
In contrast to scenarios one and two above, if performing the Aggregation in advance does not change the data volume and therefore cannot reduce the input set size for subsequent calculations, the Join operator should be executed first to avoid unnecessary overhead.
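As a sketch (reusing the t1 and t2 tables from the usage example above), if t1.id is nearly unique, pre-aggregating t1 by id barely changes its row count, so a pushed-down Partial Aggregate would only add overhead:

-- If t1.id is (nearly) unique, pre-aggregation below the join is wasted work:
-- the Partial Aggregate emits about as many rows as the Seq Scan on t1 feeds it
SELECT t1.id, SUM(t1.val1)
FROM t1, t2
WHERE t1.id = t2.id
GROUP BY t1.id;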
Not applicable scenario two
Scenario description: If the join key is different from the group key, aggregation pushdown changes the pushed-down group key. In this case, the aggregation with the rewritten group key may be unable to reduce the data volume, leading to poor performance of the pushed-down aggregation:
SELECT t1.value, COUNT(*) FROM t1, t2 WHERE t1.key = t2.key GROUP BY t1.value;
For the query example above, directly pushing the aggregation down to the t1 side would produce incorrect results; the details are similar to the restriction one scenario described below. To ensure the correctness of the result, the group key of the actual pushed-down aggregation becomes `GROUP BY t1.key, t1.value`.
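Conceptually, the pushed-down form computes something like the following hand-written sketch (a logical equivalent, not literal planner output):

-- Sketch: the partial aggregate below the join must group by (key, value)
SELECT t1a.value, SUM(t1a.cnt)
FROM (SELECT key, value, COUNT(*) AS cnt
      FROM t1
      GROUP BY key, value) t1a
JOIN t2 ON t1a.key = t2.key
GROUP BY t1a.value;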
In this case, if the key and value of the t1 table are completely unrelated, each group might contain only a single tuple, so this aggregation pushdown will not produce any positive effect. However, if key and value are strongly correlated, or the same key always corresponds to the same value, the grouping effect is not affected.
For the example above, grouping by t1.value alone originally reduces the data significantly. But after aggregation pushdown the group key becomes t1.key, t1.value; when the correlation between key and value is weak, the pushed-down aggregation achieves no significant reduction.
Usage restrictions
This section describes some restrictions of the aggregation pushdown feature, including cases where this optimization cannot be applied logically and cases that are not yet supported in the engineering implementation.
Restriction one
Restriction description: Aggregation pushdown cannot be applied when the join or subsequent calculations filter on columns other than the GROUP BY column. Consider the following SQL query:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.val > t2.val GROUP BY t1.id;
In the example above, assume that the t1 table contains two tuples A and B with an id of 100, and the t2 table contains a tuple C, also with an id of 100.
During the join of A and B with C, although A and B have the same id, it cannot be guaranteed that they will both pass or both fail the filter condition t1.val > t2.val. In this case, summing val by id in advance would inevitably add the val of A and B together; because the two tuples do not necessarily pass or fail the filter at the same time, this leads to incorrect results.
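A concrete illustration with assumed values:

-- Assumed data: t1 holds A = (id 100, val 1) and B = (id 100, val 5);
--               t2 holds C = (id 100, val 3).
-- Per-tuple evaluation: only B passes t1.val > t2.val, so SUM(val) = 5.
-- Pushed-down evaluation: A and B would first collapse into (100, 6); the
-- filter 6 > 3 passes as a whole, wrongly producing (100, 6).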
In contrast, the following similar query example can apply aggregation pushdown:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.id < t2.id_thre GROUP BY t1.id;
In this example, considering the same three tuples A, B, and C as in the previous example: because the additional filter condition only uses the id column from t1, the two tuples A and B with the same id, when joined with a given tuple C, either both pass or both fail the filter. Therefore, the val of A and B can be summed in advance through an aggregation operation.
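Conceptually, the pushdown in this allowed case computes something like the following hand-written sketch (not literal planner output):

-- Sketch: t1 can be pre-aggregated by id because the extra filter only
-- references t1.id, which is constant within each pre-aggregated group
SELECT t1a.id, SUM(t1a.sval)
FROM (SELECT id, SUM(val) AS sval FROM t1 GROUP BY id) t1a
JOIN t2 ON t1a.id = t2.id AND t1a.id < t2.id_thre
GROUP BY t1a.id;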
Restriction two
Restriction description: Pushing down aggregation to both sides of a Join simultaneously is not supported. Consider the following SQL query:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id;
We can actually rewrite the statement to get an equivalent one:
SELECT AT1.id, sum1 * cnt2 FROM
(SELECT id, SUM(val) FROM t1 GROUP BY id) AT1(id, sum1),
(SELECT id, COUNT(*) FROM t2 GROUP BY id) AT2(id, cnt2)
WHERE AT1.id = AT2.id;
In this example, the aggregation operation is pushed down to both sides of the Join. For all tuples in the t1 table with an id of 100, SynxDB sums their val in advance to get the corresponding sum1.
During the actual join, each tuple in the t2 table with an id of 100 is joined with the tuple that carries sum1, producing one output tuple. This means that for every tuple in t2 with an id of 100, sum1 appears once in the final summation. Therefore, SynxDB can pre-aggregate t2 to count that there are cnt2 tuples with an id of 100, and then compute the final result as sum1 * cnt2.
Because this scenario involves relatively complex statement and expression rewriting, it is not currently supported in the product.