Commit 6e71dd7

Modify tqueue infrastructure to support transient record types.
Commit 4a4e689, which introduced this mechanism, failed to account for the fact that the RECORD pseudo-type uses transient typmods that are only meaningful within a single backend. Transferring such tuples without modification between two cooperating backends does not work. This commit installs a system for passing the tuple descriptors over the same shm_mq being used to send the tuples themselves. The two sides might not assign the same transient typmod to any given tuple descriptor, so we must also substitute the appropriate receiver-side typmod for the one used by the sender. That adds some CPU overhead, but still seems better than being unable to pass records between cooperating parallel processes.

Along the way, move the logic for handling multiple tuple queues from tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader, which reads from a single queue, rather than a TupleQueueFunnel, which potentially reads from multiple queues. This change was suggested previously as a way to make sure that nodeGather.c rather than tqueue.c had policy control over the order in which to read from queues, but it wasn't clear to me until now how good an idea it was. typmod mapping needs to be performed separately for each queue, and it is much simpler if the tqueue.c code handles that and leaves multiplexing multiple queues to higher layers of the stack.
1 parent cbb82e3 · commit 6e71dd7
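The fix described above amounts to a small translation table on the receiving side: whenever a previously unseen tuple descriptor arrives over the shm_mq, the receiver assigns its own transient typmod, records the sender-typmod-to-local-typmod correspondence, and rewrites each incoming RECORD datum to carry the local value. As a rough illustration only, here is a minimal, self-contained C sketch of that bookkeeping; it is not PostgreSQL source, and every name in it (typmod_map, remember_remote_typmod, remap_typmod) is hypothetical.

/*
 * Hypothetical sketch (not PostgreSQL source) of receiver-side typmod
 * remapping.  The real code keys this off tuple descriptors shipped
 * over the shm_mq; here a bare int stands in for the descriptor.
 */
#include <assert.h>
#include <stdio.h>

#define MAX_REMOTE_TYPMODS 64

/* typmod_map[remote] holds the locally assigned typmod, or -1. */
static int typmod_map[MAX_REMOTE_TYPMODS];
static int next_local_typmod = 0;

/* A new tuple descriptor arrived: assign our own transient typmod. */
static void
remember_remote_typmod(int remote_typmod)
{
    assert(remote_typmod >= 0 && remote_typmod < MAX_REMOTE_TYPMODS);
    typmod_map[remote_typmod] = next_local_typmod++;
}

/* A RECORD datum arrived: substitute the receiver-side typmod. */
static int
remap_typmod(int remote_typmod)
{
    assert(typmod_map[remote_typmod] != -1);
    return typmod_map[remote_typmod];
}

int
main(void)
{
    for (int i = 0; i < MAX_REMOTE_TYPMODS; ++i)
        typmod_map[i] = -1;

    /* The sender happened to label its descriptors 7 and 3; we map
     * them to our own counter values 0 and 1. */
    remember_remote_typmod(7);
    remember_remote_typmod(3);
    printf("remote typmod 3 -> local typmod %d\n", remap_typmod(3));
    return 0;
}

In the real system the mapping is maintained separately for each queue, which is part of why the commit moves queue multiplexing out of tqueue.c: each TupleQueueReader can then own exactly one mapping.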

File tree

5 files changed: +986 −153 lines changed


src/backend/executor/nodeGather.c

Lines changed: 108 additions & 30 deletions
@@ -36,11 +36,13 @@
 #include "executor/nodeGather.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "miscadmin.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
 
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
+    TupleTableSlot *fslot = node->funnel_slot;
     int         i;
     TupleTableSlot *slot;
     TupleTableSlot *resultSlot;
@@ -148,6 +151,7 @@ ExecGather(GatherState *node)
      */
     if (gather->num_workers > 0 && IsInParallelMode())
     {
+        ParallelContext *pcxt;
         bool        got_any_worker = false;
 
         /* Initialize the workers required to execute Gather node. */
@@ -160,18 +164,26 @@ ExecGather(GatherState *node)
          * Register backend workers.  We might not get as many as we
          * requested, or indeed any at all.
          */
-        LaunchParallelWorkers(node->pei->pcxt);
+        pcxt = node->pei->pcxt;
+        LaunchParallelWorkers(pcxt);
 
-        /* Set up a tuple queue to collect the results. */
-        node->funnel = CreateTupleQueueFunnel();
-        for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+        /* Set up tuple queue readers to read the results. */
+        if (pcxt->nworkers > 0)
         {
-            if (node->pei->pcxt->worker[i].bgwhandle)
+            node->nreaders = 0;
+            node->reader =
+                palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+            for (i = 0; i < pcxt->nworkers; ++i)
             {
+                if (pcxt->worker[i].bgwhandle == NULL)
+                    continue;
+
                 shm_mq_set_handle(node->pei->tqueue[i],
-                                  node->pei->pcxt->worker[i].bgwhandle);
-                RegisterTupleQueueOnFunnel(node->funnel,
-                                           node->pei->tqueue[i]);
+                                  pcxt->worker[i].bgwhandle);
+                node->reader[node->nreaders++] =
+                    CreateTupleQueueReader(node->pei->tqueue[i],
+                                           fslot->tts_tupleDescriptor);
                 got_any_worker = true;
             }
         }
@@ -182,7 +194,7 @@ ExecGather(GatherState *node)
         }
 
         /* Run plan locally if no workers or not single-copy. */
-        node->need_to_scan_locally = (node->funnel == NULL)
+        node->need_to_scan_locally = (node->reader == NULL)
             || !gather->single_copy;
         node->initialized = true;
     }
@@ -254,13 +266,9 @@ ExecEndGather(GatherState *node)
 }
 
 /*
- * gather_getnext
- *
- * Get the next tuple from shared memory queue.  This function
- * is responsible for fetching tuples from all the queues associated
- * with worker backends used in Gather node execution and if there is
- * no data available from queues or no worker is available, it does
- * fetch the data from local node.
+ * Read the next tuple.  We might fetch a tuple from one of the tuple queues
+ * using gather_readnext, or if no tuple queue contains a tuple and the
+ * single_copy flag is not set, we might generate one locally instead.
  */
 static TupleTableSlot *
 gather_getnext(GatherState *gatherstate)
@@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate)
     TupleTableSlot *fslot = gatherstate->funnel_slot;
     HeapTuple   tup;
 
-    while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
+    while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
     {
-        if (gatherstate->funnel != NULL)
+        if (gatherstate->reader != NULL)
         {
-            bool        done = false;
-
-            /* wait only if local scan is done */
-            tup = TupleQueueFunnelNext(gatherstate->funnel,
-                                       gatherstate->need_to_scan_locally,
-                                       &done);
-            if (done)
-                ExecShutdownGatherWorkers(gatherstate);
+            tup = gather_readnext(gatherstate);
 
             if (HeapTupleIsValid(tup))
             {
@@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate)
     return ExecClearTuple(fslot);
 }
 
+/*
+ * Attempt to read a tuple from one of our parallel workers.
+ */
+static HeapTuple
+gather_readnext(GatherState *gatherstate)
+{
+    int         waitpos = gatherstate->nextreader;
+
+    for (;;)
+    {
+        TupleQueueReader *reader;
+        HeapTuple   tup;
+        bool        readerdone;
+
+        /* Make sure we've read all messages from workers. */
+        HandleParallelMessages();
+
+        /* Attempt to read a tuple, but don't block if none is available. */
+        reader = gatherstate->reader[gatherstate->nextreader];
+        tup = TupleQueueReaderNext(reader, true, &readerdone);
+
+        /*
+         * If this reader is done, remove it.  If all readers are done,
+         * clean up remaining worker state.
+         */
+        if (readerdone)
+        {
+            DestroyTupleQueueReader(reader);
+            --gatherstate->nreaders;
+            if (gatherstate->nreaders == 0)
+            {
+                ExecShutdownGather(gatherstate);
+                return NULL;
+            }
+            else
+            {
+                memmove(&gatherstate->reader[gatherstate->nextreader],
+                        &gatherstate->reader[gatherstate->nextreader + 1],
+                        sizeof(TupleQueueReader *)
+                        * (gatherstate->nreaders - gatherstate->nextreader));
+                if (gatherstate->nextreader >= gatherstate->nreaders)
+                    gatherstate->nextreader = 0;
+                if (gatherstate->nextreader < waitpos)
+                    --waitpos;
+            }
+            continue;
+        }
+
+        /* Advance nextreader pointer in round-robin fashion. */
+        gatherstate->nextreader =
+            (gatherstate->nextreader + 1) % gatherstate->nreaders;
+
+        /* If we got a tuple, return it. */
+        if (tup)
+            return tup;
+
+        /* Have we visited every TupleQueueReader? */
+        if (gatherstate->nextreader == waitpos)
+        {
+            /*
+             * If (still) running plan locally, return NULL so caller can
+             * generate another tuple from the local copy of the plan.
+             */
+            if (gatherstate->need_to_scan_locally)
+                return NULL;
+
+            /* Nothing to do except wait for developments. */
+            WaitLatch(MyLatch, WL_LATCH_SET, 0);
+            CHECK_FOR_INTERRUPTS();
+            ResetLatch(MyLatch);
+        }
+    }
+}
+
 /* ----------------------------------------------------------------
  *      ExecShutdownGatherWorkers
  *
@@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate)
 void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-    /* Shut down tuple queue funnel before shutting down workers. */
-    if (node->funnel != NULL)
+    /* Shut down tuple queue readers before shutting down workers. */
+    if (node->reader != NULL)
     {
-        DestroyTupleQueueFunnel(node->funnel);
-        node->funnel = NULL;
+        int         i;
+
+        for (i = 0; i < node->nreaders; ++i)
+            DestroyTupleQueueReader(node->reader[i]);
+        node->reader = NULL;
     }
 
     /* Now shut down the workers. */
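The new gather_readnext above implements the policy side that nodeGather.c now controls: poll each reader round-robin without blocking, compact the reader array when a queue is exhausted, and block on the process latch only after a full pass yields nothing. The following toy program models just that control flow under simplified assumptions: no shared memory or latches, a stalled toy queue stands in for a temporarily empty shm_mq, and all names (ToyReader, toy_readnext, and so on) are hypothetical.

/*
 * Toy model (not PostgreSQL source) of the round-robin policy in
 * gather_readnext: poll each queue without blocking, drop exhausted
 * queues by compacting the array, and report "would block" only after
 * a full pass finds nothing.
 */
#include <stdio.h>
#include <string.h>

typedef struct ToyReader
{
    int remaining;   /* tuples left to produce */
    int stall;       /* polls that return nothing before producing */
} ToyReader;

/* Non-blocking read: >0 is a tuple, 0 is "nothing right now"; *done is
 * set once the queue is exhausted for good. */
static int
toy_read(ToyReader *r, int *done)
{
    *done = 0;
    if (r->stall > 0)
    {
        r->stall--;
        return 0;
    }
    if (r->remaining == 0)
    {
        *done = 1;
        return 0;
    }
    return r->remaining--;
}

/* Returns a tuple value, 0 if every live queue was polled fruitlessly,
 * or -1 when all queues are exhausted. */
static int
toy_readnext(ToyReader **readers, int *nreaders, int *nextreader)
{
    int waitpos = *nextreader;

    for (;;)
    {
        int done;
        int val = toy_read(readers[*nextreader], &done);

        if (done)
        {
            /* Remove the exhausted reader, keeping the array dense. */
            if (--(*nreaders) == 0)
                return -1;
            memmove(&readers[*nextreader], &readers[*nextreader + 1],
                    sizeof(ToyReader *) * (*nreaders - *nextreader));
            if (*nextreader >= *nreaders)
                *nextreader = 0;
            if (*nextreader < waitpos)
                --waitpos;
            continue;
        }

        /* Advance in round-robin fashion, as gather_readnext does. */
        *nextreader = (*nextreader + 1) % *nreaders;

        if (val > 0)
            return val;

        if (*nextreader == waitpos)
            return 0;       /* full fruitless pass: would block */
    }
}

int
main(void)
{
    ToyReader a = {2, 1};
    ToyReader b = {1, 1};
    ToyReader *readers[] = {&a, &b};
    int nreaders = 2, nextreader = 0, v;

    while ((v = toy_readnext(readers, &nreaders, &nextreader)) != -1)
    {
        if (v == 0)
            printf("nothing ready; the real code waits on its latch\n");
        else
            printf("got tuple %d\n", v);
    }
    return 0;
}

A return of 0 here corresponds to the point where the real code either returns NULL, so the Gather node can generate a tuple from its local copy of the plan, or waits on its latch for a worker to produce more data.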
