Commit 9bdb300
Fix EXPLAIN ANALYZE for parallel HashAgg plans
Since 1f39bce, HashAgg nodes have had the ability to spill to disk when
memory consumption exceeds work_mem. That commit added new properties to
EXPLAIN ANALYZE to show the maximum memory usage and disk usage; however,
it didn't quite go as far as showing that information for parallel
workers. Since workers may have experienced something very different from
the main process, we should show this information per worker, as is done
in Sort.

Reviewed-by: Justin Pryzby
Reviewed-by: Jeff Davis
Discussion: https://postgr.es/m/CAApHDvpEKbfZa18mM1TD7qV6PG+w97pwCWq5tVD0dX7e11gRJw@mail.gmail.com
Backpatch-through: 13, where the hashagg spilling code was added
1 parent f219167 · commit 9bdb300
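To illustrate the change, here is a sketch of the text-format output this patch enables. It is hand-written rather than captured from a real run — the plan shape, column name, and every number are invented — but the leader and per-worker lines follow the format strings added to show_hashagg_info() below:

    ->  Partial HashAggregate (actual time=1370.10..1468.51 rows=999 loops=3)
          Group Key: a
          Peak Memory Usage: 4177 kB  Disk Usage: 16400 kB  HashAgg Batches: 36
          Worker 0:  Peak Memory Usage: 4145 kB  Disk Usage: 16336 kB  HashAgg Batches: 36
          Worker 1:  Peak Memory Usage: 4153 kB  Disk Usage: 16352 kB  HashAgg Batches: 36
          ->  Parallel Seq Scan on tab (actual rows=333333 loops=3)

Before this commit, only the first of those stats lines (covering the leader process) was shown; whatever the workers spilled was invisible.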

5 files changed: 244 additions & 17 deletions

src/backend/commands/explain.c

Lines changed: 96 additions & 14 deletions
@@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
 	Agg		   *agg = (Agg *) aggstate->ss.ps.plan;
 	int64		memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
 
-	Assert(IsA(aggstate, AggState));
-
 	if (agg->aggstrategy != AGG_HASHED &&
 		agg->aggstrategy != AGG_MIXED)
 		return;
 
-	if (es->costs && aggstate->hash_planned_partitions > 0)
+	if (es->format != EXPLAIN_FORMAT_TEXT)
 	{
-		ExplainPropertyInteger("Planned Partitions", NULL,
-							   aggstate->hash_planned_partitions, es);
+
+		if (es->costs && aggstate->hash_planned_partitions > 0)
+		{
+			ExplainPropertyInteger("Planned Partitions", NULL,
+								   aggstate->hash_planned_partitions, es);
+		}
+
+		if (!es->analyze)
+			return;
+
+		/* EXPLAIN ANALYZE */
+		ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+		if (aggstate->hash_batches_used > 0)
+		{
+			ExplainPropertyInteger("Disk Usage", "kB",
+								   aggstate->hash_disk_used, es);
+			ExplainPropertyInteger("HashAgg Batches", NULL,
+								   aggstate->hash_batches_used, es);
+		}
 	}
+	else
+	{
+		bool		gotone = false;
 
-	if (!es->analyze)
-		return;
+		if (es->costs && aggstate->hash_planned_partitions > 0)
+		{
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "Planned Partitions: %d",
+							 aggstate->hash_planned_partitions);
+			gotone = true;
+		}
+
+		if (!es->analyze)
+		{
+			if (gotone)
+				appendStringInfoChar(es->str, '\n');
+			return;
+		}
+
+		if (!gotone)
+			ExplainIndentText(es);
+		else
+			appendStringInfoString(es->str, "  ");
+
+		appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+						 memPeakKb);
 
-	/* EXPLAIN ANALYZE */
-	ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
-	if (aggstate->hash_batches_used > 0)
+		if (aggstate->hash_batches_used > 0)
+			appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+							 aggstate->hash_disk_used,
+							 aggstate->hash_batches_used);
+		appendStringInfoChar(es->str, '\n');
+	}
+
+	/* Display stats for each parallel worker */
+	if (es->analyze && aggstate->shared_info != NULL)
 	{
-		ExplainPropertyInteger("Disk Usage", "kB",
-							   aggstate->hash_disk_used, es);
-		ExplainPropertyInteger("HashAgg Batches", NULL,
-							   aggstate->hash_batches_used, es);
+		for (int n = 0; n < aggstate->shared_info->num_workers; n++)
+		{
+			AggregateInstrumentation *sinstrument;
+			uint64		hash_disk_used;
+			int			hash_batches_used;
+
+			sinstrument = &aggstate->shared_info->sinstrument[n];
+			hash_disk_used = sinstrument->hash_disk_used;
+			hash_batches_used = sinstrument->hash_batches_used;
+			memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024;
+
+			if (es->workers_state)
+				ExplainOpenWorker(n, es);
+
+			if (es->format == EXPLAIN_FORMAT_TEXT)
+			{
+				ExplainIndentText(es);
+
+				appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+								 memPeakKb);
+
+				if (hash_batches_used > 0)
+					appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+									 hash_disk_used, hash_batches_used);
+				appendStringInfoChar(es->str, '\n');
+			}
+			else
+			{
+				ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb,
+									   es);
+				if (hash_batches_used > 0)
+				{
+					ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used,
+										   es);
+					ExplainPropertyInteger("HashAgg Batches", NULL,
+										   hash_batches_used, es);
+				}
+			}
+
+			if (es->workers_state)
+				ExplainCloseWorker(n, es);
+		}
 	}
 }

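In non-text formats the same figures go through ExplainPropertyInteger(), once for the leader and once per worker inside the ExplainOpenWorker/ExplainCloseWorker group. A hypothetical fragment of EXPLAIN (ANALYZE, FORMAT JSON) output for the aggregate node (numbers invented, as above) would therefore contain:

    "Peak Memory Usage": 4177,
    "Disk Usage": 16400,
    "HashAgg Batches": 36,
    "Workers": [
      {
        "Worker Number": 0,
        "Peak Memory Usage": 4145,
        "Disk Usage": 16336,
        "HashAgg Batches": 36
      }
    ]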
src/backend/executor/execParallel.c

Lines changed: 16 additions & 3 deletions
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
 			break;
-
+		case T_AggState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecAggEstimate((AggState *) planstate, e->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
 			break;
-
+		case T_AggState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_HashState:
 			ExecHashRetrieveInstrumentation((HashState *) planstate);
 			break;
+		case T_AggState:
+			ExecAggRetrieveInstrumentation((AggState *) planstate);
+			break;
 		default:
 			break;
 	}
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
 												pwcxt);
 			break;
-
+		case T_AggState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+			break;
 		default:
 			break;
 	}

src/backend/executor/nodeAgg.c

Lines changed: 103 additions & 0 deletions
@@ -240,6 +240,7 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
 	int			numGroupingSets = Max(node->maxsets, 1);
 	int			setno;
 
+	/*
+	 * When ending a parallel worker, copy the statistics gathered by the
+	 * worker back into shared memory so that it can be picked up by the main
+	 * process to report in EXPLAIN ANALYZE.
+	 */
+	if (node->shared_info && IsParallelWorker())
+	{
+		AggregateInstrumentation *si;
+
+		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+		si->hash_batches_used = node->hash_batches_used;
+		si->hash_disk_used = node->hash_disk_used;
+		si->hash_mem_peak = node->hash_mem_peak;
+	}
+
 	/* Make sure we have closed any open tuplesorts */
 
 	if (node->sort_in)
@@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
 			 fcinfo->flinfo->fn_oid);
 	return (Datum) 0;			/* keep compiler quiet */
 }
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecAggEstimate
+ *
+ *		Estimate space required to propagate aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+	size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggInitializeDSM
+ *
+ *		Initialize DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedAggInfo, sinstrument)
+		+ pcxt->nworkers * sizeof(AggregateInstrumentation);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggInitializeWorker
+ *
+ *		Attach worker to DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggRetrieveInstrumentation
+ *
+ *		Transfer aggregate statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+	Size		size;
+	SharedAggInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedAggInfo, sinstrument)
+		+ node->shared_info->num_workers * sizeof(AggregateInstrumentation);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}

src/include/executor/nodeAgg.h

Lines changed: 7 additions & 0 deletions
@@ -14,6 +14,7 @@
 #ifndef NODEAGG_H
 #define NODEAGG_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 
@@ -323,4 +324,10 @@ extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
 								int used_bits, Size *mem_limit,
 								uint64 *ngroups_limit, int *num_partitions);
 
+/* parallel instrumentation support */
+extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAggRetrieveInstrumentation(AggState *node);
+
 #endif							/* NODEAGG_H */

src/include/nodes/execnodes.h

Lines changed: 22 additions & 0 deletions
@@ -2101,6 +2101,27 @@ typedef struct GroupState
 	bool		grp_done;		/* indicates completion of Group scan */
 } GroupState;
 
+/* ---------------------
+ *	per-worker aggregate information
+ * ---------------------
+ */
+typedef struct AggregateInstrumentation
+{
+	Size		hash_mem_peak;	/* peak hash table memory usage */
+	uint64		hash_disk_used; /* kB of disk space used */
+	int			hash_batches_used;	/* batches used during entire execution */
+} AggregateInstrumentation;
+
+/* ----------------
+ *	 Shared memory container for per-worker aggregate information
+ * ----------------
+ */
+typedef struct SharedAggInfo
+{
+	int			num_workers;
+	AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedAggInfo;
+
 /* ---------------------
  *	AggState information
  *
@@ -2190,6 +2211,7 @@ typedef struct AggState
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	ProjectionInfo *combinedproj;	/* projection machinery */
+	SharedAggInfo *shared_info; /* one entry per worker */
 } AggState;
 
 /* ----------------