Commit 3452dc5
Push tuple limits through Gather and Gather Merge.
If we only need, say, 10 tuples in total, then we certainly don't need more than 10 tuples from any single process. Pushing down the limit lets workers exit early when possible. For Gather Merge, there is an additional benefit: a Sort immediately below the Gather Merge can be done as a bounded sort if there is an applicable limit.

Robert Haas and Tom Lane

Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com
1 parent ce5dcf5, commit 3452dc5
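To make the "workers exit early" point concrete, here is a small standalone C model, not backend code: worker_emit and its parameters are hypothetical stand-ins for a parallel worker feeding its tuple queue. With a pushed-down bound of 10, the simulated worker stops after 10 rows instead of draining a million-row input.

/*
 * Toy model of the "workers exit early" claim in the commit message.
 * This is NOT PostgreSQL code; worker_emit() is a hypothetical stand-in
 * for a parallel worker sending tuples to the leader.
 */
#include <stdio.h>
#include <stdint.h>

/* Emit up to tuples_needed rows (negative = no limit); return rows produced. */
static int64_t
worker_emit(int64_t rows_in_input, int64_t tuples_needed)
{
    int64_t produced = 0;

    for (int64_t i = 0; i < rows_in_input; i++)
    {
        /* with a bound, the worker can stop as soon as it is satisfied */
        if (tuples_needed >= 0 && produced >= tuples_needed)
            break;
        produced++;             /* stands in for "send one tuple to the leader" */
    }
    return produced;
}

int
main(void)
{
    int64_t input = 1000000;    /* rows available to one worker */

    printf("no bound:  worker produced %lld rows\n",
           (long long) worker_emit(input, -1));
    printf("bound 10:  worker produced %lld rows\n",
           (long long) worker_emit(input, 10));
    return 0;
}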

File tree: 10 files changed, +222 −97 lines changed

‎src/backend/executor/execParallel.c

Lines changed: 44 additions & 10 deletions

@@ -47,16 +47,25 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE       65536
 
+/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+    int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+                     int64 tuples_needed)
 {
     ParallelExecutorInfo *pei;
     ParallelContext *pcxt;
     ExecParallelEstimateContext e;
     ExecParallelInitializeDSMContext d;
+    FixedParallelExecutorState *fpes;
     char       *pstmt_data;
     char       *pstmt_space;
     char       *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * for the various things we need to store.
      */
 
+    /* Estimate space for fixed-size state. */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           sizeof(FixedParallelExecutorState));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
     /* Estimate space for query text. */
     query_len = strlen(estate->es_sourceText);
     shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * asked for has been allocated or initialized yet, though, so do that.
      */
 
+    /* Store fixed-size state. */
+    fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+    fpes->tuples_needed = tuples_needed;
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
     /* Store query string */
     query_string = shm_toc_allocate(pcxt->toc, query_len);
     memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+    FixedParallelExecutorState *fpes;
     BufferUsage *buffer_usage;
     DestReceiver *receiver;
     QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     void       *area_space;
     dsa_area   *area;
 
+    /* Get fixed-size state. */
+    fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
     receiver = ExecParallelGetReceiver(seg, toc);
     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     queryDesc->planstate->state->es_query_dsa = area;
     ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-    /* Run the plan */
-    ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+    /* Pass down any tuple bound */
+    ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+    /*
+     * Run the plan.  If we specified a tuple bound, be careful not to demand
+     * more tuples than that.
+     */
+    ExecutorRun(queryDesc,
+                ForwardScanDirection,
+                fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+                true);
 
     /* Shut down the executor */
     ExecutorFinish(queryDesc);
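As a rough sketch of the handshake this file sets up, the standalone C model below replaces the real shm_toc machinery (shm_toc_estimate_chunk, shm_toc_allocate, shm_toc_insert, shm_toc_lookup, and dynamic shared memory) with a tiny in-process key/value table; that table, and the FixedState/toc_* names, are illustrative assumptions only. It shows the leader storing a fixed-size struct under a reserved key and the worker reading it back, converting a negative bound into ExecutorRun's "fetch everything" count of 0.

/*
 * Simplified model of passing fixed-size executor state to a worker.
 * The real code stores the struct in a DSM segment via shm_toc; here a
 * tiny in-process key/value table stands in for it (illustration only).
 */
#include <stdio.h>
#include <stdint.h>

#define KEY_EXECUTOR_FIXED 1

typedef struct FixedState
{
    int64_t tuples_needed;      /* negative means "no limit" */
} FixedState;

typedef struct TocEntry
{
    int   key;
    void *ptr;
} TocEntry;

static TocEntry toc[8];
static int toc_used = 0;

static void
toc_insert(int key, void *ptr)
{
    toc[toc_used].key = key;
    toc[toc_used].ptr = ptr;
    toc_used++;
}

static void *
toc_lookup(int key)
{
    for (int i = 0; i < toc_used; i++)
        if (toc[i].key == key)
            return toc[i].ptr;
    return NULL;
}

int
main(void)
{
    /* "leader": store the fixed-size state under a reserved key */
    static FixedState fixed;
    fixed.tuples_needed = 10;
    toc_insert(KEY_EXECUTOR_FIXED, &fixed);

    /* "worker": look it up and derive the ExecutorRun count */
    FixedState *fpes = toc_lookup(KEY_EXECUTOR_FIXED);
    int64_t count = fpes->tuples_needed < 0 ? 0 : fpes->tuples_needed;

    printf("worker runs the plan with count = %lld (0 means no limit)\n",
           (long long) count);
    return 0;
}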

‎src/backend/executor/execProcnode.c

Lines changed: 121 additions & 0 deletions

@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)
 
     return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ *      Set a tuple bound for a planstate node.  This lets child plan nodes
+ *      optimize based on the knowledge that the maximum number of tuples that
+ *      their parent will demand is limited.  The tuple bound for a node may
+ *      only be changed between scans (i.e., after node initialization or just
+ *      before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+    /*
+     * Since this function recurses, in principle we should check stack depth
+     * here.  In practice, it's probably pointless since the earlier node
+     * initialization tree traversal would surely have consumed more stack.
+     */
+
+    if (IsA(child_node, SortState))
+    {
+        /*
+         * If it is a Sort node, notify it that it can use bounded sort.
+         *
+         * Note: it is the responsibility of nodeSort.c to react properly to
+         * changes of these parameters.  If we ever redesign this, it'd be a
+         * good idea to integrate this signaling with the parameter-change
+         * mechanism.
+         */
+        SortState  *sortState = (SortState *) child_node;
+
+        if (tuples_needed < 0)
+        {
+            /* make sure flag gets reset if needed upon rescan */
+            sortState->bounded = false;
+        }
+        else
+        {
+            sortState->bounded = true;
+            sortState->bound = tuples_needed;
+        }
+    }
+    else if (IsA(child_node, MergeAppendState))
+    {
+        /*
+         * If it is a MergeAppend, we can apply the bound to any nodes that
+         * are children of the MergeAppend, since the MergeAppend surely need
+         * read no more than that many tuples from any one input.
+         */
+        MergeAppendState *maState = (MergeAppendState *) child_node;
+        int         i;
+
+        for (i = 0; i < maState->ms_nplans; i++)
+            ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+    }
+    else if (IsA(child_node, ResultState))
+    {
+        /*
+         * Similarly, for a projecting Result, we can apply the bound to its
+         * child node.
+         *
+         * If Result supported qual checking, we'd have to punt on seeing a
+         * qual.  Note that having a resconstantqual is not a showstopper: if
+         * that condition succeeds it affects nothing, while if it fails, no
+         * rows will be demanded from the Result child anyway.
+         */
+        if (outerPlanState(child_node))
+            ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, SubqueryScanState))
+    {
+        /*
+         * We can also descend through SubqueryScan, but only if it has no
+         * qual (otherwise it might discard rows).
+         */
+        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+        if (subqueryState->ss.ps.qual == NULL)
+            ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+    }
+    else if (IsA(child_node, GatherState))
+    {
+        /*
+         * A Gather node can propagate the bound to its workers.  As with
+         * MergeAppend, no one worker could possibly need to return more
+         * tuples than the Gather itself needs to.
+         *
+         * Note: As with Sort, the Gather node is responsible for reacting
+         * properly to changes to this parameter.
+         */
+        GatherState *gstate = (GatherState *) child_node;
+
+        gstate->tuples_needed = tuples_needed;
+
+        /* Also pass down the bound to our own copy of the child plan */
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, GatherMergeState))
+    {
+        /* Same comments as for Gather */
+        GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+        gstate->tuples_needed = tuples_needed;
+
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+
+    /*
+     * In principle we could descend through any plan node type that is
+     * certain not to discard or combine input rows; but on seeing a node that
+     * can do that, we can't propagate the bound any further.  For the moment
+     * it's unclear that any other cases are worth checking here.
+     */
+}
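The propagation rules above can be modeled in isolation. The sketch below is plain standalone C, not executor code: the node kinds, struct layout, and tree construction are simplified assumptions, but the case analysis mirrors ExecSetTupleBound. A bound turns a Sort into a bounded sort, fans out to every MergeAppend input, is recorded on a Gather Merge for its workers, and stops at any node that might discard or combine rows.

/*
 * Standalone model of ExecSetTupleBound's recursive descent.
 * Node kinds and the struct layout are simplified stand-ins; only the
 * propagation rules mirror the real function.
 */
#include <stdio.h>
#include <stdint.h>

typedef enum { T_SORT, T_MERGE_APPEND, T_GATHER_MERGE, T_SEQSCAN, T_AGG } Kind;

typedef struct Node
{
    Kind        kind;
    const char *name;
    struct Node *children[4];
    int         nchildren;
    int         bounded;        /* set on Sort nodes */
    int64_t     bound;          /* Sort: bound; Gather Merge: tuples_needed */
} Node;

static void
set_tuple_bound(int64_t tuples_needed, Node *node)
{
    switch (node->kind)
    {
        case T_SORT:
            if (tuples_needed < 0)
                node->bounded = 0;      /* reset on rescan with no limit */
            else
            {
                node->bounded = 1;
                node->bound = tuples_needed;
            }
            break;
        case T_MERGE_APPEND:
            /* each input supplies at most tuples_needed rows */
            for (int i = 0; i < node->nchildren; i++)
                set_tuple_bound(tuples_needed, node->children[i]);
            break;
        case T_GATHER_MERGE:
            /* remember the bound for workers, and bound our own copy too */
            node->bound = tuples_needed;
            set_tuple_bound(tuples_needed, node->children[0]);
            break;
        default:
            /* Agg, joins, etc. may combine or discard rows: stop here */
            break;
    }
}

int
main(void)
{
    Node sort = {T_SORT, "Sort", {0}, 0, 0, 0};
    Node gm = {T_GATHER_MERGE, "Gather Merge", {&sort}, 1, 0, -1};

    set_tuple_bound(10, &gm);   /* as if a LIMIT 10 sat on top */
    printf("%s tuples_needed = %lld\n", gm.name, (long long) gm.bound);
    printf("%s bounded = %d, bound = %lld\n", sort.name, sort.bounded,
           (long long) sort.bound);
    return 0;
}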

‎src/backend/executor/nodeGather.c

Lines changed: 3 additions & 1 deletion

@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate->ps.state = estate;
     gatherstate->ps.ExecProcNode = ExecGather;
     gatherstate->need_to_scan_locally = !node->single_copy;
+    gatherstate->tuples_needed = -1;
 
     /*
      * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
         if (!node->pei)
             node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                              estate,
-                                             gather->num_workers);
+                                             gather->num_workers,
+                                             node->tuples_needed);
 
         /*
          * Register backend workers.  We might not get as many as we

‎src/backend/executor/nodeGatherMerge.c

Lines changed: 3 additions & 1 deletion

@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     gm_state->ps.plan = (Plan *) node;
     gm_state->ps.state = estate;
     gm_state->ps.ExecProcNode = ExecGatherMerge;
+    gm_state->tuples_needed = -1;
 
     /*
      * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
         if (!node->pei)
             node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                              estate,
-                                             gm->num_workers);
+                                             gm->num_workers,
+                                             node->tuples_needed);
 
         /* Try to launch workers. */
         pcxt = node->pei->pcxt;

‎src/backend/executor/nodeLimit.c

Lines changed: 16 additions & 82 deletions

@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
     /* Set state-machine state */
     node->lstate = LIMIT_RESCAN;
 
-    /* Notify child node about limit, if useful */
-    pass_down_bound(node, outerPlanState(node));
+    /*
+     * Notify child node about limit.  Note: think not to "optimize" by
+     * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+     * must update the child node anyway, in case this is a rescan and the
+     * previous time we got a different result.
+     */
+    ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-    /*
-     * Since this function recurses, in principle we should check stack depth
-     * here.  In practice, it's probably pointless since the earlier node
-     * initialization tree traversal would surely have consumed more stack.
-     */
-
-    if (IsA(child_node, SortState))
-    {
-        SortState  *sortState = (SortState *) child_node;
-        int64       tuples_needed = node->count + node->offset;
-
-        /* negative test checks for overflow in sum */
-        if (node->noCount || tuples_needed < 0)
-        {
-            /* make sure flag gets reset if needed upon rescan */
-            sortState->bounded = false;
-        }
-        else
-        {
-            sortState->bounded = true;
-            sortState->bound = tuples_needed;
-        }
-    }
-    else if (IsA(child_node, MergeAppendState))
-    {
-        /* Pass down the bound through MergeAppend */
-        MergeAppendState *maState = (MergeAppendState *) child_node;
-        int         i;
-
-        for (i = 0; i < maState->ms_nplans; i++)
-            pass_down_bound(node, maState->mergeplans[i]);
-    }
-    else if (IsA(child_node, ResultState))
-    {
-        /*
-         * We also have to be prepared to look through a Result, since the
-         * planner might stick one atop MergeAppend for projection purposes.
-         *
-         * If Result supported qual checking, we'd have to punt on seeing a
-         * qual.  Note that having a resconstantqual is not a showstopper: if
-         * that fails we're not getting any rows at all.
-         */
-        if (outerPlanState(child_node))
-            pass_down_bound(node, outerPlanState(child_node));
-    }
-    else if (IsA(child_node, SubqueryScanState))
-    {
-        /*
-         * We can also look through SubqueryScan, but only if it has no qual
-         * (otherwise it might discard rows).
-         */
-        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-        if (subqueryState->ss.ps.qual == NULL)
-            pass_down_bound(node, subqueryState->subplan);
-    }
-
-    /*
-     * In principle we could look through any plan node type that is certain
-     * not to discard or combine input rows.  In practice, there are not many
-     * node types that the planner might put between Sort and Limit, so trying
-     * to be very general is not worth the trouble.
-     */
+    if (node->noCount)
+        return -1;
+    /* Note: if this overflows, we'll return a negative value, which is OK */
+    return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
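compute_tuples_needed relies on the convention that any negative result means "no determinable limit", including an overflowed count + offset. The standalone sketch below illustrates the same contract, but checks for overflow explicitly instead of letting the signed addition wrap as the backend code does; that substitution, and the helper name and parameters, are assumptions for illustration.

/*
 * Standalone illustration of the compute_tuples_needed contract:
 * a negative return value means "no determinable limit".  Unlike the
 * backend code, this sketch checks for overflow explicitly; it assumes
 * count and offset are non-negative, as enforced upstream.
 */
#include <stdio.h>
#include <stdint.h>

static int64_t
tuples_needed(int no_count, int64_t count, int64_t offset)
{
    if (no_count)
        return -1;              /* LIMIT ALL / no count clause */
    if (count > INT64_MAX - offset)
        return -1;              /* sum would overflow: treat as no limit */
    return count + offset;
}

int
main(void)
{
    printf("%lld\n", (long long) tuples_needed(1, 0, 0));           /* -1 */
    printf("%lld\n", (long long) tuples_needed(0, 10, 5));          /* 15 */
    printf("%lld\n", (long long) tuples_needed(0, INT64_MAX, 1));   /* -1 */
    return 0;
}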

‎src/include/executor/execParallel.h

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-                     EState *estate, int nworkers);
+                     EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
