Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfccbb2c

Browse files
committed
Add dummy parallel join
1 parent03a11dd commitfccbb2c

File tree

11 files changed

+382
-120
lines changed

11 files changed

+382
-120
lines changed

‎contrib/pg_exchange/exchange.c

Lines changed: 82 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ static void EXCHANGE_InitializeWorker(CustomScanState *node,
6565
staticNode*EXCHANGE_Create_state(CustomScan*node);
6666

6767

68+
#defineEND_OF_TUPLES'E'
69+
#defineEND_OF_EXCHANGE 'Q'
6870
void
6971
EXCHANGE_Init_methods(void)
7072
{
@@ -95,6 +97,7 @@ EXCHANGE_Init_methods(void)
9597
DistExec_Init_methods();
9698
}
9799

100+
#include"nodes/relation.h"
98101
/*
99102
* Add one path for a base relation target: replace all ForeignScan nodes by
100103
* local Scan nodes.
@@ -116,7 +119,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
116119
*/
117120
return;
118121

119-
elog(INFO,"INSERT EXCHANGE");
122+
elog(INFO,"INSERT EXCHANGE. paths: %d",list_length(rel->pathlist));
120123

121124
/* Traverse all possible paths and search for APPEND */
122125
foreach(lc,rel->pathlist)
@@ -125,16 +128,15 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
125128
Path*tmpLocalScanPath=NULL;
126129
AppendPath*appendPath=NULL;
127130
ListCell*lc1;
128-
List*private_data=NIL;
129-
130-
Assert(path->pathtype!=T_MergeAppend);/* Do it later */
131+
Bitmapset*servers=NULL;
132+
List*subpaths=NIL;
131133

132134
if (path->pathtype!=T_Append)
133135
continue;
134136

135-
appendPath=makeNode(AppendPath);
136-
memcpy(appendPath,path,sizeof(AppendPath));
137-
appendPath->subpaths=NIL;
137+
//elog(INFO, "-> IE. path params: %hhu, ptype: %d, tcost: %f, scost: %f",
138+
//path->param_info != NULL, path->pathtype,
139+
//path->total_cost, path->startup_cost);
138140

139141
/*
140142
* Traverse all APPEND subpaths, check for scan-type and search for
@@ -145,7 +147,9 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
145147
Path*subpath= (Path*)lfirst(lc1);
146148
Path*tmpPath;
147149
Oidserverid=InvalidOid;
148-
150+
elog(INFO,"--> IE. subpath params: %hhu, ptype: %d, tcost: %f, scost: %f",
151+
subpath->param_info!=NULL,subpath->pathtype,
152+
subpath->total_cost,subpath->startup_cost);
149153
if ((subpath->pathtype!=T_ForeignScan)&& (tmpLocalScanPath))
150154
/* Check assumption No.1 */
151155
Assert(tmpLocalScanPath->pathtype==subpath->pathtype);
@@ -159,8 +163,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
159163

160164
caseT_ForeignScan:
161165
serverid=subpath->parent->serverid;
166+
if (PATH_REQ_OUTER(subpath)!=NULL)
167+
continue;
162168
tmpPath= (Path*)makeNode(SeqScan);
163-
tmpPath=create_seqscan_path(root,subpath->parent,subpath->parent->lateral_relids,0);
169+
tmpPath=create_seqscan_path(root,subpath->parent,
170+
PATH_REQ_OUTER(subpath),0);
164171
break;
165172

166173
default:
@@ -170,22 +177,25 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
170177
if (!tmpLocalScanPath)
171178
tmpLocalScanPath=tmpPath;
172179

173-
appendPath->subpaths=lappend(appendPath->subpaths,tmpPath);
174-
if (OidIsValid(serverid))
175-
private_data=lappend_oid(private_data,serverid);
180+
subpaths=lappend(subpaths,tmpPath);
181+
//appendPath->subpaths = lappend(appendPath->subpaths, tmpPath);
182+
if (OidIsValid(serverid)&& !bms_is_member((int)serverid,servers))
183+
servers=bms_add_member(servers,serverid);
176184
}
177185

178-
if (private_data==NIL)
186+
if (servers==NULL)
179187
{
180-
pfree(appendPath);
181188
elog(INFO,"NO one foreign source found");
182189
continue;
183190
}
184191
else
185-
elog(INFO,"Source found: %d",list_length(private_data));
192+
elog(INFO,"Source found: %d",bms_num_members(servers));
186193

194+
appendPath=create_append_path(root,rel,subpaths,NIL,
195+
PATH_REQ_OUTER(tmpLocalScanPath),0, false,
196+
((AppendPath*)path)->partitioned_rels,-1);
187197
path=create_exchange_path(root,rel, (Path*)appendPath);
188-
path=create_distexec_path(root,rel,path,private_data);
198+
path=create_distexec_path(root,rel,path,servers);
189199
add_path(rel,path);
190200
}
191201
}
@@ -206,8 +216,8 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, Path *path)
206216

207217
/* Now I do not want to think about cost estimations. */
208218
path->rows=baserel->tuples;
209-
path->startup_cost=0.0001;
210-
path->total_cost=path->startup_cost+0.0001*path->rows;
219+
path->startup_cost=10000.0001;
220+
path->total_cost=path->startup_cost+100000.0001*path->rows;
211221
}
212222

213223
/* XXX: Need to be placed in shared memory */
@@ -342,7 +352,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
342352
{
343353
CustomScan*cscan= (CustomScan*)node->ss.ps.plan;
344354
Plan*scan_plan;
345-
boolexplain_only= ((eflags&EXEC_FLAG_EXPLAIN_ONLY)!=0);
355+
//boolexplain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0);
346356
PlanState*planState;
347357
ExchangeState*state= (ExchangeState*)node;
348358
TupleDescscan_tupdesc;
@@ -353,7 +363,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
353363
planState= (PlanState*)ExecInitNode(scan_plan,estate,eflags);
354364
node->custom_ps=lappend(node->custom_ps,planState);
355365

356-
Assert(Stream_subscribe(state->stream));
366+
Stream_subscribe(state->stream);
357367

358368
state->init= false;
359369
state->ltuples=0;
@@ -375,23 +385,33 @@ distribution_fn_gather(TupleTableSlot *slot, DMQDestCont *dcont)
375385
return-1;
376386
}
377387

378-
staticTupleTableSlot*
379-
EXCHANGE_Execute(CustomScanState*node)
388+
staticvoid
389+
init_state_ifany(ExchangeState*state)
380390
{
381-
ScanState*ss=&node->ss;
382-
ScanState*subPlanState=linitial(node->custom_ps);
383-
ExchangeState*state= (ExchangeState*)node;
384-
boolreadRemote= true;
385-
386391
if (!state->init)
387392
{
388393
EphemeralNamedRelationenr=get_ENR(state->estate->es_queryEnv,destsName);
394+
395+
Assert(enr!=NULL&&enr->reldata!=NULL);
389396
state->dests= (DMQDestCont*)enr->reldata;
390-
state->init= true;
391397
state->hasLocal= true;
392398
state->activeRemotes=state->dests->nservers;
399+
state->init= true;
400+
//elog(INFO, "[%d] EXCHANGE Init", getpid());
393401
}
394402

403+
}
404+
405+
staticTupleTableSlot*
406+
EXCHANGE_Execute(CustomScanState*node)
407+
{
408+
ScanState*ss=&node->ss;
409+
ScanState*subPlanState=linitial(node->custom_ps);
410+
ExchangeState*state= (ExchangeState*)node;
411+
boolreadRemote= true;
412+
413+
init_state_ifany(state);
414+
395415
for(;;)
396416
{
397417
TupleTableSlot*slot=NULL;
@@ -405,22 +425,23 @@ EXCHANGE_Execute(CustomScanState *node)
405425

406426
slot=RecvTuple(ss->ss_ScanTupleSlot->tts_tupleDescriptor,
407427
state->stream,&status);
408-
if (status==0)
428+
switch (status)
409429
{
410-
if (TupIsNull(slot))
411-
{
412-
state->activeRemotes--;
413-
//elog(LOG, "Finish remote receiving. r=%d", state->rtuples);
414-
}
415-
else
416-
{
417-
state->rtuples++;
418-
//elog(LOG, "GOT tuple from another node. r=%d", state->rtuples);
419-
returnslot;
420-
}
430+
case-1:
431+
/* No tuples currently */
432+
break;
433+
case0:
434+
Assert(!TupIsNull(slot));
435+
state->rtuples++;
436+
returnslot;
437+
case1:
438+
state->activeRemotes--;
439+
break;
440+
case2:/* Close EXCHANGE channel */
441+
break;
442+
default:
443+
Assert(0);
421444
}
422-
//else
423-
//elog(LOG, "No remote tuples for now");
424445
}
425446

426447
if ((state->hasLocal)&& (!readRemote))
@@ -429,9 +450,9 @@ EXCHANGE_Execute(CustomScanState *node)
429450
if (TupIsNull(slot))
430451
{
431452
inti;
432-
//elog(LOG, "FINISH Local store: l=%d, r=%d", state->ltuples, state->rtuples);
453+
//elog(LOG, "[%s]FINISH Local store: l=%d, r=%d", state->stream, state->ltuples, state->rtuples);
433454
for (i=0;i<state->dests->nservers;i++)
434-
SendTuple(state->dests->dests[i].dest_id,state->stream,NULL);
455+
SendByteMessage(state->dests->dests[i].dest_id,state->stream,END_OF_TUPLES);
435456
state->hasLocal= false;
436457
continue;
437458
}
@@ -444,7 +465,8 @@ EXCHANGE_Execute(CustomScanState *node)
444465

445466
if ((state->activeRemotes==0)&& (!state->hasLocal))
446467
{
447-
elog(LOG,"Exchange returns NULL: %d %d",state->ltuples,state->rtuples);
468+
elog(LOG,"[%s] Exchange returns NULL: %d %d",state->stream,
469+
state->ltuples,state->rtuples);
448470
returnNULL;
449471
}
450472

@@ -457,7 +479,6 @@ EXCHANGE_Execute(CustomScanState *node)
457479
returnslot;
458480
else
459481
{
460-
//elog(LOG, "Send real tuple");
461482
SendTuple(dest,state->stream,slot);
462483
}
463484
}
@@ -471,20 +492,25 @@ EXCHANGE_End(CustomScanState *node)
471492

472493
Assert(list_length(node->custom_ps)==1);
473494
ExecEndNode(linitial(node->custom_ps));
474-
Assert(Stream_unsubscribe(state->stream));
475-
elog(LOG,"EXCHANGE_END");
476-
/*
477-
* Clean out exchange state
478-
*/
495+
Stream_unsubscribe(state->stream);
496+
497+
elog(INFO,"EXCHANGE_END");
479498
}
480499

481500
staticvoid
482501
EXCHANGE_Rescan(CustomScanState*node)
483502
{
484-
PlanState*outerPlan=outerPlanState(node);
503+
ExchangeState*state= (ExchangeState*)node;
504+
PlanState*subPlan= (PlanState*)linitial(node->custom_ps);
485505

486-
if (outerPlan->chgParam==NULL)
487-
ExecReScan(outerPlan);
506+
init_state_ifany(state);
507+
elog(INFO,"Rescan exchange! %d",getpid());
508+
if (subPlan->chgParam==NULL)
509+
ExecReScan(subPlan);
510+
state->activeRemotes=state->dests->nservers;
511+
state->ltuples=0;
512+
state->rtuples=0;
513+
state->hasLocal= true;
488514
}
489515

490516
staticvoid
@@ -500,8 +526,10 @@ static void
500526
EXCHANGE_Explain(CustomScanState*node,List*ancestors,ExplainState*es)
501527
{
502528
StringInfoDatastr;
529+
ExchangeState*state= (ExchangeState*)node;
503530

504531
initStringInfo(&str);
532+
appendStringInfo(&str,"stream: %s. ",state->stream);
505533
ExplainPropertyText("Exchange",str.data,es);
506534
}
507535

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp