Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit69d3440

Browse files
committed
Allow parallel custom and foreign scans.
This patch doesn't put the new infrastructure to use anywhere, andindeed it's not clear how it could ever be used for something likepostgres_fdw which has to send an SQL query and wait for a reply,but there might be FDWs or custom scan providers that are CPU-bound,so let's give them a way to join club parallel.KaiGai Kohei, reviewed by me.
1 parent25e4451 commit69d3440

File tree

9 files changed

+263
-1
lines changed

9 files changed

+263
-1
lines changed

‎doc/src/sgml/custom-scan.sgml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,43 @@ void (*RestrPosCustomScan) (CustomScanState *node);
303303

304304
<para>
305305
<programlisting>
306+
Size (*EstimateDSMCustomScan) (CustomScanState *node,
307+
ParallelContext *pcxt);
308+
</programlisting>
309+
Estimate the amount of dynamic shared memory that will be required
310+
for parallel operation. This may be higher than the amount that will
311+
actually be used, but it must not be lower. The return value is in bytes.
312+
This callback is optional, and need only be supplied if this custom
313+
scan provider supports parallel execution.
314+
</para>
315+
316+
<para>
317+
<programlisting>
318+
void (*InitializeDSMCustomScan) (CustomScanState *node,
319+
ParallelContext *pcxt,
320+
void *coordinate);
321+
</programlisting>
322+
Initialize the dynamic shared memory that will be required for parallel
323+
operation; <literal>coordinate</> points to an amount of allocated space
324+
equal to the return value of <function>EstimateDSMCustomScan</>.
325+
This callback is optional, and need only be supplied if this custom
326+
scan provider supports parallel execution.
327+
</para>
328+
329+
<para>
330+
<programlisting>
331+
void (*InitializeWorkerCustomScan) (CustomScanState *node,
332+
shm_toc *toc,
333+
void *coordinate);
334+
</programlisting>
335+
Initialize a parallel worker's custom state based on the shared state
336+
set up in the leader by <literal>InitializeDSMCustomScan</>.
337+
This callback is optional, and needs only be supplied if this
338+
custom path supports parallel execution.
339+
</para>
340+
341+
<para>
342+
<programlisting>
306343
void (*ExplainCustomScan) (CustomScanState *node,
307344
List *ancestors,
308345
ExplainState *es);

‎doc/src/sgml/fdwhandler.sgml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,53 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
955955

956956
</sect2>
957957

958+
<sect2 id="fdw-callbacks-parallel">
959+
<title>FDW Routines for Parallel Execution</title>
960+
<para>
961+
A <structname>ForeignScan</> node can, optionally, support parallel
962+
execution. A parallel <structname>ForeignScan</> will be executed
963+
in multiple processes and should return each row only once across
964+
all cooperating processes. To do this, processes can coordinate through
965+
fixed size chunks of dynamic shared memory. This shared memory is not
966+
guaranteed to be mapped at the same address in every process, so pointers
967+
may not be used. The following callbacks are all optional in general,
968+
but required if parallel execution is to be supported.
969+
</para>
970+
971+
<para>
972+
<programlisting>
973+
Size
974+
EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
975+
</programlisting>
976+
Estimate the amount of dynamic shared memory that will be required
977+
for parallel operation. This may be higher than the amount that will
978+
actually be used, but it must not be lower. The return value is in bytes.
979+
</para>
980+
981+
<para>
982+
<programlisting>
983+
void
984+
InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
985+
void *coordinate);
986+
</programlisting>
987+
Initialize the dynamic shared memory that will be required for parallel
988+
operation; <literal>coordinate</> points to an amount of allocated space
989+
equal to the return value of <function>EstimateDSMForeignScan</>.
990+
</para>
991+
992+
<para>
993+
<programlisting>
994+
void
995+
InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
996+
void *coordinate);
997+
</programlisting>
998+
Initialize a parallel worker's custom state based on the shared state
999+
set up in the leader by <literal>InitializeDSMForeignScan</>.
1000+
This callback is optional, and needs only be supplied if this
1001+
custom path supports parallel execution.
1002+
</para>
1003+
</sect2>
1004+
9581005
</sect1>
9591006

9601007
<sect1 id="fdw-helpers">

‎src/backend/executor/execParallel.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
#include"executor/execParallel.h"
2727
#include"executor/executor.h"
28+
#include"executor/nodeCustom.h"
29+
#include"executor/nodeForeignscan.h"
2830
#include"executor/nodeSeqscan.h"
2931
#include"executor/tqueue.h"
3032
#include"nodes/nodeFuncs.h"
@@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
176178
ExecSeqScanEstimate((SeqScanState*)planstate,
177179
e->pcxt);
178180
break;
181+
caseT_ForeignScanState:
182+
ExecForeignScanEstimate((ForeignScanState*)planstate,
183+
e->pcxt);
184+
break;
185+
caseT_CustomScanState:
186+
ExecCustomScanEstimate((CustomScanState*)planstate,
187+
e->pcxt);
188+
break;
179189
default:
180190
break;
181191
}
@@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
220230
ExecSeqScanInitializeDSM((SeqScanState*)planstate,
221231
d->pcxt);
222232
break;
233+
caseT_ForeignScanState:
234+
ExecForeignScanInitializeDSM((ForeignScanState*)planstate,
235+
d->pcxt);
236+
break;
237+
caseT_CustomScanState:
238+
ExecCustomScanInitializeDSM((CustomScanState*)planstate,
239+
d->pcxt);
240+
break;
223241
default:
224242
break;
225243
}
@@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
642660
caseT_SeqScanState:
643661
ExecSeqScanInitializeWorker((SeqScanState*)planstate,toc);
644662
break;
663+
caseT_ForeignScanState:
664+
ExecForeignScanInitializeWorker((ForeignScanState*)planstate,
665+
toc);
666+
break;
667+
caseT_CustomScanState:
668+
ExecCustomScanInitializeWorker((CustomScanState*)planstate,
669+
toc);
670+
break;
645671
default:
646672
break;
647673
}

‎src/backend/executor/nodeCustom.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*/
1111
#include"postgres.h"
1212

13+
#include"access/parallel.h"
1314
#include"executor/executor.h"
1415
#include"executor/nodeCustom.h"
1516
#include"nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
159160
node->methods->CustomName)));
160161
node->methods->RestrPosCustomScan(node);
161162
}
163+
164+
void
165+
ExecCustomScanEstimate(CustomScanState*node,ParallelContext*pcxt)
166+
{
167+
constCustomExecMethods*methods=node->methods;
168+
169+
if (methods->EstimateDSMCustomScan)
170+
{
171+
node->pscan_len=methods->EstimateDSMCustomScan(node,pcxt);
172+
shm_toc_estimate_chunk(&pcxt->estimator,node->pscan_len);
173+
shm_toc_estimate_keys(&pcxt->estimator,1);
174+
}
175+
}
176+
177+
void
178+
ExecCustomScanInitializeDSM(CustomScanState*node,ParallelContext*pcxt)
179+
{
180+
constCustomExecMethods*methods=node->methods;
181+
182+
if (methods->InitializeDSMCustomScan)
183+
{
184+
intplan_node_id=node->ss.ps.plan->plan_node_id;
185+
void*coordinate;
186+
187+
coordinate=shm_toc_allocate(pcxt->toc,node->pscan_len);
188+
methods->InitializeDSMCustomScan(node,pcxt,coordinate);
189+
shm_toc_insert(pcxt->toc,plan_node_id,coordinate);
190+
}
191+
}
192+
193+
void
194+
ExecCustomScanInitializeWorker(CustomScanState*node,shm_toc*toc)
195+
{
196+
constCustomExecMethods*methods=node->methods;
197+
198+
if (methods->InitializeWorkerCustomScan)
199+
{
200+
intplan_node_id=node->ss.ps.plan->plan_node_id;
201+
void*coordinate;
202+
203+
coordinate=shm_toc_lookup(toc,plan_node_id);
204+
methods->InitializeWorkerCustomScan(node,toc,coordinate);
205+
}
206+
}

‎src/backend/executor/nodeForeignscan.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
282282

283283
ExecScanReScan(&node->ss);
284284
}
285+
286+
/* ----------------------------------------------------------------
287+
*ExecForeignScanEstimate
288+
*
289+
*Informs size of the parallel coordination information, if any
290+
* ----------------------------------------------------------------
291+
*/
292+
void
293+
ExecForeignScanEstimate(ForeignScanState*node,ParallelContext*pcxt)
294+
{
295+
FdwRoutine*fdwroutine=node->fdwroutine;
296+
297+
if (fdwroutine->EstimateDSMForeignScan)
298+
{
299+
node->pscan_len=fdwroutine->EstimateDSMForeignScan(node,pcxt);
300+
shm_toc_estimate_chunk(&pcxt->estimator,node->pscan_len);
301+
shm_toc_estimate_keys(&pcxt->estimator,1);
302+
}
303+
}
304+
305+
/* ----------------------------------------------------------------
306+
*ExecForeignScanInitializeDSM
307+
*
308+
*Initialize the parallel coordination information
309+
* ----------------------------------------------------------------
310+
*/
311+
void
312+
ExecForeignScanInitializeDSM(ForeignScanState*node,ParallelContext*pcxt)
313+
{
314+
FdwRoutine*fdwroutine=node->fdwroutine;
315+
316+
if (fdwroutine->InitializeDSMForeignScan)
317+
{
318+
intplan_node_id=node->ss.ps.plan->plan_node_id;
319+
void*coordinate;
320+
321+
coordinate=shm_toc_allocate(pcxt->toc,node->pscan_len);
322+
fdwroutine->InitializeDSMForeignScan(node,pcxt,coordinate);
323+
shm_toc_insert(pcxt->toc,plan_node_id,coordinate);
324+
}
325+
}
326+
327+
/* ----------------------------------------------------------------
328+
*ExecForeignScanInitializeDSM
329+
*
330+
*Initialization according to the parallel coordination information
331+
* ----------------------------------------------------------------
332+
*/
333+
void
334+
ExecForeignScanInitializeWorker(ForeignScanState*node,shm_toc*toc)
335+
{
336+
FdwRoutine*fdwroutine=node->fdwroutine;
337+
338+
if (fdwroutine->InitializeWorkerForeignScan)
339+
{
340+
intplan_node_id=node->ss.ps.plan->plan_node_id;
341+
void*coordinate;
342+
343+
coordinate=shm_toc_lookup(toc,plan_node_id);
344+
fdwroutine->InitializeWorkerForeignScan(node,toc,coordinate);
345+
}
346+
}

‎src/include/executor/nodeCustom.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#ifndefNODECUSTOM_H
1313
#defineNODECUSTOM_H
1414

15+
#include"access/parallel.h"
1516
#include"nodes/execnodes.h"
1617

1718
/*
@@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
2627
externvoidExecCustomMarkPos(CustomScanState*node);
2728
externvoidExecCustomRestrPos(CustomScanState*node);
2829

30+
/*
31+
* Parallel execution support
32+
*/
33+
externvoidExecCustomScanEstimate(CustomScanState*node,
34+
ParallelContext*pcxt);
35+
externvoidExecCustomScanInitializeDSM(CustomScanState*node,
36+
ParallelContext*pcxt);
37+
externvoidExecCustomScanInitializeWorker(CustomScanState*node,
38+
shm_toc*toc);
39+
2940
#endif/* NODECUSTOM_H */

‎src/include/executor/nodeForeignscan.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,19 @@
1414
#ifndefNODEFOREIGNSCAN_H
1515
#defineNODEFOREIGNSCAN_H
1616

17+
#include"access/parallel.h"
1718
#include"nodes/execnodes.h"
1819

1920
externForeignScanState*ExecInitForeignScan(ForeignScan*node,EState*estate,inteflags);
2021
externTupleTableSlot*ExecForeignScan(ForeignScanState*node);
2122
externvoidExecEndForeignScan(ForeignScanState*node);
2223
externvoidExecReScanForeignScan(ForeignScanState*node);
2324

25+
externvoidExecForeignScanEstimate(ForeignScanState*node,
26+
ParallelContext*pcxt);
27+
externvoidExecForeignScanInitializeDSM(ForeignScanState*node,
28+
ParallelContext*pcxt);
29+
externvoidExecForeignScanInitializeWorker(ForeignScanState*node,
30+
shm_toc*toc);
31+
2432
#endif/* NODEFOREIGNSCAN_H */

‎src/include/foreign/fdwapi.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#ifndefFDWAPI_H
1313
#defineFDWAPI_H
1414

15+
#include"access/parallel.h"
1516
#include"nodes/execnodes.h"
1617
#include"nodes/relation.h"
1718

@@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
122123
typedefList*(*ImportForeignSchema_function) (ImportForeignSchemaStmt*stmt,
123124
OidserverOid);
124125

126+
typedefSize (*EstimateDSMForeignScan_function) (ForeignScanState*node,
127+
ParallelContext*pcxt);
128+
typedefvoid (*InitializeDSMForeignScan_function) (ForeignScanState*node,
129+
ParallelContext*pcxt,
130+
void*coordinate);
131+
typedefvoid (*InitializeWorkerForeignScan_function) (ForeignScanState*node,
132+
shm_toc*toc,
133+
void*coordinate);
125134
/*
126135
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
127136
* function. It provides pointers to the callback functions needed by the
@@ -177,6 +186,11 @@ typedef struct FdwRoutine
177186

178187
/* Support functions for IMPORT FOREIGN SCHEMA */
179188
ImportForeignSchema_functionImportForeignSchema;
189+
190+
/* Support functions for parallelism under Gather node */
191+
EstimateDSMForeignScan_functionEstimateDSMForeignScan;
192+
InitializeDSMForeignScan_functionInitializeDSMForeignScan;
193+
InitializeWorkerForeignScan_functionInitializeWorkerForeignScan;
180194
}FdwRoutine;
181195

182196

‎src/include/nodes/execnodes.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
15851585
{
15861586
ScanStatess;/* its first field is NodeTag */
15871587
List*fdw_recheck_quals;/* original quals not in ss.ps.qual */
1588+
Sizepscan_len;/* size of parallel coordination information */
15881589
/* use struct pointer to avoid including fdwapi.h here */
15891590
structFdwRoutine*fdwroutine;
15901591
void*fdw_state;/* foreign-data wrapper can keep state here */
@@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
16031604
* the BeginCustomScan method.
16041605
* ----------------
16051606
*/
1607+
structParallelContext;/* avoid including parallel.h here */
1608+
structshm_toc;/* avoid including shm_toc.h here */
16061609
structExplainState;/* avoid including explain.h here */
16071610
structCustomScanState;
16081611

@@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
16191622
void(*ReScanCustomScan) (structCustomScanState*node);
16201623
void(*MarkPosCustomScan) (structCustomScanState*node);
16211624
void(*RestrPosCustomScan) (structCustomScanState*node);
1622-
1625+
/* Optional: parallel execution support */
1626+
Size(*EstimateDSMCustomScan) (structCustomScanState*node,
1627+
structParallelContext*pcxt);
1628+
void(*InitializeDSMCustomScan) (structCustomScanState*node,
1629+
structParallelContext*pcxt,
1630+
void*coordinate);
1631+
void(*InitializeWorkerCustomScan) (structCustomScanState*node,
1632+
structshm_toc*toc,
1633+
void*coordinate);
16231634
/* Optional: print additional information in EXPLAIN */
16241635
void(*ExplainCustomScan) (structCustomScanState*node,
16251636
List*ancestors,
@@ -1631,6 +1642,7 @@ typedef struct CustomScanState
16311642
ScanStatess;
16321643
uint32flags;/* mask of CUSTOMPATH_* flags, see relation.h */
16331644
List*custom_ps;/* list of child PlanState nodes, if any */
1645+
Sizepscan_len;/* size of parallel coordination information */
16341646
constCustomExecMethods*methods;
16351647
}CustomScanState;
16361648

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp