Commit 41b0dd9
Separate reinitialization of shared parallel-scan state from ExecReScan.

Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes.  This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call).  That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized.  Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.

Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper.  ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.

As with the preceding commit 7df2c1f, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.

Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com

1 parent 6c2c5be · commit 41b0dd9

24 files changed

+328
-133
lines changed
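To make the new control flow concrete, here is a rough C sketch of the leader-side rescan sequence the commit message describes. It is illustrative only; the Gather-node caller is among the 24 changed files but is not excerpted below, and apart from ExecParallelReinitialize and ExecReScan the names used are assumptions:

/* Illustrative sketch only; not the actual nodeGather.c change. */
static void
gather_rescan_sketch(GatherState *node)
{
	/*
	 * Shut down the previous batch of workers before touching anything
	 * shared (ExecShutdownGatherWorkers is assumed here).
	 */
	ExecShutdownGatherWorkers(node);

	/*
	 * ExecReScan now resets local state only, so the executor's usual
	 * habits of delaying or eliminating child rescan calls are safe.
	 */
	ExecReScan(outerPlanState(node));

	/*
	 * Only in the leader, and guaranteed to happen before any new
	 * workers are launched: rebuild shared parallel state in one place.
	 */
	if (node->pei != NULL)
		ExecParallelReinitialize(outerPlanState(node), node->pei);
}

The key property is that the last step runs only in the leader, which removes the need for the worker-side guards against unwanted shared-state resets that the commit message calls out.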

doc/src/sgml/custom-scan.sgml (23 additions, 6 deletions)

@@ -320,22 +320,39 @@ void (*InitializeDSMCustomScan) (CustomScanState *node,
                                  void *coordinate);
 </programlisting>
    Initialize the dynamic shared memory that will be required for parallel
-   operation; <literal>coordinate</> points to an amount of allocated space
-   equal to the return value of <function>EstimateDSMCustomScan</>.
+   operation.  <literal>coordinate</> points to a shared memory area of
+   size equal to the return value of <function>EstimateDSMCustomScan</>.
    This callback is optional, and need only be supplied if this custom
    scan provider supports parallel execution.
   </para>

  <para>
 <programlisting>
+void (*ReInitializeDSMCustomScan) (CustomScanState *node,
+                                   ParallelContext *pcxt,
+                                   void *coordinate);
+</programlisting>
+   Re-initialize the dynamic shared memory required for parallel operation
+   when the custom-scan plan node is about to be re-scanned.
+   This callback is optional, and need only be supplied if this custom
+   scan provider supports parallel execution.
+   Recommended practice is that this callback reset only shared state,
+   while the <function>ReScanCustomScan</> callback resets only local
+   state.  Currently, this callback will be called
+   before <function>ReScanCustomScan</>, but it's best not to rely on
+   that ordering.
+  </para>
+
+  <para>
+<programlisting>
 void (*InitializeWorkerCustomScan) (CustomScanState *node,
                                     shm_toc *toc,
                                     void *coordinate);
 </programlisting>
-   Initialize a parallel worker's custom state based on the shared state
-   set up in the leader by <literal>InitializeDSMCustomScan</>.
-   This callback is optional, and needs only be supplied if this
-   custom path supports parallel execution.
+   Initialize a parallel worker's local state based on the shared state
+   set up by the leader during <function>InitializeDSMCustomScan</>.
+   This callback is optional, and need only be supplied if this custom
+   scan provider supports parallel execution.
   </para>

  <para>
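As a concrete illustration of the recommended split, here is a minimal sketch of a provider-side ReInitializeDSMCustomScan callback that resets only shared state. The MyScanSharedState type and the my_* names are invented for this example:

#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "port/atomics.h"

/* Hypothetical shared state for a parallel custom scan. */
typedef struct MyScanSharedState
{
	pg_atomic_uint64 next_block;	/* next block number to hand out */
} MyScanSharedState;

static void
my_reinitialize_dsm_custom_scan(CustomScanState *node,
								ParallelContext *pcxt,
								void *coordinate)
{
	MyScanSharedState *shared = (MyScanSharedState *) coordinate;

	/*
	 * Reset shared state only; per-process state is reset separately in
	 * the ReScanCustomScan callback, per the recommendation above.
	 */
	pg_atomic_write_u64(&shared->next_block, 0);
}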

doc/src/sgml/fdwhandler.sgml (31 additions, 12 deletions)

@@ -1191,12 +1191,12 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
   <para>
    A <structname>ForeignScan</> node can, optionally, support parallel
    execution.  A parallel <structname>ForeignScan</> will be executed
-   in multiple processes and should return each row only once across
+   in multiple processes and must return each row exactly once across
    all cooperating processes.  To do this, processes can coordinate through
-   fixed size chunks of dynamic shared memory.  This shared memory is not
-   guaranteed to be mapped at the same address in every process, so pointers
-   may not be used.  The following callbacks are all optional in general,
-   but required if parallel execution is to be supported.
+   fixed-size chunks of dynamic shared memory.  This shared memory is not
+   guaranteed to be mapped at the same address in every process, so it
+   must not contain pointers.  The following functions are all optional,
+   but most are required if parallel execution is to be supported.
   </para>

  <para>

@@ -1215,7 +1215,7 @@ IsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
   </para>

  <para>
-   If this callback is not defined, it is assumed that the scan must take
+   If this function is not defined, it is assumed that the scan must take
    place within the parallel leader.  Note that returning true does not mean
    that the scan itself can be done in parallel, only that the scan can be
    performed within a parallel worker.  Therefore, it can be useful to define

@@ -1230,6 +1230,9 @@ EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
    Estimate the amount of dynamic shared memory that will be required
    for parallel operation.  This may be higher than the amount that will
    actually be used, but it must not be lower.  The return value is in bytes.
+   This function is optional, and can be omitted if not needed; but if it
+   is omitted, the next three functions must be omitted as well, because
+   no shared memory will be allocated for the FDW's use.
   </para>

  <para>

@@ -1239,8 +1242,25 @@ InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
                          void *coordinate);
 </programlisting>
    Initialize the dynamic shared memory that will be required for parallel
-   operation; <literal>coordinate</> points to an amount of allocated space
-   equal to the return value of <function>EstimateDSMForeignScan</>.
+   operation.  <literal>coordinate</> points to a shared memory area of
+   size equal to the return value of <function>EstimateDSMForeignScan</>.
+   This function is optional, and can be omitted if not needed.
+  </para>
+
+  <para>
+<programlisting>
+void
+ReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+                           void *coordinate);
+</programlisting>
+   Re-initialize the dynamic shared memory required for parallel operation
+   when the foreign-scan plan node is about to be re-scanned.
+   This function is optional, and can be omitted if not needed.
+   Recommended practice is that this function reset only shared state,
+   while the <function>ReScanForeignScan</> function resets only local
+   state.  Currently, this function will be called
+   before <function>ReScanForeignScan</>, but it's best not to rely on
+   that ordering.
   </para>

  <para>

@@ -1249,10 +1269,9 @@ void
 InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
                             void *coordinate);
 </programlisting>
-   Initialize a parallel worker's custom state based on the shared state
-   set up in the leader by <literal>InitializeDSMForeignScan</>.
-   This callback is optional, and needs only be supplied if this
-   custom path supports parallel execution.
+   Initialize a parallel worker's local state based on the shared state
+   set up by the leader during <function>InitializeDSMForeignScan</>.
+   This function is optional, and can be omitted if not needed.
   </para>

  <para>
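For illustration, a sketch of how an FDW might fill in the DSM-related functions described above. MyFdwSharedState and the myXxxForeignScan names are invented, and a real FDW would typically coordinate more than a single counter:

#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "port/atomics.h"

/* Hypothetical shared state for a parallel foreign scan; no pointers. */
typedef struct MyFdwSharedState
{
	pg_atomic_uint32 next_chunk;	/* next unit of work to hand out */
} MyFdwSharedState;

static Size
myEstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt)
{
	/* May over-estimate, but must never under-estimate. */
	return sizeof(MyFdwSharedState);
}

static void
myInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
						   void *coordinate)
{
	MyFdwSharedState *shared = (MyFdwSharedState *) coordinate;

	pg_atomic_init_u32(&shared->next_chunk, 0);
}

static void
myReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
							 void *coordinate)
{
	MyFdwSharedState *shared = (MyFdwSharedState *) coordinate;

	/* Reset shared state only; local state belongs to ReScanForeignScan. */
	pg_atomic_write_u32(&shared->next_chunk, 0);
}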

src/backend/access/heap/heapam.c (13 additions, 15 deletions)

@@ -1531,21 +1531,6 @@ heap_rescan(HeapScanDesc scan,
 	 * reinitialize scan descriptor
 	 */
 	initscan(scan, key, true);
-
-	/*
-	 * reset parallel scan, if present
-	 */
-	if (scan->rs_parallel != NULL)
-	{
-		ParallelHeapScanDesc parallel_scan;
-
-		/*
-		 * Caller is responsible for making sure that all workers have
-		 * finished the scan before calling this.
-		 */
-		parallel_scan = scan->rs_parallel;
-		pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
-	}
 }

 /* ----------------

@@ -1642,6 +1627,19 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
 	SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }

+/* ----------------
+ *		heap_parallelscan_reinitialize - reset a parallel scan
+ *
+ *		Call this in the leader process.  Caller is responsible for
+ *		making sure that all workers have finished the scan beforehand.
+ * ----------------
+ */
+void
+heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
+{
+	pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
+}
+
 /* ----------------
  *		heap_beginscan_parallel - join a parallel scan
 *
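With the reset logic moved out of heap_rescan, the parallel-aware sequential scan node (also changed in this commit, but not excerpted here) can presumably reduce its DSM reinitialization to a thin wrapper. A sketch under that assumption:

/* Sketch only; the actual nodeSeqscan.c change is not shown above. */
void
ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
{
	HeapScanDesc scan = node->ss.ss_currentScanDesc;

	/* Leader-only; all workers must have finished the previous scan. */
	heap_parallelscan_reinitialize(scan->rs_parallel);
}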

src/backend/executor/execParallel.c (86 additions, 15 deletions)

@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
 					  ExecParallelInitializeDSMContext *d);
 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
 							 bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+							ParallelContext *pcxt);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 						 SharedExecutorInstrumentation *instrumentation);

@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
+			break;
+
 		default:
 			break;
 	}

@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+			break;
+
 		default:
 			break;
 	}

@@ -384,18 +390,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 	return responseq;
 }

-/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
-	ReinitializeParallelDSM(pei->pcxt);
-	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
-	pei->finished = false;
-}
-
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.

@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
 	ExecParallelInitializeDSM(planstate, &d);

 	/*
-	 * Make sure that the world hasn't shifted under our feat.  This could
+	 * Make sure that the world hasn't shifted under our feet.  This could
 	 * probably just be an Assert(), but let's be conservative for now.
 	 */
 	if (e.nnodes != d.nnodes)

@@ -609,6 +603,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
 	return pei;
 }

+/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+						 ParallelExecutorInfo *pei)
+{
+	/* Old workers must already be shut down */
+	Assert(pei->finished);
+
+	ReinitializeParallelDSM(pei->pcxt);
+	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+	pei->finished = false;
+
+	/* Traverse plan tree and let each child node reset associated state. */
+	ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+							ParallelContext *pcxt)
+{
+	if (planstate == NULL)
+		return false;
+
+	/*
+	 * Call reinitializers for DSM-using plan nodes.
+	 */
+	switch (nodeTag(planstate))
+	{
+		case T_SeqScanState:
+			if (planstate->plan->parallel_aware)
+				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+										   pcxt);
+			break;
+		case T_IndexScanState:
+			if (planstate->plan->parallel_aware)
+				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+											 pcxt);
+			break;
+		case T_IndexOnlyScanState:
+			if (planstate->plan->parallel_aware)
+				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+												 pcxt);
+			break;
+		case T_ForeignScanState:
+			if (planstate->plan->parallel_aware)
+				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+											   pcxt);
+			break;
+		case T_CustomScanState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+											  pcxt);
+			break;
+		case T_BitmapHeapScanState:
+			if (planstate->plan->parallel_aware)
+				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+											  pcxt);
+			break;
+		case T_SortState:
+			/* even when not parallel-aware */
+			ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+			break;
+
+		default:
+			break;
+	}
+
+	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
 /*
  * Copy instrumentation information about this node and its descendants from
  * dynamic shared memory.

@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
-				ExecBitmapHeapInitializeWorker(
-											   (BitmapHeapScanState *) planstate, toc);
+				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
 			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeWorker((SortState *) planstate, toc);
+			break;
+
 		default:
 			break;
 	}

src/backend/executor/nodeBitmapHeapscan.c (25 additions, 17 deletions)

@@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 	node->shared_tbmiterator = NULL;
 	node->shared_prefetch_iterator = NULL;

-	/* Reset parallel bitmap state, if present */
-	if (node->pstate)
-	{
-		dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
-
-		node->pstate->state = BM_INITIAL;
-
-		if (DsaPointerIsValid(node->pstate->tbmiterator))
-			tbm_free_shared_area(dsa, node->pstate->tbmiterator);
-
-		if (DsaPointerIsValid(node->pstate->prefetch_iterator))
-			tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
-
-		node->pstate->tbmiterator = InvalidDsaPointer;
-		node->pstate->prefetch_iterator = InvalidDsaPointer;
-	}
-
 	ExecScanReScan(&node->ss);

 	/*

@@ -999,6 +982,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 	node->pstate = pstate;
 }

+/* ----------------------------------------------------------------
+ *		ExecBitmapHeapReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+							  ParallelContext *pcxt)
+{
+	ParallelBitmapHeapState *pstate = node->pstate;
+	dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+
+	pstate->state = BM_INITIAL;
+
+	if (DsaPointerIsValid(pstate->tbmiterator))
+		tbm_free_shared_area(dsa, pstate->tbmiterator);
+
+	if (DsaPointerIsValid(pstate->prefetch_iterator))
+		tbm_free_shared_area(dsa, pstate->prefetch_iterator);
+
+	pstate->tbmiterator = InvalidDsaPointer;
+	pstate->prefetch_iterator = InvalidDsaPointer;
+}
+
 /* ----------------------------------------------------------------
 *		ExecBitmapHeapInitializeWorker
 *

src/backend/executor/nodeCustom.c (15 additions, 0 deletions)

@@ -194,6 +194,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
 	}
 }

+void
+ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+	const CustomExecMethods *methods = node->methods;
+
+	if (methods->ReInitializeDSMCustomScan)
+	{
+		int			plan_node_id = node->ss.ps.plan->plan_node_id;
+		void	   *coordinate;
+
+		coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+		methods->ReInitializeDSMCustomScan(node, pcxt, coordinate);
+	}
+}
+
 void
 ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
 {