Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit39bc2c9

Browse files
committed
Draft, do not use in experiments!
1 parentba7614e commit39bc2c9

File tree

4 files changed

+240
-80
lines changed

4 files changed

+240
-80
lines changed

‎contrib/tempscan/nodeCustomTempScan.c

Lines changed: 143 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -163,47 +163,44 @@ create_partial_tempscan_plan(PlannerInfo *root, RelOptInfo *rel,
163163
cscan->custom_plans=custom_plans;
164164
cscan->methods=&plan_methods;
165165
cscan->flags=best_path->flags;
166-
cscan->custom_private=best_path->custom_private;
166+
cscan->custom_private=list_make1(makeInteger(best_path->path.parallel_workers));
167167

168168
return&cscan->scan.plan;
169169
}
170170

171171
typedefstructSharedTempScanInfo
172172
{
173-
intnworkers;
173+
intnworkers_launched;
174174
dsm_handlehandle;
175175
}SharedTempScanInfo;
176176

177-
#defineSharedTempScanInfoHeaderSize offsetof(SharedTempScanInfo, data)
178-
179-
typedefstructTempScanInfo
180-
{
181-
shm_mq_handle**tqueue;
182-
DestReceiver**receiver;
183-
}TempScanInfo;
184-
185177
typedefstructParallelTempScanState
186178
{
187179
CustomScanStatenode;
188180

189181
boolinitialized;
182+
intnworkers;/* workers planned. Needed to know how much resources to free */
190183
DestReceiver**receiver;/* Must be NULL for workers */
191-
TempScanInfoptsi;
184+
shm_mq_handle**tqueue;
185+
ParallelContext*pcxt;
192186
SharedTempScanInfo*shared;
193187

194188
TupleQueueReader*reader;
189+
boolparallelMode;
195190
}ParallelTempScanState;
196191

197192
staticNode*
198193
create_tempscan_state(CustomScan*cscan)
199194
{
200195
ParallelTempScanState*ts=palloc0(sizeof(ParallelTempScanState));
201196
CustomScanState*cstate= (CustomScanState*)ts;
197+
intpath_workers=linitial_node(Integer,cscan->custom_private)->ival;
202198

203199
Assert(list_length(cscan->custom_plans)==1);
204200

205201
cstate->ss.ps.type=T_CustomScanState;
206202
cstate->methods=&exec_methods;
203+
ts->parallelMode= (path_workers>0);
207204

208205
/*
209206
* Setup slotOps manually. Although we just put incoming tuple to the result
@@ -213,7 +210,9 @@ create_tempscan_state(CustomScan *cscan)
213210
cstate->slotOps=&TTSOpsMinimalTuple;
214211

215212
ts->receiver=NULL;
213+
ts->tqueue=NULL;
216214
ts->initialized= false;
215+
217216
ts->shared=NULL;
218217

219218
if (!IsParallelWorker())
@@ -270,76 +269,115 @@ ExecTempScan(CustomScanState *node)
270269
{
271270
ParallelTempScanState*ts= (ParallelTempScanState*)node;
272271
TupleTableSlot*result=ts->node.ss.ss_ScanTupleSlot;
272+
TupleTableSlot*slot;
273+
boolshould_free;
274+
MinimalTupletup;
275+
inti;
273276

274277
/*
275278
* HACK. At this point Custom DSM already initialised and we can switch off
276279
* this parameter.
277280
*/
278-
ts->node.ss.ps.plan->parallel_aware= false;
279-
280-
/* Forbid rescanning */
281-
ts->initialized= true;
281+
if (ts->pcxt->nworkers_launched==0)
282+
ts->node.ss.ps.plan->parallel_aware= false;
282283

283-
if (!IsParallelWorker())
284+
if (IsParallelWorker())
284285
{
285-
TupleTableSlot*slot;
286-
boolshould_free;
287-
MinimalTupletuple;
288-
inti;
286+
MinimalTupletup;
287+
booldone;
289288

290-
Assert(list_length(node->custom_ps)==1);
289+
/* Parallel worker should receive something from the tqueue */
290+
tup=TupleQueueReaderNext(ts->reader, false,&done);
291291

292-
slot=ExecProcNode((PlanState*)linitial(node->custom_ps));
293-
if (TupIsNull(slot))
292+
if (done)
294293
{
295-
if (ts->ptsi.receiver!=NULL)
296-
{
297-
for (i=0;i<ts->shared->nworkers;i++)
298-
{
299-
ts->ptsi.receiver[i]->rDestroy(ts->ptsi.receiver[i]);
300-
ts->ptsi.receiver[i]=NULL;
301-
ts->ptsi.tqueue[i]=NULL;
302-
}
303-
pfree(ts->ptsi.receiver);
304-
ts->ptsi.receiver=NULL;
305-
}
306-
307-
/* The end of the table is achieved, Return empty tuple to all */
294+
Assert(tup==NULL);
308295
returnNULL;
309296
}
310297

298+
/* TODO: should free ? */
299+
ExecStoreMinimalTuple(tup,result, false);
300+
result->tts_ops->copyslot(result,result);
301+
returnresult;
302+
}
303+
304+
Assert(list_length(node->custom_ps)==1);
305+
306+
if (!ts->initialized)
307+
{
308+
/*
309+
* Save number of workers because we will need it on later
310+
* stages of the execution.
311+
*/
312+
ts->shared->nworkers_launched=ts->pcxt->nworkers_launched;
313+
ts->initialized= true;
314+
}
315+
316+
slot=ExecProcNode((PlanState*)linitial(node->custom_ps));
317+
if (ts->receiver==NULL)
318+
returnslot;
319+
320+
if (TupIsNull(slot))
321+
{
322+
/* Parallel workers case */
323+
for (i=0;i<ts->shared->nworkers_launched;i++)
324+
{
325+
ts->receiver[i]->rDestroy(ts->receiver[i]);
326+
ts->receiver[i]=NULL;
327+
ts->tqueue[i]=NULL;
328+
}
329+
pfree(ts->receiver);
330+
ts->receiver=NULL;
331+
/* The end of the table is achieved, Return empty tuple to all */
332+
returnNULL;
333+
}
334+
335+
if (!ts->parallelMode)
336+
{
311337
/* Prepare mimimal tuple to send all workers and upstream locally. */
312-
tuple=ExecFetchSlotMinimalTuple(slot,&should_free);
313-
ExecStoreMinimalTuple(tuple,result,should_free);
338+
tup=ExecFetchSlotMinimalTuple(slot,&should_free);
339+
ExecStoreMinimalTuple(tup,result,should_free);
314340

315-
if (ts->ptsi.receiver!=NULL)
341+
/* Send the same tuple to each of worker. Don't forget myself */
342+
for (i=0;i<ts->shared->nworkers_launched;++i)
316343
{
317-
for (i=0;i<ts->shared->nworkers;++i)
318-
{
319-
ts->ptsi.receiver[i]->receiveSlot(result,ts->ptsi.receiver[i]);
320-
}
344+
boolret;
345+
346+
ret=ts->receiver[i]->receiveSlot(result,ts->receiver[i]);
347+
Assert(ret);
321348
}
349+
returnresult;
322350
}
323351
else
324352
{
325-
MinimalTupletup;
326-
booldone;
353+
intnworkers=ts->pcxt->nworkers_launched;
354+
/* Overwise we should tuple only to one of the workers */
327355

328-
/* Parallel worker should receive something from the tqueue */
329-
tup=TupleQueueReaderNext(ts->reader, false,&done);
356+
typedefstructTQueueDestReceiver
357+
{
358+
DestReceiverpub;/* public fields */
359+
shm_mq_handle*queue;/* shm_mq to send to */
360+
}TQueueDestReceiver;
330361

331-
if (done)
362+
TQueueDestReceiver*rec;
363+
364+
while (nworkers>0)
332365
{
333-
Assert(tup==NULL);
334-
returnNULL;
335-
}
366+
/* Prepare mimimal tuple */
367+
tup=ExecFetchSlotMinimalTuple(slot,&should_free);
368+
ExecStoreMinimalTuple(tup,result,should_free);
336369

337-
/* TODO: should free ? */
338-
ExecStoreMinimalTuple(tup,result, false);
339-
result->tts_ops->copyslot(result,result);
370+
for (i=0;i<nworkers;i++)
371+
{
372+
rec= (TQueueDestReceiver*)ts->receiver[i];
373+
result=shm_mq_send(tqueue->queue,tuple->t_len,tuple, false, false);
374+
(void)WaitLatch(MyLatch,
375+
WL_LATCH_SET |WL_TIMEOUT |WL_EXIT_ON_PM_DEATH,
376+
(nap.tv_sec*1000L)+ (nap.tv_usec /1000L),
377+
WAIT_EVENT_AUTOVACUUM_MAIN);
378+
}
379+
}
340380
}
341-
342-
returnresult;
343381
}
344382

345383
staticvoid
@@ -355,13 +393,13 @@ EndTempScan(CustomScanState *node)
355393
ExecEndNode((PlanState*)linitial(node->custom_ps));
356394

357395
/* Can happen if not all tuples needed */
358-
if (ts->ptsi.receiver!=NULL)
396+
if (ts->receiver!=NULL)
359397
{
360398
inti;
361399

362-
for (i=0;i<ts->shared->nworkers;++i)
400+
for (i=0;i<ts->nworkers;++i)
363401
{
364-
ts->ptsi.receiver[i]->rDestroy(ts->ptsi.receiver[i]);
402+
ts->receiver[i]->rDestroy(ts->receiver[i]);
365403
}
366404
}
367405
}
@@ -454,14 +492,29 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
454492
create_index_paths(root,rel);
455493
create_tidscan_paths(root,rel);
456494

495+
if (rel->consider_parallel&&rel->lateral_relids==NULL)
496+
{
497+
intparallel_workers;
498+
499+
parallel_workers=compute_parallel_worker(rel,rel->pages,-1,
500+
max_parallel_workers_per_gather);
501+
502+
/* If any limit was set to zero, the user doesn't want a parallel scan. */
503+
if (parallel_workers <=0)
504+
return;
505+
506+
/* Add an unordered partial path based on a parallel sequential scan. */
507+
add_partial_path(rel,create_seqscan_path(root,rel,NULL,parallel_workers));
508+
}
509+
457510
/*
458511
* Dangerous zone. But we assume it is strictly local. What about extension
459512
* which could call ours and may have desire to add some partial paths after
460513
* us?
461514
*/
462515

463-
list_free(rel->partial_pathlist);
464-
rel->partial_pathlist=NIL;
516+
//list_free(rel->partial_pathlist);
517+
//rel->partial_pathlist = NIL;
465518

466519
/*
467520
* Set guard over each parallel_safe path
@@ -488,8 +541,8 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
488541
* lateral references guarantees we don't need to change any parameters
489542
* on a ReScan?
490543
*/
491-
add_path(rel, (Path*)
492-
create_material_path(cpath->parent, (Path*)cpath));
544+
add_path(rel, (Path*)cpath
545+
/*create_material_path(cpath->parent, (Path *) cpath)*/);
493546
}
494547

495548
list_free(parallel_safe_lst);
@@ -607,33 +660,44 @@ InitializeDSMTempScan(CustomScanState *node, ParallelContext *pcxt,
607660
DSM_CREATE_NULL_IF_MAXSEGMENTS);
608661
Assert(seg!=NULL);/* Don't process this case so far */
609662

663+
ts->pcxt=pcxt;
664+
610665
/* Save shared data for common usage in parallel workers */
611666
ts->shared= (SharedTempScanInfo*)coordinate;
612667
ts->shared->handle=dsm_segment_handle(seg);
668+
ts->nworkers=pcxt->nworkers;
613669

614670
/*
615-
* Save number of workers because we will need it on later stages of the
616-
* execution.
671+
* We can't initialise queues to workers here because not sure about real
672+
* number of workers will be launched (depends on the number of free slots
673+
* for background workers - see max_worker_processes).
617674
*/
618-
ts->shared->nworkers=pcxt->nworkers;
619675

620-
if (ts->shared->nworkers>0)
621-
{
622-
inti;
623-
dsm_segment*seg=dsm_find_mapping(ts->shared->handle);
676+
/*
677+
* Initialise receivers here.
678+
* We don't do it earlier because real number of launched workers
679+
* will be known only after the Gather node launch them.
680+
* Anyway, in the case of any troubles we can initialise them
681+
* earlier and just not use the tail of them during the execution.
682+
*/
683+
if (ts->shared&&ts->nworkers>0)
684+
{
685+
inti;
686+
dsm_segment*seg=dsm_find_mapping(ts->shared->handle);
624687

625-
ts->ptsi.tqueue=
626-
ExecParallelSetupTupleQueues(ts->shared->nworkers,
688+
ts->tqueue=
689+
ExecParallelSetupTupleQueues(ts->nworkers,
627690
(char*)dsm_segment_address(seg),
628691
seg);
629692

630-
ts->ptsi.receiver=palloc(ts->shared->nworkers*sizeof(DestReceiver*));
631-
for (i=0;i<ts->shared->nworkers;i++)
632-
{
633-
ts->ptsi.receiver[i]=
634-
CreateTupleQueueDestReceiver(ts->ptsi.tqueue[i]);
635-
}
693+
ts->receiver=palloc(ts->nworkers*sizeof(DestReceiver*));
694+
for (i=0;i<ts->nworkers;i++)
695+
{
696+
ts->receiver[i]=CreateTupleQueueDestReceiver(ts->tqueue[i]);
636697
}
698+
}
699+
else
700+
elog(WARNING,"Workers do not needed");
637701
}
638702

639703
staticvoid
@@ -662,6 +726,7 @@ InitializeWorkerTempScan(CustomScanState *node, shm_toc *toc,
662726
shm_mq_set_receiver(mq,MyProc);
663727

664728
ts->reader=CreateTupleQueueReader(shm_mq_attach(mq,seg,NULL));
729+
ts->initialized= true;
665730
}
666731

667732
staticvoid

‎src/backend/optimizer/path/costsize.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2038,8 +2038,15 @@ cost_incremental_sort(Path *path,
20382038
foreach(l,pathkeys)
20392039
{
20402040
PathKey*key= (PathKey*)lfirst(l);
2041-
EquivalenceMember*member= (EquivalenceMember*)
2042-
linitial(key->pk_eclass->ec_members);
2041+
EquivalenceMember*member;
2042+
2043+
member=choose_computable_ec_member(root,key->pk_eclass,
2044+
path->pathtarget->exprs,
2045+
path->parent->relids,
2046+
path->parallel_safe);
2047+
2048+
if (!member)
2049+
elog(ERROR,"could not find pathkey item to sort");
20432050

20442051
/*
20452052
* Check if the expression contains Var with "varno 0" so that we

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp