Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita363bc6

Browse files
author
Etsuro Fujita
committed
Fix EXPLAIN ANALYZE for async-capable nodes.
EXPLAIN ANALYZE for an async-capable ForeignScan node associated withpostgres_fdw is done just by using instrumentation for ExecProcNode()called from the node's callbacks, causing the following problems:1) If the remote table to scan is empty, the node is incorrectly considered as "never executed" by the command even if the node is executed, as ExecProcNode() isn't called from the node's callbacks at all in that case.2) The command fails to collect timings for things other than ExecProcNode() done in the node, such as creating a cursor for the node's remote query.To fix these problems, add instrumentation for async-capable nodes, andmodify postgres_fdw accordingly.My oversight in commit27e1f14.While at it, update a comment for the AsyncRequest struct in execnodes.hand the documentation for the ForeignAsyncRequest API in fdwhandler.sgmlto match the code in ExecAsyncAppendResponse() in nodeAppend.c, and fixtypos in comments in nodeAppend.c.Per report from Andrey Lepikhov, though I didn't use his patch.Reviewed-by: Andrey LepikhovDiscussion:https://postgr.es/m/2eb662bb-105d-fc20-7412-2f027cc3ca72%40postgrespro.ru
1 parente135743 commita363bc6

File tree

14 files changed

+134
-26
lines changed

14 files changed

+134
-26
lines changed

‎contrib/auto_explain/auto_explain.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
314314
MemoryContextoldcxt;
315315

316316
oldcxt=MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
317-
queryDesc->totaltime=InstrAlloc(1,INSTRUMENT_ALL);
317+
queryDesc->totaltime=InstrAlloc(1,INSTRUMENT_ALL, false);
318318
MemoryContextSwitchTo(oldcxt);
319319
}
320320
}

‎contrib/pg_stat_statements/pg_stat_statements.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
974974
MemoryContextoldcxt;
975975

976976
oldcxt=MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
977-
queryDesc->totaltime=InstrAlloc(1,INSTRUMENT_ALL);
977+
queryDesc->totaltime=InstrAlloc(1,INSTRUMENT_ALL, false);
978978
MemoryContextSwitchTo(oldcxt);
979979
}
980980
}

‎contrib/postgres_fdw/expected/postgres_fdw.out‎

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1005110051
Filter: (t1_3.b === 505)
1005210052
(14 rows)
1005310053

10054+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
10055+
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
10056+
QUERY PLAN
10057+
-------------------------------------------------------------------------
10058+
Limit (actual rows=1 loops=1)
10059+
-> Append (actual rows=1 loops=1)
10060+
-> Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1)
10061+
Filter: (b === 505)
10062+
-> Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1)
10063+
Filter: (b === 505)
10064+
-> Seq Scan on async_p3 t1_3 (actual rows=1 loops=1)
10065+
Filter: (b === 505)
10066+
Rows Removed by Filter: 101
10067+
(9 rows)
10068+
1005410069
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1005510070
a | b | c
1005610071
------+-----+------
@@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1;
1013210147
(3 rows)
1013310148

1013410149
DELETE FROM join_tbl;
10150+
DROP TABLE local_tbl;
10151+
DROP FOREIGN TABLE remote_tbl;
10152+
DROP FOREIGN TABLE insert_tbl;
10153+
DROP TABLE base_tbl3;
10154+
DROP TABLE base_tbl4;
1013510155
RESET enable_mergejoin;
1013610156
RESET enable_hashjoin;
10157+
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
10158+
DELETE FROM async_p1;
10159+
DELETE FROM async_p2;
10160+
DELETE FROM async_p3;
10161+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
10162+
SELECT * FROM async_pt;
10163+
QUERY PLAN
10164+
-------------------------------------------------------------------------
10165+
Append (actual rows=0 loops=1)
10166+
-> Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1)
10167+
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1)
10168+
-> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1)
10169+
(4 rows)
10170+
1013710171
-- Clean up
1013810172
DROP TABLE async_pt;
1013910173
DROP TABLE base_tbl1;
1014010174
DROP TABLE base_tbl2;
1014110175
DROP TABLE result_tbl;
10142-
DROP TABLE local_tbl;
10143-
DROP FOREIGN TABLE remote_tbl;
10144-
DROP FOREIGN TABLE insert_tbl;
10145-
DROP TABLE base_tbl3;
10146-
DROP TABLE base_tbl4;
1014710176
DROP TABLE join_tbl;
1014810177
ALTER SERVER loopback OPTIONS (DROP async_capable);
1014910178
ALTER SERVER loopback2 OPTIONS (DROP async_capable);

‎contrib/postgres_fdw/postgres_fdw.c‎

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
15421542
&fsstate->param_values);
15431543

15441544
/* Set the async-capable flag */
1545-
fsstate->async_capable=node->ss.ps.plan->async_capable;
1545+
fsstate->async_capable=node->ss.ps.async_capable;
15461546
}
15471547

15481548
/*
@@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
68676867
}
68686868

68696869
/* Get a tuple from the ForeignScan node */
6870-
result=ExecProcNode((PlanState*)node);
6870+
result=areq->requestee->ExecProcNodeReal(areq->requestee);
68716871
if (!TupIsNull(result))
68726872
{
68736873
/* Mark the request as complete */
@@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq)
69566956
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
69576957
ExecAsyncResponse(areq);
69586958

6959+
/* Also, we do instrumentation ourselves, if required */
6960+
if (areq->requestee->instrument)
6961+
InstrUpdateTupleCount(areq->requestee->instrument,
6962+
TupIsNull(areq->result) ?0.0 :1.0);
6963+
69596964
MemoryContextSwitchTo(oldcontext);
69606965
}
69616966

‎contrib/postgres_fdw/sql/postgres_fdw.sql‎

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
31953195

31963196
EXPLAIN (VERBOSE, COSTS OFF)
31973197
SELECT*FROM async_pt t1WHEREt1.b===505LIMIT1;
3198+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
3199+
SELECT*FROM async_pt t1WHEREt1.b===505LIMIT1;
31983200
SELECT*FROM async_pt t1WHEREt1.b===505LIMIT1;
31993201

32003202
-- Check with foreign modify
@@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND
32263228
SELECT*FROM join_tblORDER BY a1;
32273229
DELETEFROM join_tbl;
32283230

3231+
DROPTABLE local_tbl;
3232+
DROP FOREIGN TABLE remote_tbl;
3233+
DROP FOREIGN TABLE insert_tbl;
3234+
DROPTABLE base_tbl3;
3235+
DROPTABLE base_tbl4;
3236+
32293237
RESET enable_mergejoin;
32303238
RESET enable_hashjoin;
32313239

3240+
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
3241+
DELETEFROM async_p1;
3242+
DELETEFROM async_p2;
3243+
DELETEFROM async_p3;
3244+
3245+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
3246+
SELECT*FROM async_pt;
3247+
32323248
-- Clean up
32333249
DROPTABLE async_pt;
32343250
DROPTABLE base_tbl1;
32353251
DROPTABLE base_tbl2;
32363252
DROPTABLE result_tbl;
3237-
DROPTABLE local_tbl;
3238-
DROP FOREIGN TABLE remote_tbl;
3239-
DROP FOREIGN TABLE insert_tbl;
3240-
DROPTABLE base_tbl3;
3241-
DROPTABLE base_tbl4;
32423253
DROPTABLE join_tbl;
32433254

32443255
ALTER SERVER loopback OPTIONS (DROP async_capable);

‎doc/src/sgml/fdwhandler.sgml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq);
15971597
<literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
15981598
for the <structname>ForeignScan</structname> node to get a callback from
15991599
the callback functions described below. If no more tuples are available,
1600-
set the slot to NULL, and the
1600+
set the slot to NULL or an empty slot, and the
16011601
<literal>areq-&gt;request_complete</literal> flag to
16021602
<literal>true</literal>. It's recommended to use
16031603
<function>ExecAsyncRequestDone</function> or

‎src/backend/executor/execAsync.c‎

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include"postgres.h"
1616

1717
#include"executor/execAsync.h"
18+
#include"executor/executor.h"
1819
#include"executor/nodeAppend.h"
1920
#include"executor/nodeForeignscan.h"
2021

@@ -24,6 +25,13 @@
2425
void
2526
ExecAsyncRequest(AsyncRequest*areq)
2627
{
28+
if (areq->requestee->chgParam!=NULL)/* something changed? */
29+
ExecReScan(areq->requestee);/* let ReScan handle this */
30+
31+
/* must provide our own instrumentation support */
32+
if (areq->requestee->instrument)
33+
InstrStartNode(areq->requestee->instrument);
34+
2735
switch (nodeTag(areq->requestee))
2836
{
2937
caseT_ForeignScanState:
@@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
3644
}
3745

3846
ExecAsyncResponse(areq);
47+
48+
/* must provide our own instrumentation support */
49+
if (areq->requestee->instrument)
50+
InstrStopNode(areq->requestee->instrument,
51+
TupIsNull(areq->result) ?0.0 :1.0);
3952
}
4053

4154
/*
@@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
4861
void
4962
ExecAsyncConfigureWait(AsyncRequest*areq)
5063
{
64+
/* must provide our own instrumentation support */
65+
if (areq->requestee->instrument)
66+
InstrStartNode(areq->requestee->instrument);
67+
5168
switch (nodeTag(areq->requestee))
5269
{
5370
caseT_ForeignScanState:
@@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
5875
elog(ERROR,"unrecognized node type: %d",
5976
(int)nodeTag(areq->requestee));
6077
}
78+
79+
/* must provide our own instrumentation support */
80+
if (areq->requestee->instrument)
81+
InstrStopNode(areq->requestee->instrument,0.0);
6182
}
6283

6384
/*
@@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
6687
void
6788
ExecAsyncNotify(AsyncRequest*areq)
6889
{
90+
/* must provide our own instrumentation support */
91+
if (areq->requestee->instrument)
92+
InstrStartNode(areq->requestee->instrument);
93+
6994
switch (nodeTag(areq->requestee))
7095
{
7196
caseT_ForeignScanState:
@@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
78103
}
79104

80105
ExecAsyncResponse(areq);
106+
107+
/* must provide our own instrumentation support */
108+
if (areq->requestee->instrument)
109+
InstrStopNode(areq->requestee->instrument,
110+
TupIsNull(areq->result) ?0.0 :1.0);
81111
}
82112

83113
/*

‎src/backend/executor/execMain.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
12141214
resultRelInfo->ri_TrigWhenExprs= (ExprState**)
12151215
palloc0(n*sizeof(ExprState*));
12161216
if (instrument_options)
1217-
resultRelInfo->ri_TrigInstrument=InstrAlloc(n,instrument_options);
1217+
resultRelInfo->ri_TrigInstrument=InstrAlloc(n,instrument_options, false);
12181218
}
12191219
else
12201220
{

‎src/backend/executor/execProcnode.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
407407

408408
/* Set up instrumentation for this node if requested */
409409
if (estate->es_instrument)
410-
result->instrument=InstrAlloc(1,estate->es_instrument);
410+
result->instrument=InstrAlloc(1,estate->es_instrument,
411+
result->async_capable);
411412

412413
returnresult;
413414
}

‎src/backend/executor/instrument.c‎

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
2828

2929
/* Allocate new instrumentation structure(s) */
3030
Instrumentation*
31-
InstrAlloc(intn,intinstrument_options)
31+
InstrAlloc(intn,intinstrument_options,boolasync_mode)
3232
{
3333
Instrumentation*instr;
3434

@@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
4646
instr[i].need_bufusage=need_buffers;
4747
instr[i].need_walusage=need_wal;
4848
instr[i].need_timer=need_timer;
49+
instr[i].async_mode=async_mode;
4950
}
5051
}
5152

@@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
8283
void
8384
InstrStopNode(Instrumentation*instr,doublenTuples)
8485
{
86+
doublesave_tuplecount=instr->tuplecount;
8587
instr_timeendtime;
8688

8789
/* count the returned tuples */
@@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
114116
instr->running= true;
115117
instr->firsttuple=INSTR_TIME_GET_DOUBLE(instr->counter);
116118
}
119+
else
120+
{
121+
/*
122+
* In async mode, if the plan node hadn't emitted any tuples before,
123+
* this might be the first tuple
124+
*/
125+
if (instr->async_mode&&save_tuplecount<1.0)
126+
instr->firsttuple=INSTR_TIME_GET_DOUBLE(instr->counter);
127+
}
128+
}
129+
130+
/* Update tuple count */
131+
void
132+
InstrUpdateTupleCount(Instrumentation*instr,doublenTuples)
133+
{
134+
/* count the returned tuples */
135+
instr->tuplecount+=nTuples;
117136
}
118137

119138
/* Finish a run cycle for a plan node */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp