Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 3bd909b

Browse files
committed
Add a Gather executor node.

A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.

It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.

There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.

Amit Kapila. Some design suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
1 parent 227d57f · commit 3bd909b

File tree

26 files changed

+709
-8
lines changed

26 files changed

+709
-8
lines changed

‎doc/src/sgml/config.sgml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1928,6 +1928,22 @@ include_dir 'conf.d'
19281928
</para>
19291929
</listitem>
19301930
</varlistentry>
1931+
1932+
<varlistentry id="guc-max-parallel-degree" xreflabel="max_parallel_degree">
1933+
<term><varname>max_parallel_degree</varname> (<type>integer</type>)
1934+
<indexterm>
1935+
<primary><varname>max_parallel_degree</> configuration parameter</primary>
1936+
</indexterm>
1937+
</term>
1938+
<listitem>
1939+
<para>
1940+
Sets the maximum degree of parallelism for an individual parallel
1941+
operation. Note that the requested number of workers may not actually
1942+
be available at runtime. Parallel workers are taken from the pool
1943+
of processes established by <xref linkend="guc-max-worker-processes">.
1944+
</para>
1945+
</listitem>
1946+
</varlistentry>
19311947
</variablelist>
19321948
</sect2>
19331949
</sect1>
@@ -3398,6 +3414,36 @@ include_dir 'conf.d'
33983414
</listitem>
33993415
</varlistentry>
34003416

3417+
<varlistentry id="parallel-tuple-cost" xreflabel="parallel_tuple_cost">
3418+
<term><varname>parallel_tuple_cost</varname> (<type>floating point</type>)
3419+
<indexterm>
3420+
<primary><varname>parallel_tuple_cost</> configuration parameter</primary>
3421+
</indexterm>
3422+
</term>
3423+
<listitem>
3424+
<para>
3425+
Sets the planner's estimate of the cost of transferring a tuple
3426+
from a parallel worker process to another process.
3427+
The default is 0.1.
3428+
</para>
3429+
</listitem>
3430+
</varlistentry>
3431+
3432+
<varlistentry id="parallel-setup-cost" xreflabel="parallel_setup_cost">
3433+
<term><varname>parallel_setup_cost</varname> (<type>floating point</type>)
3434+
<indexterm>
3435+
<primary><varname>parallel_setup_cost</> configuration parameter</primary>
3436+
</indexterm>
3437+
</term>
3438+
<listitem>
3439+
<para>
3440+
Sets the planner's estimate of the cost of launching parallel worker
3441+
processes.
3442+
The default is 1000.
3443+
</para>
3444+
</listitem>
3445+
</varlistentry>
3446+
34013447
<varlistentry id="guc-effective-cache-size" xreflabel="effective_cache_size">
34023448
<term><varname>effective_cache_size</varname> (<type>integer</type>)
34033449
<indexterm>

‎src/backend/commands/explain.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
853853
caseT_SampleScan:
854854
pname=sname="Sample Scan";
855855
break;
856+
caseT_Gather:
857+
pname=sname="Gather";
858+
break;
856859
caseT_IndexScan:
857860
pname=sname="Index Scan";
858861
break;
@@ -1276,6 +1279,22 @@ ExplainNode(PlanState *planstate, List *ancestors,
12761279
show_instrumentation_count("Rows Removed by Filter",1,
12771280
planstate,es);
12781281
break;
1282+
caseT_Gather:
1283+
{
1284+
Gather*gather= (Gather*)plan;
1285+
1286+
show_scan_qual(plan->qual,"Filter",planstate,ancestors,es);
1287+
if (plan->qual)
1288+
show_instrumentation_count("Rows Removed by Filter",1,
1289+
planstate,es);
1290+
ExplainPropertyInteger("Number of Workers",
1291+
gather->num_workers,es);
1292+
if (gather->single_copy)
1293+
ExplainPropertyText("Single Copy",
1294+
gather->single_copy ?"true" :"false",
1295+
es);
1296+
}
1297+
break;
12791298
caseT_FunctionScan:
12801299
if (es->verbose)
12811300
{

‎src/backend/executor/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
1717
execScan.o execTuples.o\
1818
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o\
1919
nodeBitmapAnd.o nodeBitmapOr.o\
20-
nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.onodeHash.o\
21-
nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o\
20+
nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.onodeGather.o\
21+
nodeHash.onodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o\
2222
nodeLimit.o nodeLockRows.o\
2323
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o\
2424
nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o\

‎src/backend/executor/execAmi.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include"executor/nodeCustom.h"
2525
#include"executor/nodeForeignscan.h"
2626
#include"executor/nodeFunctionscan.h"
27+
#include"executor/nodeGather.h"
2728
#include"executor/nodeGroup.h"
2829
#include"executor/nodeGroup.h"
2930
#include"executor/nodeHash.h"
@@ -160,6 +161,10 @@ ExecReScan(PlanState *node)
160161
ExecReScanSampleScan((SampleScanState*)node);
161162
break;
162163

164+
caseT_GatherState:
165+
ExecReScanGather((GatherState*)node);
166+
break;
167+
163168
caseT_IndexScanState:
164169
ExecReScanIndexScan((IndexScanState*)node);
165170
break;
@@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node)
467472
/* Simplify life for tablesample methods by disallowing this */
468473
return false;
469474

475+
caseT_Gather:
476+
return false;
477+
470478
caseT_IndexScan:
471479
returnIndexSupportsBackwardScan(((IndexScan*)node)->indexid)&&
472480
TargetListSupportsBackwardScan(node->targetlist);

‎src/backend/executor/execMain.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
347347
direction,
348348
dest);
349349

350+
/* Allow nodes to release or shut down resources. */
351+
(void)ExecShutdownNode(queryDesc->planstate);
352+
350353
/*
351354
* shutdown tuple receiver, if we started it
352355
*/

‎src/backend/executor/execParallel.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext
7171
}ExecParallelInitializeDSMContext;
7272

7373
/* Helper functions that run in the parallel leader. */
74-
staticchar*ExecSerializePlan(Plan*plan,List*rangetable);
74+
staticchar*ExecSerializePlan(Plan*plan,EState*estate);
7575
staticboolExecParallelEstimate(PlanState*node,
7676
ExecParallelEstimateContext*e);
7777
staticboolExecParallelInitializeDSM(PlanState*node,
@@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
8888
* Create a serialized representation of the plan to be sent to each worker.
8989
*/
9090
staticchar*
91-
ExecSerializePlan(Plan*plan,List*rangetable)
91+
ExecSerializePlan(Plan*plan,EState*estate)
9292
{
9393
PlannedStmt*pstmt;
9494
ListCell*tlist;
@@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable)
125125
pstmt->canSetTag=1;
126126
pstmt->transientPlan=0;
127127
pstmt->planTree=plan;
128-
pstmt->rtable=rangetable;
128+
pstmt->rtable=estate->es_range_table;
129129
pstmt->resultRelations=NIL;
130130
pstmt->utilityStmt=NULL;
131131
pstmt->subplans=NIL;
132132
pstmt->rewindPlanIDs=NULL;
133133
pstmt->rowMarks=NIL;
134-
pstmt->nParamExec=0;
134+
pstmt->nParamExec=estate->es_plannedstmt->nParamExec;
135135
pstmt->relationOids=NIL;
136136
pstmt->invalItems=NIL;/* workers can't replan anyway... */
137137
pstmt->hasRowSecurity= false;
@@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
271271
pei->planstate=planstate;
272272

273273
/* Fix up and serialize plan to be sent to workers. */
274-
pstmt_data=ExecSerializePlan(planstate->plan,estate->es_range_table);
274+
pstmt_data=ExecSerializePlan(planstate->plan,estate);
275275

276276
/* Create a parallel context. */
277277
pcxt=CreateParallelContext(ParallelQueryMain,nworkers);
@@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
568568
ExecutorStart(queryDesc,0);
569569
ExecutorRun(queryDesc,ForwardScanDirection,0L);
570570
ExecutorFinish(queryDesc);
571-
ExecutorEnd(queryDesc);
572571

573572
/* Report buffer usage during parallel execution. */
574573
buffer_usage=shm_toc_lookup(toc,PARALLEL_KEY_BUFFER_USAGE);
@@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
579578
ExecParallelReportInstrumentation(queryDesc->planstate,
580579
instrumentation);
581580

581+
/* Must do this after capturing instrumentation. */
582+
ExecutorEnd(queryDesc);
583+
582584
/* Cleanup. */
583585
FreeQueryDesc(queryDesc);
584586
(*receiver->rDestroy) (receiver);

‎src/backend/executor/execProcnode.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
#include"executor/nodeMergejoin.h"
101101
#include"executor/nodeModifyTable.h"
102102
#include"executor/nodeNestloop.h"
103+
#include"executor/nodeGather.h"
103104
#include"executor/nodeRecursiveunion.h"
104105
#include"executor/nodeResult.h"
105106
#include"executor/nodeSamplescan.h"
@@ -113,6 +114,7 @@
113114
#include"executor/nodeValuesscan.h"
114115
#include"executor/nodeWindowAgg.h"
115116
#include"executor/nodeWorktablescan.h"
117+
#include"nodes/nodeFuncs.h"
116118
#include"miscadmin.h"
117119

118120

@@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
307309
estate,eflags);
308310
break;
309311

312+
caseT_Gather:
313+
result= (PlanState*)ExecInitGather((Gather*)node,
314+
estate,eflags);
315+
break;
316+
310317
caseT_Hash:
311318
result= (PlanState*)ExecInitHash((Hash*)node,
312319
estate,eflags);
@@ -504,6 +511,10 @@ ExecProcNode(PlanState *node)
504511
result=ExecUnique((UniqueState*)node);
505512
break;
506513

514+
caseT_GatherState:
515+
result=ExecGather((GatherState*)node);
516+
break;
517+
507518
caseT_HashState:
508519
result=ExecHash((HashState*)node);
509520
break;
@@ -658,6 +669,10 @@ ExecEndNode(PlanState *node)
658669
ExecEndSampleScan((SampleScanState*)node);
659670
break;
660671

672+
caseT_GatherState:
673+
ExecEndGather((GatherState*)node);
674+
break;
675+
661676
caseT_IndexScanState:
662677
ExecEndIndexScan((IndexScanState*)node);
663678
break;
@@ -769,3 +784,34 @@ ExecEndNode(PlanState *node)
769784
break;
770785
}
771786
}
787+
788+
/*
789+
* ExecShutdownNode
790+
*
791+
* Give execution nodes a chance to stop asynchronous resource consumption
792+
* and release any resources still held. Currently, this is only used for
793+
* parallel query, but we might want to extend it to other cases also (e.g.
794+
* FDW). We might also want to call it sooner, as soon as it's evident that
795+
* no more rows will be needed (e.g. when a Limit is filled) rather than only
796+
* at the end of ExecutorRun.
797+
*/
798+
bool
799+
ExecShutdownNode(PlanState*node)
800+
{
801+
if (node==NULL)
802+
return false;
803+
804+
switch (nodeTag(node))
805+
{
806+
caseT_GatherState:
807+
{
808+
ExecShutdownGather((GatherState*)node);
809+
return true;
810+
}
811+
break;
812+
default:
813+
break;
814+
}
815+
816+
returnplanstate_tree_walker(node,ExecShutdownNode,NULL);
817+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp