Skip to content

Commit 0e3389c

Browse files
committed
Switch to Streaming HashAgg for multiple phase aggregation.
This commit changes the Postgres planner to use Streaming HashAgg for multi-phase aggregation plans, matching Orca's approach. While non-streaming aggregation may spill to disk when data per segment is largely unique, streaming incurs minimal overhead and proves beneficial overall based on empirical evidence. Authored-by: Zhang Mingli avamingli@gmail.com
1 parent eaf15c5 commit 0e3389c

26 files changed

Lines changed: 69 additions & 69 deletions

src/backend/cdb/cdbgroupingpaths.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1264,7 +1264,7 @@ add_first_stage_hash_agg_path(PlannerInfo *root,
12641264
ctx->partial_grouping_target,
12651265
AGG_HASHED,
12661266
ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
1267-
false, /* streaming */
1267+
true, /* streaming */
12681268
ctx->groupClause,
12691269
NIL,
12701270
ctx->agg_partial_costs,

src/test/regress/expected/agg_pushdown.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ AS c1 ON c1.parent = p.x GROUP BY p.i;
204204
-> Redistribute Motion 3:3 (slice2; segments: 3)
205205
Output: p.i, (PARTIAL avg(c1.v))
206206
Hash Key: p.i
207-
-> Partial HashAggregate
207+
-> Streaming Partial HashAggregate
208208
Output: p.i, PARTIAL avg(c1.v)
209209
Group Key: p.i
210210
-> Hash Join

src/test/regress/expected/aggregates.out

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,7 +1315,7 @@ explain (costs off) select a,c from t1 group by a,c,d;
13151315
Group Key: a, c, d
13161316
-> Redistribute Motion 3:3 (slice2; segments: 3)
13171317
Hash Key: a, c, d
1318-
-> HashAggregate
1318+
-> Streaming HashAggregate
13191319
Group Key: a, c, d
13201320
-> Seq Scan on t1
13211321
Optimizer: Postgres query optimizer
@@ -3317,7 +3317,7 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
33173317
AggRefs(TargetList): [(0, 0)]
33183318
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=145944.41..145974.41 rows=1000 width=84)
33193319
Hash Key: tgb2.v2
3320-
-> Partial HashAggregate (cost=145944.41..145954.41 rows=1000 width=84)
3320+
-> Streaming Partial HashAggregate (cost=145944.41..145954.41 rows=1000 width=84)
33213321
Group Key: tgb2.v2
33223322
AggRefs(TargetList): [(0, 0), (1, 1), (2, 2), (3, 3)]
33233323
-> Hash Join (cost=618.25..105488.34 rows=2022803 width=20)
@@ -3372,7 +3372,7 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
33723372
AggRefs(TargetList): [(0, 0)]
33733373
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=145944.41..145974.41 rows=1000 width=84)
33743374
Hash Key: tgb2.v2
3375-
-> Partial HashAggregate (cost=145944.41..145954.41 rows=1000 width=84)
3375+
-> Streaming Partial HashAggregate (cost=145944.41..145954.41 rows=1000 width=84)
33763376
Group Key: tgb2.v2
33773377
AggRefs(TargetList): [(0, 0), (1, 1), (2, 2), (3, 3)]
33783378
-> Hash Join (cost=618.25..105488.34 rows=2022803 width=20)
@@ -3417,7 +3417,7 @@ explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
34173417
Group Key: v2, v3
34183418
-> Redistribute Motion 3:3 (slice2; segments: 3)
34193419
Hash Key: v2, v3
3420-
-> HashAggregate
3420+
-> Streaming HashAggregate
34213421
Group Key: v2, v3
34223422
-> Seq Scan on pds_t1
34233423
Optimizer: Postgres query optimizer
@@ -3432,7 +3432,7 @@ explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
34323432
Group Key: v2, v3
34333433
-> Redistribute Motion 3:3 (slice2; segments: 3)
34343434
Hash Key: v2, v3
3435-
-> HashAggregate
3435+
-> Streaming HashAggregate
34363436
Group Key: v2, v3
34373437
-> Seq Scan on pds_t1
34383438
Optimizer: Postgres query optimizer
@@ -3447,7 +3447,7 @@ explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
34473447
Group Key: v2, v3
34483448
-> Redistribute Motion 3:3 (slice2; segments: 3)
34493449
Hash Key: v2, v3
3450-
-> HashAggregate
3450+
-> Streaming HashAggregate
34513451
Group Key: v2, v3
34523452
-> Seq Scan on pds_t1
34533453
Optimizer: Postgres query optimizer
@@ -3462,7 +3462,7 @@ explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
34623462
Group Key: v2, v3, v4, v5, v6
34633463
-> Redistribute Motion 3:3 (slice2; segments: 3)
34643464
Hash Key: v2, v3, v4, v5, v6
3465-
-> HashAggregate
3465+
-> Streaming HashAggregate
34663466
Group Key: v2, v3, v4, v5, v6
34673467
-> Seq Scan on pds_t1
34683468
Optimizer: Postgres query optimizer

src/test/regress/expected/aqumv.out

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3412,7 +3412,7 @@ select c, b, sum(a), count(b) from t0 group by b, c;
34123412
-> Redistribute Motion 3:3 (slice2; segments: 3)
34133413
Output: c, b, (PARTIAL sum(a)), (PARTIAL count(b))
34143414
Hash Key: b, c
3415-
-> Partial HashAggregate
3415+
-> Streaming Partial HashAggregate
34163416
Output: c, b, PARTIAL sum(a), PARTIAL count(b)
34173417
Group Key: t0.b, t0.c
34183418
-> Seq Scan on aqumv.t0
@@ -3468,7 +3468,7 @@ select b, sum(a), c, count(b) from t0 group by c, b;
34683468
-> Redistribute Motion 3:3 (slice2; segments: 3)
34693469
Output: b, c, (PARTIAL sum(a)), (PARTIAL count(b))
34703470
Hash Key: c, b
3471-
-> Partial HashAggregate
3471+
-> Streaming Partial HashAggregate
34723472
Output: b, c, PARTIAL sum(a), PARTIAL count(b)
34733473
Group Key: t0.c, t0.b
34743474
-> Seq Scan on aqumv.t0
@@ -3524,7 +3524,7 @@ select b + c + 1, sum(a) + count(b) from t0 group by c, b;
35243524
-> Redistribute Motion 3:3 (slice2; segments: 3)
35253525
Output: c, b, (PARTIAL sum(a)), (PARTIAL count(b))
35263526
Hash Key: c, b
3527-
-> Partial HashAggregate
3527+
-> Streaming Partial HashAggregate
35283528
Output: c, b, PARTIAL sum(a), PARTIAL count(b)
35293529
Group Key: t0.c, t0.b
35303530
-> Seq Scan on aqumv.t0
@@ -3580,7 +3580,7 @@ select c, count(b) from t0 group by c ;
35803580
-> Redistribute Motion 3:3 (slice2; segments: 3)
35813581
Output: c, (PARTIAL count(b))
35823582
Hash Key: c
3583-
-> Partial HashAggregate
3583+
-> Streaming Partial HashAggregate
35843584
Output: c, PARTIAL count(b)
35853585
Group Key: t0.c
35863586
-> Seq Scan on aqumv.t0
@@ -3612,7 +3612,7 @@ select c, count(b) from t0 group by c ;
36123612
-> Redistribute Motion 3:3 (slice2; segments: 3)
36133613
Output: c, (PARTIAL count(b))
36143614
Hash Key: c
3615-
-> Partial HashAggregate
3615+
-> Streaming Partial HashAggregate
36163616
Output: c, PARTIAL count(b)
36173617
Group Key: t0.c
36183618
-> Seq Scan on aqumv.t0
@@ -3645,7 +3645,7 @@ select c, b, count(b) from t0 where a > 3 group by c, b;
36453645
-> Redistribute Motion 3:3 (slice2; segments: 3)
36463646
Output: c, b, (PARTIAL count(b))
36473647
Hash Key: c, b
3648-
-> Partial HashAggregate
3648+
-> Streaming Partial HashAggregate
36493649
Output: c, b, PARTIAL count(b)
36503650
Group Key: t0.c, t0.b
36513651
-> Seq Scan on aqumv.t0
@@ -3696,7 +3696,7 @@ select count(b), b, c from t0 where a > 3 group by b, c;
36963696
-> Redistribute Motion 3:3 (slice2; segments: 3)
36973697
Output: b, c, (PARTIAL count(b))
36983698
Hash Key: b, c
3699-
-> Partial HashAggregate
3699+
-> Streaming Partial HashAggregate
37003700
Output: b, c, PARTIAL count(b)
37013701
Group Key: t0.b, t0.c
37023702
-> Seq Scan on aqumv.t0
@@ -3747,7 +3747,7 @@ select count(b) + 1, b + 1, c from t0 where a > 3 group by b, c;
37473747
-> Redistribute Motion 3:3 (slice2; segments: 3)
37483748
Output: c, b, (PARTIAL count(b))
37493749
Hash Key: b, c
3750-
-> Partial HashAggregate
3750+
-> Streaming Partial HashAggregate
37513751
Output: c, b, PARTIAL count(b)
37523752
Group Key: t0.b, t0.c
37533753
-> Seq Scan on aqumv.t0
@@ -3798,7 +3798,7 @@ select b, c, count(b) from t0 where a > 3 and b > 1 group by b, c;
37983798
-> Redistribute Motion 3:3 (slice2; segments: 3)
37993799
Output: b, c, (PARTIAL count(b))
38003800
Hash Key: b, c
3801-
-> Partial HashAggregate
3801+
-> Streaming Partial HashAggregate
38023802
Output: b, c, PARTIAL count(b)
38033803
Group Key: t0.b, t0.c
38043804
-> Seq Scan on aqumv.t0
@@ -3828,7 +3828,7 @@ select b, c, count(b) from t0 where a > 3 and b > 1 group by b, c;
38283828
-> Redistribute Motion 3:3 (slice2; segments: 3)
38293829
Output: b, c, (PARTIAL count(b))
38303830
Hash Key: b, c
3831-
-> Partial HashAggregate
3831+
-> Streaming Partial HashAggregate
38323832
Output: b, c, PARTIAL count(b)
38333833
Group Key: t0.b, t0.c
38343834
-> Seq Scan on aqumv.t0

src/test/regress/expected/bfv_aggregate.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,7 +1686,7 @@ SELECT a.x, sum(b.x) FROM pagg_tab1 a FULL OUTER JOIN pagg_tab2 b ON a.x = b.y G
16861686
Group Key: a.x
16871687
-> Redistribute Motion 3:3 (slice2; segments: 3)
16881688
Hash Key: a.x
1689-
-> Partial HashAggregate
1689+
-> Streaming Partial HashAggregate
16901690
Group Key: a.x
16911691
-> Hash Full Join
16921692
Hash Cond: (a.x = b.y)
@@ -1961,7 +1961,7 @@ explain (verbose on, costs off) select ex2.b/2, sum(ex1.a) from ex1, (select a,
19611961
-> Redistribute Motion 3:3 (slice2; segments: 3)
19621962
Output: ((COALESCE(ex2.b, 1) / 2)), (PARTIAL sum(ex1.a))
19631963
Hash Key: ((COALESCE(ex2.b, 1) / 2))
1964-
-> Partial HashAggregate
1964+
-> Streaming Partial HashAggregate
19651965
Output: ((COALESCE(ex2.b, 1) / 2)), PARTIAL sum(ex1.a)
19661966
Group Key: (COALESCE(ex2.b, 1) / 2)
19671967
-> Hash Join
@@ -1999,7 +1999,7 @@ explain (verbose on, costs off) SELECT b/2, sum(b) * (b/2) FROM ex1 GROUP BY b/
19991999
-> Redistribute Motion 3:3 (slice2; segments: 3)
20002000
Output: ((b / 2)), (PARTIAL sum(b))
20012001
Hash Key: ((b / 2))
2002-
-> Partial HashAggregate
2002+
-> Streaming Partial HashAggregate
20032003
Output: ((b / 2)), PARTIAL sum(b)
20042004
Group Key: (ex1.b / 2)
20052005
-> Seq Scan on bfv_aggregate.ex1

src/test/regress/expected/cbdb_parallel.out

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3105,7 +3105,7 @@ select distinct a from t_distinct_0;
31053105
Group Key: a
31063106
-> Redistribute Motion 3:3 (slice2; segments: 3)
31073107
Hash Key: a
3108-
-> HashAggregate
3108+
-> Streaming HashAggregate
31093109
Group Key: a
31103110
-> Seq Scan on t_distinct_0
31113111
Optimizer: Postgres query optimizer
@@ -3286,14 +3286,14 @@ select distinct a from t_distinct_0 union select distinct b from t_distinct_0;
32863286
Group Key: t_distinct_0.a
32873287
-> Redistribute Motion 3:3 (slice3; segments: 3)
32883288
Hash Key: t_distinct_0.a
3289-
-> HashAggregate
3289+
-> Streaming HashAggregate
32903290
Group Key: t_distinct_0.a
32913291
-> Seq Scan on t_distinct_0
32923292
-> HashAggregate
32933293
Group Key: t_distinct_0_1.b
32943294
-> Redistribute Motion 3:3 (slice4; segments: 3)
32953295
Hash Key: t_distinct_0_1.b
3296-
-> HashAggregate
3296+
-> Streaming HashAggregate
32973297
Group Key: t_distinct_0_1.b
32983298
-> Seq Scan on t_distinct_0 t_distinct_0_1
32993299
Optimizer: Postgres query optimizer
@@ -3459,7 +3459,7 @@ WHERE e.salary > (
34593459
Group Key: employees.department_id
34603460
-> Redistribute Motion 3:3 (slice3; segments: 3) (cost=90.50..122.67 rows=990 width=36)
34613461
Hash Key: employees.department_id
3462-
-> Partial HashAggregate (cost=90.50..102.87 rows=990 width=36)
3462+
-> Streaming Partial HashAggregate (cost=90.50..102.87 rows=990 width=36)
34633463
Group Key: employees.department_id
34643464
-> Seq Scan on employees (cost=0.00..71.67 rows=3767 width=36)
34653465
Optimizer: Postgres query optimizer

src/test/regress/expected/cte_prune.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,11 +1230,11 @@ LIMIT 10;
12301230
-> HashAggregate (cost=102580688686565152.00..102596190789768272.00 rows=247107000000000 width=48)
12311231
Output: cup.c, cup.d, cup.e, t.d, t.b
12321232
Group Key: cup.c, cup.d, cup.e, t.d, t.b
1233-
Planned Partitions: 512
1233+
Planned Partitions: 256
12341234
-> Redistribute Motion 3:3 (slice3; segments: 3) (cost=93196181903903024.00..102459992361252656.00 rows=741321000000000 width=48)
12351235
Output: cup.c, cup.d, cup.e, t.d, t.b
12361236
Hash Key: cup.c, cup.d, cup.e, t.d, t.b
1237-
-> HashAggregate (cost=93196181903903024.00..102445165941252656.00 rows=741321000000000 width=48)
1237+
-> Streaming HashAggregate (cost=93196181903903024.00..102445165941252656.00 rows=741321000000000 width=48)
12381238
Output: cup.c, cup.d, cup.e, t.d, t.b
12391239
Group Key: cup.c, cup.d, cup.e, t.d, t.b
12401240
Planned Partitions: 512
@@ -1590,7 +1590,7 @@ order by OUTERMOST_FOO.region,bad_headofstates.headofstate LIMIT 40;
15901590
-> Redistribute Motion 3:3 (slice4; segments: 3) (cost=2625.14..2655.14 rows=1000 width=64)
15911591
Output: country_2.region, (PARTIAL avg(country_2.population))
15921592
Hash Key: country_2.region
1593-
-> Partial HashAggregate (cost=2625.14..2635.14 rows=1000 width=64)
1593+
-> Streaming Partial HashAggregate (cost=2625.14..2635.14 rows=1000 width=64)
15941594
Output: country_2.region, PARTIAL avg(country_2.population)
15951595
Group Key: country_2.region
15961596
-> Hash Join (cost=1662.17..2425.51 rows=39927 width=36)

src/test/regress/expected/direct_dispatch.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ explain (costs off) select gp_segment_id, count(*) from bar_randDistr group by g
659659
Group Key: gp_segment_id
660660
-> Redistribute Motion 3:3 (slice2; segments: 3)
661661
Hash Key: gp_segment_id
662-
-> Partial HashAggregate
662+
-> Streaming Partial HashAggregate
663663
Group Key: gp_segment_id
664664
-> Seq Scan on bar_randdistr
665665
Optimizer: Postgres query optimizer

src/test/regress/expected/eagerfree.out

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ explain analyze select d, count(*) from smallt group by d;
4949
Peak Memory Usage: 0 kB
5050
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=1.50..3.47 rows=18 width=12) (actual time=0.395..1.643 rows=7 loops=1)
5151
Hash Key: d
52-
-> Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.058..0.061 rows=10 loops=1)
52+
-> Streaming Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.000..0.000 rows=10 loops=1)
5353
Group Key: d
5454
Peak Memory Usage: 0 kB
5555
-> Seq Scan on smallt (cost=0.00..1.33 rows=33 width=4) (actual time=0.017..0.024 rows=50 loops=1)
@@ -183,7 +183,7 @@ where t1.d = t2.d;
183183
Peak Memory Usage: 0 kB
184184
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=1.50..3.47 rows=18 width=12) (actual time=0.659..1.284 rows=7 loops=1)
185185
Hash Key: smallt.d
186-
-> Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.055..0.058 rows=10 loops=1)
186+
-> Streaming Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.000..0.000 rows=10 loops=1)
187187
Group Key: smallt.d
188188
Peak Memory Usage: 0 kB
189189
-> Seq Scan on smallt (cost=0.00..1.33 rows=33 width=4) (actual time=0.015..0.022 rows=50 loops=1)
@@ -193,7 +193,7 @@ where t1.d = t2.d;
193193
Peak Memory Usage: 0 kB
194194
-> Redistribute Motion 3:3 (slice3; segments: 3) (cost=1.50..3.47 rows=18 width=12) (actual time=0.003..0.470 rows=7 loops=1)
195195
Hash Key: smallt_1.d
196-
-> Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.084..0.090 rows=10 loops=1)
196+
-> Streaming Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.000..0.000 rows=10 loops=1)
197197
Group Key: smallt_1.d
198198
Peak Memory Usage: 0 kB
199199
-> Seq Scan on smallt smallt_1 (cost=0.00..1.33 rows=33 width=8) (actual time=0.022..0.033 rows=50 loops=1)
@@ -282,7 +282,7 @@ explain analyze select d, count(*) from smallt group by d limit 5;
282282
-> Sort (cost=2.05..2.10 rows=18 width=12) (actual time=0.173..0.178 rows=10 loops=1)
283283
Sort Key: d
284284
Sort Method: quicksort Memory: 81kB
285-
-> Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.108..0.130 rows=10 loops=1)
285+
-> Streaming Partial HashAggregate (cost=1.50..1.68 rows=18 width=12) (actual time=0.000..0.000 rows=10 loops=1)
286286
Group Key: d
287287
Extra Text: (seg0) Hash chain length 1.1 avg, 2 max, using 9 of 32 buckets; total 0 expansions.
288288
-> Seq Scan on smallt (cost=0.00..1.33 rows=33 width=4) (actual time=0.025..0.036 rows=50 loops=1)

src/test/regress/expected/gp_aggregates_costs.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ explain select avg(b) from cost_agg_t1 group by c;
2222
Group Key: c
2323
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=5449.00..5509.00 rows=2000 width=36)
2424
Hash Key: c
25-
-> Partial HashAggregate (cost=5449.00..5469.00 rows=2000 width=36)
25+
-> Streaming Partial HashAggregate (cost=5449.00..5469.00 rows=2000 width=36)
2626
Group Key: c
2727
-> Seq Scan on cost_agg_t1 (cost=0.00..3782.33 rows=333333 width=8)
2828
Optimizer: Postgres query optimizer
@@ -61,7 +61,7 @@ explain select avg(b) from cost_agg_t2 group by c;
6161
Planned Partitions: 8
6262
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=6538.00..9602.35 rows=102145 width=36)
6363
Hash Key: c
64-
-> Partial HashAggregate (cost=6538.00..7559.45 rows=102145 width=36)
64+
-> Streaming Partial HashAggregate (cost=6538.00..7574.83 rows=103683 width=36)
6565
Group Key: c
6666
Planned Partitions: 8
6767
-> Seq Scan on cost_agg_t2 (cost=0.00..4538.00 rows=400000 width=8)
@@ -103,7 +103,7 @@ explain (costs off) select b, sum(a) from t_planner_force_multi_stage group by b
103103
Finalize HashAggregate
104104
Group Key: b
105105
-> Gather Motion 3:1 (slice1; segments: 3)
106-
-> Partial HashAggregate
106+
-> Streaming Partial HashAggregate
107107
Group Key: b
108108
-> Seq Scan on t_planner_force_multi_stage
109109
Optimizer: Postgres query optimizer

0 commit comments

Comments
 (0)