Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf0661c4

Browse files
committed
Make sequential scans parallel-aware.
In addition, this path fills in a number of missing bits and pieces inthe parallel infrastructure. Paths and plans now have a parallel_awareflag indicating whether whatever parallel-aware logic they have shouldbe engaged. It is believed that we will need this flag for a number ofpath/plan types, not just sequential scans, which is why the flag isgeneric rather than part of the SeqScan structures specifically.Also, execParallel.c now gives parallel nodes a chance to initializetheir PlanState nodes from the DSM during parallel worker startup.Amit Kapila, with a fair amount of adjustment by me. Review of previouspatch versions by Haribabu Kommi and others.
1 parentf764ecd commitf0661c4

File tree

18 files changed

+254
-73
lines changed

18 files changed

+254
-73
lines changed

‎src/backend/commands/explain.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
984984
appendStringInfoString(es->str,"-> ");
985985
es->indent+=2;
986986
}
987+
if (plan->parallel_aware)
988+
appendStringInfoString(es->str,"Parallel ");
987989
appendStringInfoString(es->str,pname);
988990
es->indent++;
989991
}
@@ -1000,6 +1002,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
10001002
ExplainPropertyText("Subplan Name",plan_name,es);
10011003
if (custom_name)
10021004
ExplainPropertyText("Custom Plan Provider",custom_name,es);
1005+
if (plan->parallel_aware)
1006+
ExplainPropertyText("Parallel Aware","true",es);
10031007
}
10041008

10051009
switch (nodeTag(plan))

‎src/backend/executor/execAmi.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node)
439439
if (node==NULL)
440440
return false;
441441

442+
/*
443+
* Parallel-aware nodes return a subset of the tuples in each worker,
444+
* and in general we can't expect to have enough bookkeeping state to
445+
* know which ones we returned in this worker as opposed to some other
446+
* worker.
447+
*/
448+
if (node->parallel_aware)
449+
return false;
450+
442451
switch (nodeTag(node))
443452
{
444453
caseT_Result:

‎src/backend/executor/execParallel.c

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include"executor/execParallel.h"
2727
#include"executor/executor.h"
28+
#include"executor/nodeSeqscan.h"
2829
#include"executor/tqueue.h"
2930
#include"nodes/nodeFuncs.h"
3031
#include"optimizer/planmain.h"
@@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
167168
/* Count this node. */
168169
e->nnodes++;
169170

170-
/*
171-
* XXX. Call estimators for parallel-aware nodes here, when we have
172-
* some.
173-
*/
171+
/* Call estimators for parallel-aware nodes. */
172+
switch (nodeTag(planstate))
173+
{
174+
caseT_SeqScanState:
175+
ExecSeqScanEstimate((SeqScanState*)planstate,
176+
e->pcxt);
177+
break;
178+
default:
179+
break;
180+
}
174181

175182
returnplanstate_tree_walker(planstate,ExecParallelEstimate,e);
176183
}
@@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate,
205212
/* Count this node. */
206213
d->nnodes++;
207214

208-
/*
209-
* XXX. Call initializers for parallel-aware plan nodes, when we have
210-
* some.
211-
*/
215+
/* Call initializers for parallel-aware plan nodes. */
216+
switch (nodeTag(planstate))
217+
{
218+
caseT_SeqScanState:
219+
ExecSeqScanInitializeDSM((SeqScanState*)planstate,
220+
d->pcxt);
221+
break;
222+
default:
223+
break;
224+
}
212225

213226
returnplanstate_tree_walker(planstate,ExecParallelInitializeDSM,d);
214227
}
@@ -574,6 +587,30 @@ ExecParallelReportInstrumentation(PlanState *planstate,
574587
instrumentation);
575588
}
576589

590+
/*
591+
* Initialize the PlanState and its descendents with the information
592+
* retrieved from shared memory. This has to be done once the PlanState
593+
* is allocated and initialized by executor; that is, after ExecutorStart().
594+
*/
595+
staticbool
596+
ExecParallelInitializeWorker(PlanState*planstate,shm_toc*toc)
597+
{
598+
if (planstate==NULL)
599+
return false;
600+
601+
/* Call initializers for parallel-aware plan nodes. */
602+
switch (nodeTag(planstate))
603+
{
604+
caseT_SeqScanState:
605+
ExecSeqScanInitializeWorker((SeqScanState*)planstate,toc);
606+
break;
607+
default:
608+
break;
609+
}
610+
611+
returnplanstate_tree_walker(planstate,ExecParallelInitializeWorker,toc);
612+
}
613+
577614
/*
578615
* Main entrypoint for parallel query worker processes.
579616
*
@@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
610647

611648
/* Start up the executor, have it run the plan, and then shut it down. */
612649
ExecutorStart(queryDesc,0);
650+
ExecParallelInitializeWorker(queryDesc->planstate,toc);
613651
ExecutorRun(queryDesc,ForwardScanDirection,0L);
614652
ExecutorFinish(queryDesc);
615653

‎src/backend/executor/nodeSeqscan.c

Lines changed: 103 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
*ExecInitSeqScancreates and initializes a seqscan node.
2020
*ExecEndSeqScanreleases any storage allocated.
2121
*ExecReScanSeqScanrescans the relation
22+
*
23+
*ExecSeqScanEstimateestimates DSM space needed for parallel scan
24+
*ExecSeqScanInitializeDSM initialize DSM for parallel scan
25+
*ExecSeqScanInitializeWorker attach to DSM info in parallel worker
2226
*/
2327
#include"postgres.h"
2428

@@ -53,10 +57,22 @@ SeqNext(SeqScanState *node)
5357
/*
5458
* get information from the estate and scan state
5559
*/
56-
scandesc=node->ss_currentScanDesc;
57-
estate=node->ps.state;
60+
scandesc=node->ss.ss_currentScanDesc;
61+
estate=node->ss.ps.state;
5862
direction=estate->es_direction;
59-
slot=node->ss_ScanTupleSlot;
63+
slot=node->ss.ss_ScanTupleSlot;
64+
65+
if (scandesc==NULL)
66+
{
67+
/*
68+
* We reach here if the scan is not parallel, or if we're executing
69+
* a scan that was intended to be parallel serially.
70+
*/
71+
scandesc=heap_beginscan(node->ss.ss_currentRelation,
72+
estate->es_snapshot,
73+
0,NULL);
74+
node->ss.ss_currentScanDesc=scandesc;
75+
}
6076

6177
/*
6278
* get the next tuple from the table
@@ -123,27 +139,19 @@ static void
123139
InitScanRelation(SeqScanState*node,EState*estate,inteflags)
124140
{
125141
RelationcurrentRelation;
126-
HeapScanDesccurrentScanDesc;
127142

128143
/*
129144
* get the relation object id from the relid'th entry in the range table,
130145
* open that relation and acquire appropriate lock on it.
131146
*/
132147
currentRelation=ExecOpenScanRelation(estate,
133-
((SeqScan*)node->ps.plan)->scanrelid,
148+
((SeqScan*)node->ss.ps.plan)->scanrelid,
134149
eflags);
135150

136-
/* initialize a heapscan */
137-
currentScanDesc=heap_beginscan(currentRelation,
138-
estate->es_snapshot,
139-
0,
140-
NULL);
141-
142-
node->ss_currentRelation=currentRelation;
143-
node->ss_currentScanDesc=currentScanDesc;
151+
node->ss.ss_currentRelation=currentRelation;
144152

145153
/* and report the scan tuple slot's rowtype */
146-
ExecAssignScanType(node,RelationGetDescr(currentRelation));
154+
ExecAssignScanType(&node->ss,RelationGetDescr(currentRelation));
147155
}
148156

149157

@@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
167175
* create state structure
168176
*/
169177
scanstate=makeNode(SeqScanState);
170-
scanstate->ps.plan= (Plan*)node;
171-
scanstate->ps.state=estate;
178+
scanstate->ss.ps.plan= (Plan*)node;
179+
scanstate->ss.ps.state=estate;
172180

173181
/*
174182
* Miscellaneous initialization
175183
*
176184
* create expression context for node
177185
*/
178-
ExecAssignExprContext(estate,&scanstate->ps);
186+
ExecAssignExprContext(estate,&scanstate->ss.ps);
179187

180188
/*
181189
* initialize child expressions
182190
*/
183-
scanstate->ps.targetlist= (List*)
191+
scanstate->ss.ps.targetlist= (List*)
184192
ExecInitExpr((Expr*)node->plan.targetlist,
185193
(PlanState*)scanstate);
186-
scanstate->ps.qual= (List*)
194+
scanstate->ss.ps.qual= (List*)
187195
ExecInitExpr((Expr*)node->plan.qual,
188196
(PlanState*)scanstate);
189197

190198
/*
191199
* tuple table initialization
192200
*/
193-
ExecInitResultTupleSlot(estate,&scanstate->ps);
194-
ExecInitScanTupleSlot(estate,scanstate);
201+
ExecInitResultTupleSlot(estate,&scanstate->ss.ps);
202+
ExecInitScanTupleSlot(estate,&scanstate->ss);
195203

196204
/*
197205
* initialize scan relation
198206
*/
199207
InitScanRelation(scanstate,estate,eflags);
200208

201-
scanstate->ps.ps_TupFromTlist= false;
209+
scanstate->ss.ps.ps_TupFromTlist= false;
202210

203211
/*
204212
* Initialize result tuple type and projection info.
205213
*/
206-
ExecAssignResultTypeFromTL(&scanstate->ps);
207-
ExecAssignScanProjectionInfo(scanstate);
214+
ExecAssignResultTypeFromTL(&scanstate->ss.ps);
215+
ExecAssignScanProjectionInfo(&scanstate->ss);
208216

209217
returnscanstate;
210218
}
@@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node)
224232
/*
225233
* get information from node
226234
*/
227-
relation=node->ss_currentRelation;
228-
scanDesc=node->ss_currentScanDesc;
235+
relation=node->ss.ss_currentRelation;
236+
scanDesc=node->ss.ss_currentScanDesc;
229237

230238
/*
231239
* Free the exprcontext
232240
*/
233-
ExecFreeExprContext(&node->ps);
241+
ExecFreeExprContext(&node->ss.ps);
234242

235243
/*
236244
* clean out the tuple table
237245
*/
238-
ExecClearTuple(node->ps.ps_ResultTupleSlot);
239-
ExecClearTuple(node->ss_ScanTupleSlot);
246+
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
247+
ExecClearTuple(node->ss.ss_ScanTupleSlot);
240248

241249
/*
242250
* close heap scan
243251
*/
244-
heap_endscan(scanDesc);
252+
if (scanDesc!=NULL)
253+
heap_endscan(scanDesc);
245254

246255
/*
247256
* close the heap relation.
@@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node)
265274
{
266275
HeapScanDescscan;
267276

268-
scan=node->ss_currentScanDesc;
277+
scan=node->ss.ss_currentScanDesc;
269278

270-
heap_rescan(scan,/* scan desc */
271-
NULL);/* new scan keys */
279+
if (scan!=NULL)
280+
heap_rescan(scan,/* scan desc */
281+
NULL);/* new scan keys */
272282

273283
ExecScanReScan((ScanState*)node);
274284
}
285+
286+
/* ----------------------------------------------------------------
287+
*Parallel Scan Support
288+
* ----------------------------------------------------------------
289+
*/
290+
291+
/* ----------------------------------------------------------------
292+
*ExecSeqScanEstimate
293+
*
294+
*estimates the space required to serialize seqscan node.
295+
* ----------------------------------------------------------------
296+
*/
297+
void
298+
ExecSeqScanEstimate(SeqScanState*node,
299+
ParallelContext*pcxt)
300+
{
301+
EState*estate=node->ss.ps.state;
302+
303+
node->pscan_len=heap_parallelscan_estimate(estate->es_snapshot);
304+
shm_toc_estimate_chunk(&pcxt->estimator,node->pscan_len);
305+
shm_toc_estimate_keys(&pcxt->estimator,1);
306+
}
307+
308+
/* ----------------------------------------------------------------
309+
*ExecSeqScanInitializeDSM
310+
*
311+
*Set up a parallel heap scan descriptor.
312+
* ----------------------------------------------------------------
313+
*/
314+
void
315+
ExecSeqScanInitializeDSM(SeqScanState*node,
316+
ParallelContext*pcxt)
317+
{
318+
EState*estate=node->ss.ps.state;
319+
ParallelHeapScanDescpscan;
320+
321+
pscan=shm_toc_allocate(pcxt->toc,node->pscan_len);
322+
heap_parallelscan_initialize(pscan,
323+
node->ss.ss_currentRelation,
324+
estate->es_snapshot);
325+
shm_toc_insert(pcxt->toc,node->ss.ps.plan->plan_node_id,pscan);
326+
node->ss.ss_currentScanDesc=
327+
heap_beginscan_parallel(node->ss.ss_currentRelation,pscan);
328+
}
329+
330+
/* ----------------------------------------------------------------
331+
*ExecSeqScanInitializeWorker
332+
*
333+
*Copy relevant information from TOC into planstate.
334+
* ----------------------------------------------------------------
335+
*/
336+
void
337+
ExecSeqScanInitializeWorker(SeqScanState*node,shm_toc*toc)
338+
{
339+
ParallelHeapScanDescpscan;
340+
341+
pscan=shm_toc_lookup(toc,node->ss.ps.plan->plan_node_id);
342+
node->ss.ss_currentScanDesc=
343+
heap_beginscan_parallel(node->ss.ss_currentRelation,pscan);
344+
}

‎src/backend/nodes/copyfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
112112
COPY_SCALAR_FIELD(total_cost);
113113
COPY_SCALAR_FIELD(plan_rows);
114114
COPY_SCALAR_FIELD(plan_width);
115+
COPY_SCALAR_FIELD(parallel_aware);
115116
COPY_SCALAR_FIELD(plan_node_id);
116117
COPY_NODE_FIELD(targetlist);
117118
COPY_NODE_FIELD(qual);

‎src/backend/nodes/outfuncs.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
271271
WRITE_FLOAT_FIELD(total_cost,"%.2f");
272272
WRITE_FLOAT_FIELD(plan_rows,"%.0f");
273273
WRITE_INT_FIELD(plan_width);
274+
WRITE_BOOL_FIELD(parallel_aware);
274275
WRITE_INT_FIELD(plan_node_id);
275276
WRITE_NODE_FIELD(targetlist);
276277
WRITE_NODE_FIELD(qual);
@@ -1585,6 +1586,7 @@ _outPathInfo(StringInfo str, const Path *node)
15851586
_outBitmapset(str,node->param_info->ppi_req_outer);
15861587
else
15871588
_outBitmapset(str,NULL);
1589+
WRITE_BOOL_FIELD(parallel_aware);
15881590
WRITE_FLOAT_FIELD(rows,"%.0f");
15891591
WRITE_FLOAT_FIELD(startup_cost,"%.2f");
15901592
WRITE_FLOAT_FIELD(total_cost,"%.2f");

‎src/backend/nodes/readfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,7 @@ ReadCommonPlan(Plan *local_node)
14121412
READ_FLOAT_FIELD(total_cost);
14131413
READ_FLOAT_FIELD(plan_rows);
14141414
READ_INT_FIELD(plan_width);
1415+
READ_BOOL_FIELD(parallel_aware);
14151416
READ_INT_FIELD(plan_node_id);
14161417
READ_NODE_FIELD(targetlist);
14171418
READ_NODE_FIELD(qual);

‎src/backend/optimizer/path/allpaths.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
475475
required_outer=rel->lateral_relids;
476476

477477
/* Consider sequential scan */
478-
add_path(rel,create_seqscan_path(root,rel,required_outer));
478+
add_path(rel,create_seqscan_path(root,rel,required_outer,0));
479479

480480
/* Consider index scans */
481481
create_index_paths(root,rel);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp