Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit691b8d5

Browse files
committed
Allow for parallel execution whenever ExecutorRun() is done only once.
Previously, it was unsafe to execute a plan in parallel ifExecutorRun() might be called with a non-zero row count. However,it's quite easy to fix things up so that we can support that case,provided that it is known that we will never call ExecutorRun() asecond time for the same QueryDesc. Add infrastructure to signalthis, and cross-checks to make sure that a caller who claims this istrue doesn't later reneg.While that pattern never happens with queries received directly from aclient -- there's no way to know whether multiple Execute messageswill be sent unless the first one requests all the rows -- it's prettycommon for queries originating from procedural languages, which oftenlimit the result to a single tuple or to a user-specified number oftuples.This commit doesn't actually enable parallelism in any additionalcases, because currently none of the places that would be able tobenefit from this infrastructure pass CURSOR_OPT_PARALLEL_OK in thefirst place, but it makes it much more palatable to passCURSOR_OPT_PARALLEL_OK in places where we currently don't, because iteliminates some cases where we'd end up having to run the parallelplan serially.Patch by me, based on some ideas from Rafia Sabih and corrected byRafia Sabih based on feedback from Dilip Kumar and myself.Discussion:http://postgr.es/m/CA+TgmobXEhvHbJtWDuPZM9bVSLiTj-kShxQJ2uM5GPDze9fRYA@mail.gmail.com
1 parent218f515 commit691b8d5

File tree

19 files changed

+73
-38
lines changed

19 files changed

+73
-38
lines changed

‎contrib/auto_explain/auto_explain.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void_PG_fini(void);
6161
staticvoidexplain_ExecutorStart(QueryDesc*queryDesc,inteflags);
6262
staticvoidexplain_ExecutorRun(QueryDesc*queryDesc,
6363
ScanDirectiondirection,
64-
uint64count);
64+
uint64count,boolexecute_once);
6565
staticvoidexplain_ExecutorFinish(QueryDesc*queryDesc);
6666
staticvoidexplain_ExecutorEnd(QueryDesc*queryDesc);
6767

@@ -257,15 +257,16 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
257257
* ExecutorRun hook: all we need do is track nesting depth
258258
*/
259259
staticvoid
260-
explain_ExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,uint64count)
260+
explain_ExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,
261+
uint64count,boolexecute_once)
261262
{
262263
nesting_level++;
263264
PG_TRY();
264265
{
265266
if (prev_ExecutorRun)
266-
prev_ExecutorRun(queryDesc,direction,count);
267+
prev_ExecutorRun(queryDesc,direction,count,execute_once);
267268
else
268-
standard_ExecutorRun(queryDesc,direction,count);
269+
standard_ExecutorRun(queryDesc,direction,count,execute_once);
269270
nesting_level--;
270271
}
271272
PG_CATCH();

‎contrib/pg_stat_statements/pg_stat_statements.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
290290
staticvoidpgss_ExecutorStart(QueryDesc*queryDesc,inteflags);
291291
staticvoidpgss_ExecutorRun(QueryDesc*queryDesc,
292292
ScanDirectiondirection,
293-
uint64count);
293+
uint64count,boolexecute_once);
294294
staticvoidpgss_ExecutorFinish(QueryDesc*queryDesc);
295295
staticvoidpgss_ExecutorEnd(QueryDesc*queryDesc);
296296
staticvoidpgss_ProcessUtility(PlannedStmt*pstmt,constchar*queryString,
@@ -871,15 +871,16 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
871871
* ExecutorRun hook: all we need do is track nesting depth
872872
*/
873873
staticvoid
874-
pgss_ExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,uint64count)
874+
pgss_ExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,uint64count,
875+
boolexecute_once)
875876
{
876877
nested_level++;
877878
PG_TRY();
878879
{
879880
if (prev_ExecutorRun)
880-
prev_ExecutorRun(queryDesc,direction,count);
881+
prev_ExecutorRun(queryDesc,direction,count,execute_once);
881882
else
882-
standard_ExecutorRun(queryDesc,direction,count);
883+
standard_ExecutorRun(queryDesc,direction,count,execute_once);
883884
nested_level--;
884885
}
885886
PG_CATCH();

‎src/backend/commands/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2074,7 +2074,7 @@ CopyTo(CopyState cstate)
20742074
else
20752075
{
20762076
/* run the plan --- the dest receiver will send tuples */
2077-
ExecutorRun(cstate->queryDesc,ForwardScanDirection,0L);
2077+
ExecutorRun(cstate->queryDesc,ForwardScanDirection,0L, true);
20782078
processed= ((DR_copy*)cstate->queryDesc->dest)->processed;
20792079
}
20802080

‎src/backend/commands/createas.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
347347
ExecutorStart(queryDesc,GetIntoRelEFlags(into));
348348

349349
/* run the plan to completion */
350-
ExecutorRun(queryDesc,ForwardScanDirection,0L);
350+
ExecutorRun(queryDesc,ForwardScanDirection,0L, true);
351351

352352
/* save the rowcount if we're given a completionTag to fill */
353353
if (completionTag)

‎src/backend/commands/explain.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
530530
dir=ForwardScanDirection;
531531

532532
/* run the plan */
533-
ExecutorRun(queryDesc,dir,0L);
533+
ExecutorRun(queryDesc,dir,0L, true);
534534

535535
/* run cleanup too */
536536
ExecutorFinish(queryDesc);

‎src/backend/commands/extension.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ execute_sql_string(const char *sql, const char *filename)
742742
dest,NULL,0);
743743

744744
ExecutorStart(qdesc,0);
745-
ExecutorRun(qdesc,ForwardScanDirection,0);
745+
ExecutorRun(qdesc,ForwardScanDirection,0, true);
746746
ExecutorFinish(qdesc);
747747
ExecutorEnd(qdesc);
748748

‎src/backend/commands/matview.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
424424
ExecutorStart(queryDesc,EXEC_FLAG_WITHOUT_OIDS);
425425

426426
/* run the plan */
427-
ExecutorRun(queryDesc,ForwardScanDirection,0L);
427+
ExecutorRun(queryDesc,ForwardScanDirection,0L, true);
428428

429429
processed=queryDesc->estate->es_processed;
430430

‎src/backend/commands/portalcmds.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ PersistHoldablePortal(Portal portal)
395395
true);
396396

397397
/* Fetch the result set into the tuplestore */
398-
ExecutorRun(queryDesc,ForwardScanDirection,0L);
398+
ExecutorRun(queryDesc,ForwardScanDirection,0L, false);
399399

400400
(*queryDesc->dest->rDestroy) (queryDesc->dest);
401401
queryDesc->dest=NULL;

‎src/backend/commands/prepare.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
301301
*/
302302
PortalStart(portal,paramLI,eflags,GetActiveSnapshot());
303303

304-
(void)PortalRun(portal,count, false,dest,dest,completionTag);
304+
(void)PortalRun(portal,count, false,true,dest,dest,completionTag);
305305

306306
PortalDrop(portal, false);
307307

‎src/backend/executor/execMain.c

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
8585
boolsendTuples,
8686
uint64numberTuples,
8787
ScanDirectiondirection,
88-
DestReceiver*dest);
88+
DestReceiver*dest,
89+
boolexecute_once);
8990
staticboolExecCheckRTEPerms(RangeTblEntry*rte);
9091
staticboolExecCheckRTEPermsModified(OidrelOid,Oiduserid,
9192
Bitmapset*modifiedCols,
@@ -288,17 +289,18 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
288289
*/
289290
void
290291
ExecutorRun(QueryDesc*queryDesc,
291-
ScanDirectiondirection,uint64count)
292+
ScanDirectiondirection,uint64count,
293+
boolexecute_once)
292294
{
293295
if (ExecutorRun_hook)
294-
(*ExecutorRun_hook) (queryDesc,direction,count);
296+
(*ExecutorRun_hook) (queryDesc,direction,count,execute_once);
295297
else
296-
standard_ExecutorRun(queryDesc,direction,count);
298+
standard_ExecutorRun(queryDesc,direction,count,execute_once);
297299
}
298300

299301
void
300302
standard_ExecutorRun(QueryDesc*queryDesc,
301-
ScanDirectiondirection,uint64count)
303+
ScanDirectiondirection,uint64count,boolexecute_once)
302304
{
303305
EState*estate;
304306
CmdTypeoperation;
@@ -345,14 +347,21 @@ standard_ExecutorRun(QueryDesc *queryDesc,
345347
* run plan
346348
*/
347349
if (!ScanDirectionIsNoMovement(direction))
350+
{
351+
if (execute_once&&queryDesc->already_executed)
352+
elog(ERROR,"can't re-execute query flagged for single execution");
353+
queryDesc->already_executed= true;
354+
348355
ExecutePlan(estate,
349356
queryDesc->planstate,
350357
queryDesc->plannedstmt->parallelModeNeeded,
351358
operation,
352359
sendTuples,
353360
count,
354361
direction,
355-
dest);
362+
dest,
363+
execute_once);
364+
}
356365

357366
/*
358367
* shutdown tuple receiver, if we started it
@@ -1595,7 +1604,8 @@ ExecutePlan(EState *estate,
15951604
boolsendTuples,
15961605
uint64numberTuples,
15971606
ScanDirectiondirection,
1598-
DestReceiver*dest)
1607+
DestReceiver*dest,
1608+
boolexecute_once)
15991609
{
16001610
TupleTableSlot*slot;
16011611
uint64current_tuple_count;
@@ -1611,12 +1621,12 @@ ExecutePlan(EState *estate,
16111621
estate->es_direction=direction;
16121622

16131623
/*
1614-
* Ifa tuple count was supplied, we must force the plan to run without
1615-
* parallelism, because we might exit early. Also disable parallelism
1616-
* when writing into a relation, because no database changes are allowed
1617-
* in parallel mode.
1624+
* Ifthe plan might potentially be executed multiple times, we must force
1625+
*it to run withoutparallelism, because we might exit early. Also
1626+
*disable parallelismwhen writing into a relation, because no database
1627+
*changes are allowedin parallel mode.
16181628
*/
1619-
if (numberTuples||dest->mydest==DestIntoRel)
1629+
if (!execute_once||dest->mydest==DestIntoRel)
16201630
use_parallel_mode= false;
16211631

16221632
if (use_parallel_mode)
@@ -1687,7 +1697,11 @@ ExecutePlan(EState *estate,
16871697
*/
16881698
current_tuple_count++;
16891699
if (numberTuples&&numberTuples==current_tuple_count)
1700+
{
1701+
/* Allow nodes to release or shut down resources. */
1702+
(void)ExecShutdownNode(planstate);
16901703
break;
1704+
}
16911705
}
16921706

16931707
if (use_parallel_mode)

‎src/backend/executor/execParallel.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
853853
ExecParallelInitializeWorker(queryDesc->planstate,toc);
854854

855855
/* Run the plan */
856-
ExecutorRun(queryDesc,ForwardScanDirection,0L);
856+
ExecutorRun(queryDesc,ForwardScanDirection,0L, true);
857857

858858
/* Shut down the executor */
859859
ExecutorFinish(queryDesc);

‎src/backend/executor/functions.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
855855
/* Run regular commands to completion unless lazyEval */
856856
uint64count= (es->lazyEval) ?1 :0;
857857

858-
ExecutorRun(es->qd,ForwardScanDirection,count);
858+
ExecutorRun(es->qd,ForwardScanDirection,count, !fcache->returnsSet|| !es->lazyEval);
859859

860860
/*
861861
* If we requested run to completion OR there was no tuple returned,

‎src/backend/executor/spi.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2305,7 +2305,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
23052305

23062306
ExecutorStart(queryDesc,eflags);
23072307

2308-
ExecutorRun(queryDesc,ForwardScanDirection,tcount);
2308+
ExecutorRun(queryDesc,ForwardScanDirection,tcount, true);
23092309

23102310
_SPI_current->processed=queryDesc->estate->es_processed;
23112311
_SPI_current->lastoid=queryDesc->estate->es_lastoid;

‎src/backend/tcop/postgres.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,7 @@ exec_simple_query(const char *query_string)
11011101
(void)PortalRun(portal,
11021102
FETCH_ALL,
11031103
isTopLevel,
1104+
true,
11041105
receiver,
11051106
receiver,
11061107
completionTag);
@@ -1985,6 +1986,7 @@ exec_execute_message(const char *portal_name, long max_rows)
19851986
completed=PortalRun(portal,
19861987
max_rows,
19871988
true,/* always top level */
1989+
!execute_is_fetch&&max_rows==FETCH_ALL,
19881990
receiver,
19891991
receiver,
19901992
completionTag);

‎src/backend/tcop/pquery.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
9090
qd->planstate=NULL;
9191
qd->totaltime=NULL;
9292

93+
/* not yet executed */
94+
qd->already_executed= false;
95+
9396
returnqd;
9497
}
9598

@@ -152,7 +155,7 @@ ProcessQuery(PlannedStmt *plan,
152155
/*
153156
* Run the plan to completion.
154157
*/
155-
ExecutorRun(queryDesc,ForwardScanDirection,0L);
158+
ExecutorRun(queryDesc,ForwardScanDirection,0L, true);
156159

157160
/*
158161
* Build command completion status string, if caller wants one.
@@ -679,7 +682,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
679682
* suspended due to exhaustion of the count parameter.
680683
*/
681684
bool
682-
PortalRun(Portalportal,longcount,boolisTopLevel,
685+
PortalRun(Portalportal,longcount,boolisTopLevel,boolrun_once,
683686
DestReceiver*dest,DestReceiver*altdest,
684687
char*completionTag)
685688
{
@@ -712,6 +715,10 @@ PortalRun(Portal portal, long count, bool isTopLevel,
712715
*/
713716
MarkPortalActive(portal);
714717

718+
/* Set run_once flag. Shouldn't be clear if previously set. */
719+
Assert(!portal->run_once||run_once);
720+
portal->run_once=run_once;
721+
715722
/*
716723
* Set up global portal context pointers.
717724
*
@@ -918,7 +925,8 @@ PortalRunSelect(Portal portal,
918925
else
919926
{
920927
PushActiveSnapshot(queryDesc->snapshot);
921-
ExecutorRun(queryDesc,direction, (uint64)count);
928+
ExecutorRun(queryDesc,direction, (uint64)count,
929+
portal->run_once);
922930
nprocessed=queryDesc->estate->es_processed;
923931
PopActiveSnapshot();
924932
}
@@ -957,7 +965,8 @@ PortalRunSelect(Portal portal,
957965
else
958966
{
959967
PushActiveSnapshot(queryDesc->snapshot);
960-
ExecutorRun(queryDesc,direction, (uint64)count);
968+
ExecutorRun(queryDesc,direction, (uint64)count,
969+
portal->run_once);
961970
nprocessed=queryDesc->estate->es_processed;
962971
PopActiveSnapshot();
963972
}
@@ -1394,6 +1403,9 @@ PortalRunFetch(Portal portal,
13941403
*/
13951404
MarkPortalActive(portal);
13961405

1406+
/* If supporting FETCH, portal can't be run-once. */
1407+
Assert(!portal->run_once);
1408+
13971409
/*
13981410
* Set up global portal context pointers.
13991411
*/

‎src/include/executor/execdesc.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ typedef struct QueryDesc
4747
EState*estate;/* executor's query-wide state */
4848
PlanState*planstate;/* tree of per-plan-node state */
4949

50+
/* This field is set by ExecutorRun */
51+
boolalready_executed;/* true if previously executed */
52+
5053
/* This is always set NULL by the core system, but plugins can change it */
5154
structInstrumentation*totaltime;/* total time spent in ExecutorRun */
5255
}QueryDesc;

‎src/include/executor/executor.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
8181
/* Hook for plugins to get control in ExecutorRun() */
8282
typedefvoid (*ExecutorRun_hook_type) (QueryDesc*queryDesc,
8383
ScanDirectiondirection,
84-
uint64count);
84+
uint64count,
85+
boolexecute_once);
8586
externPGDLLIMPORTExecutorRun_hook_typeExecutorRun_hook;
8687

8788
/* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +177,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
176177
externvoidExecutorStart(QueryDesc*queryDesc,inteflags);
177178
externvoidstandard_ExecutorStart(QueryDesc*queryDesc,inteflags);
178179
externvoidExecutorRun(QueryDesc*queryDesc,
179-
ScanDirectiondirection,uint64count);
180+
ScanDirectiondirection,uint64count,boolexecute_once);
180181
externvoidstandard_ExecutorRun(QueryDesc*queryDesc,
181-
ScanDirectiondirection,uint64count);
182+
ScanDirectiondirection,uint64count,boolexecute_once);
182183
externvoidExecutorFinish(QueryDesc*queryDesc);
183184
externvoidstandard_ExecutorFinish(QueryDesc*queryDesc);
184185
externvoidExecutorEnd(QueryDesc*queryDesc);

‎src/include/tcop/pquery.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ extern void PortalSetResultFormat(Portal portal, int nFormats,
3434
int16*formats);
3535

3636
externboolPortalRun(Portalportal,longcount,boolisTopLevel,
37-
DestReceiver*dest,DestReceiver*altdest,
37+
boolrun_once,DestReceiver*dest,DestReceiver*altdest,
3838
char*completionTag);
3939

4040
externuint64PortalRunFetch(Portalportal,

‎src/include/utils/portal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ typedef struct PortalData
141141
/* Features/options */
142142
PortalStrategystrategy;/* see above */
143143
intcursorOptions;/* DECLARE CURSOR option bits */
144+
boolrun_once;/* portal will only be run once */
144145

145146
/* Status data */
146147
PortalStatusstatus;/* see above */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp