Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3b9ebfd

Browse files
committed
Raw commit
1 parent8491aa4 commit3b9ebfd

File tree

4 files changed

+198
-46
lines changed

4 files changed

+198
-46
lines changed

‎contrib/pg_exchange/exchange.c

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,18 @@ set_exchange_altrel(ExchangeMode mode, ExchangePath *path, RelOptInfo *outerrel,
263263
RelOptInfo*rel=&path->altrel;
264264

265265
Assert(rel&& (outerrel||innerrel));
266-
Assert(!bms_is_empty(servers));
266+
Assert(!bms_is_empty(servers)||mode!=EXCH_SHUFFLE);
267267

268-
rel->nparts=bms_num_members(servers);
269-
rel->part_rels=palloc(sizeof(RelOptInfo*)*rel->nparts);
270-
for (i=0;i<rel->nparts;i++)
268+
if (!bms_is_empty(servers))
271269
{
272-
sid=bms_next_member(servers,sid);
273-
rel->part_rels[i]=palloc0(sizeof(RelOptInfo));
274-
rel->part_rels[i]->serverid= (Oid)sid;
270+
rel->nparts=bms_num_members(servers);
271+
rel->part_rels=palloc(sizeof(RelOptInfo*)*rel->nparts);
272+
for (i=0;i<rel->nparts;i++)
273+
{
274+
sid=bms_next_member(servers,sid);
275+
rel->part_rels[i]=palloc0(sizeof(RelOptInfo));
276+
rel->part_rels[i]->serverid= (Oid)sid;
277+
}
275278
}
276279

277280
switch (mode)
@@ -399,6 +402,82 @@ make_local_scan_path(Path *localPath, RelOptInfo *rel,
399402
returnpathnode;
400403
}
401404

405+
staticPath*
406+
foreign_to_seqscan(PlannerInfo*root,RelOptInfo*rel,ForeignPath*fpath)
407+
{
408+
Path*seqScan=makeNode(Path);
409+
Coststartup_cost=0;
410+
Costcpu_run_cost;
411+
412+
seqScan->pathtype=T_SeqScan;
413+
seqScan->parallel_aware=fpath->path.parallel_aware;
414+
seqScan->parallel_safe=fpath->path.parallel_safe;
415+
seqScan->parallel_workers=fpath->path.parallel_workers;
416+
seqScan->param_info=get_baserel_parampathinfo(root,rel,rel->lateral_relids);
417+
418+
seqScan->parent=fpath->path.parent;
419+
seqScan->pathkeys=fpath->path.pathkeys;
420+
seqScan->pathtarget=fpath->path.pathtarget;
421+
seqScan->rows=fpath->path.rows;
422+
seqScan->startup_cost=fpath->path.startup_cost;
423+
seqScan->total_cost=fpath->path.total_cost;
424+
425+
cpu_run_cost=cpu_tuple_cost*rel->tuples;
426+
cpu_run_cost+=seqScan->pathtarget->cost.per_tuple*seqScan->rows;
427+
seqScan->startup_cost=startup_cost;
428+
/* TODO: Disk run costs and qual costs? */
429+
seqScan->total_cost=startup_cost+cpu_run_cost;
430+
431+
returnseqScan;
432+
}
433+
434+
staticvoid
435+
add_trivial_distributed_path(PlannerInfo*root,RelOptInfo*rel,Indexrti,RangeTblEntry*rte)
436+
{
437+
ListCell*lc;
438+
439+
foreach(lc,rel->pathlist)
440+
{
441+
Path*path= (Path*)lfirst(lc);
442+
Bitmapset*servers=NULL;
443+
IndexOptInfo*indexinfo=NULL;
444+
445+
switch (nodeTag(path))
446+
{
447+
caseT_Path:
448+
caseT_BitmapHeapPath:
449+
caseT_IndexPath:
450+
caseT_TidPath:
451+
caseT_SubqueryScanPath:
452+
break;
453+
454+
caseT_ForeignPath:
455+
servers=bms_add_member(servers, (int)path->parent->serverid);
456+
path=foreign_to_seqscan(root,rel, (ForeignPath*)path);
457+
break;
458+
459+
default:
460+
Assert(0);
461+
elog(FATAL,"Unexpected path node: %d",nodeTag(path));
462+
}
463+
path= (Path*)create_exchange_path(root,rel, (Path*)path,
464+
EXCH_GATHER);
465+
set_exchange_altrel(EXCH_GATHER, (ExchangePath*)path,rel,NULL,NULL,
466+
servers);
467+
if (indexinfo)
468+
{
469+
List**private;
470+
471+
private=&((ExchangePath*)path)->cp.custom_private;
472+
*private=lappend(*private,indexinfo);
473+
}
474+
475+
path= (Path*)create_distexec_path(root,rel,path,servers);
476+
477+
force_add_path(rel,path);
478+
}
479+
}
480+
402481
/*
403482
* Add one path for a base relation target: replace all ForeignScan nodes by
404483
* local Scan nodes.
@@ -415,8 +494,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
415494
ListCell*lc;
416495

417496
if (!rte->inh)
497+
{
418498
/* Relation is not contain any partitions. */
499+
//add_trivial_distributed_path(root, rel, rti, rte);
419500
return;
501+
}
420502

421503
/* Traverse all possible paths and search for APPEND */
422504
foreach(lc,rel->pathlist)
@@ -427,7 +509,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
427509
ListCell*lc1;
428510
Bitmapset*servers=NULL;
429511
List*subpaths=NIL;
430-
List*append_paths;
512+
List**append_paths;
431513
IndexOptInfo*indexinfo=NULL;
432514

433515
/*
@@ -437,10 +519,10 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
437519
switch (nodeTag(path))
438520
{
439521
caseT_AppendPath:
440-
append_paths= ((AppendPath*)path)->subpaths;
522+
append_paths=&((AppendPath*)path)->subpaths;
441523
break;
442524
caseT_MergeAppendPath:
443-
append_paths= ((MergeAppendPath*)path)->subpaths;
525+
append_paths=&((MergeAppendPath*)path)->subpaths;
444526
break;
445527
default:
446528
elog(FATAL,"Unexpected node type %d, pathtype %d",path->type,
@@ -452,7 +534,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
452534
* at the same node has the only strategy. It is caused by symmetry of
453535
* data placement.
454536
*/
455-
for (lc1=list_head(append_paths);lc1!=NULL;lc1=lnext(lc1))
537+
for (lc1=list_head(*append_paths);lc1!=NULL;lc1=lnext(lc1))
456538
{
457539
Path*subpath= (Path*)lfirst(lc1);
458540

@@ -472,7 +554,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
472554
/*
473555
* Traverse all APPEND subpaths. Form new path list.
474556
*/
475-
foreach(lc1,append_paths)
557+
foreach(lc1,*append_paths)
476558
{
477559
Path*subpath= (Path*)lfirst(lc1);
478560
Path*tmpPath=NULL;

‎contrib/pg_exchange/exchange.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
#definecstmSubPath1(customPath) (Path *) linitial(((CustomPath *) \
4040
customPath)->custom_paths)
4141

42+
#definecstmSubPlan1(custom) ((Plan *) linitial(((CustomScan *) \
43+
custom)->custom_plans))
44+
4245
typedefenumExchangeMode
4346
{
4447
EXCH_GATHER,

‎contrib/pg_exchange/hooks.c

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,25 @@ reset_cheapest(RelOptInfo *rel)
8888
set_cheapest(rel);
8989
}
9090

91+
staticbool
92+
contain_distributed_paths(List*pathlist)
93+
{
94+
ListCell*lc;
95+
96+
foreach(lc,pathlist)
97+
{
98+
Path*path=lfirst(lc);
99+
100+
if (IsDistExecNode(path))
101+
return true;
102+
}
103+
return false;
104+
}
105+
106+
/*
107+
* TODO: We need routine cost_recalculate() that will be walk across path
108+
* and call cost function at each node from leaf to the root of path tree.
109+
*/
91110
staticList*
92111
create_distributed_join_paths(PlannerInfo*root,RelOptInfo*joinrel,
93112
RelOptInfo*outerrel,RelOptInfo*innerrel,
@@ -99,13 +118,18 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
99118
List*prev_outer_pathlist;
100119
List*prev_join_pathlist;
101120
List*dist_paths=NIL;
121+
booldistributedOuter;
122+
booldistributedInner;
102123

103124
/* Save old pathlists. */
104125
prev_inner_pathlist=innerrel->pathlist;
105126
prev_outer_pathlist=outerrel->pathlist;
106127
prev_join_pathlist=joinrel->pathlist;
107128
innerrel->pathlist=outerrel->pathlist=joinrel->pathlist=NIL;
108129

130+
distributedOuter=contain_distributed_paths(prev_outer_pathlist);
131+
distributedInner=contain_distributed_paths(prev_inner_pathlist);
132+
109133
foreach(lc,prev_inner_pathlist)
110134
{
111135
Path*innpath=lfirst(lc);
@@ -114,20 +138,26 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
114138
ListCell*olc;
115139
ExchangePath*gather;
116140

117-
/* Use only distributed paths */
118-
if (!IsDistExecNode(innpath))
119-
continue;
120-
121-
inner_servers=extractForeignServers((CustomPath*)innpath);
122-
inn_child= (ExchangePath*)cstmSubPath1(innpath);
123-
Assert(inn_child->mode==EXCH_GATHER);
124-
if (IsExchangeNode(inn_child))
125-
inn_child=create_exchange_path(root,innerrel,
126-
cstmSubPath1(inn_child),
127-
inner_mode);
141+
if (IsDistExecNode(innpath))
142+
{
143+
inner_servers=extractForeignServers((CustomPath*)innpath);
144+
inn_child= (ExchangePath*)cstmSubPath1(innpath);
145+
Assert(inn_child->mode==EXCH_GATHER);
146+
if (IsExchangeNode(inn_child))
147+
inn_child=create_exchange_path(root,innerrel,
148+
cstmSubPath1(inn_child),inner_mode);
149+
else
150+
inn_child=create_exchange_path(root,innerrel,
151+
(Path*)inn_child,inner_mode);
152+
}
153+
elseif (!distributedInner&&distributedOuter)
154+
{
155+
/* The case of JOIN partitioned and simple relation */
156+
inn_child=create_exchange_path(root,innerrel,innpath,EXCH_GATHER);
157+
}
128158
else
129-
inn_child=create_exchange_path(root,innerrel, (Path*)inn_child,
130-
inner_mode);
159+
/* Use only distributed paths */
160+
continue;
131161

132162
innerrel->pathlist=lappend(innerrel->pathlist,inn_child);
133163
Assert(list_length(innerrel->pathlist)==1);
@@ -141,20 +171,33 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
141171
boolres;
142172

143173
/* Use only distributed paths */
144-
if (!IsDistExecNode(outpath))
145-
continue;
146-
147-
outer_servers=extractForeignServers((CustomPath*)outpath);
148-
out_child= (ExchangePath*)cstmSubPath1(outpath);
149-
Assert(out_child->mode==EXCH_GATHER);
150-
if (IsExchangeNode(out_child))
151-
out_child=create_exchange_path(root,outerrel,
152-
cstmSubPath1(out_child),
153-
outer_mode);
174+
if (IsDistExecNode(outpath))
175+
{
176+
outer_servers=extractForeignServers((CustomPath*)outpath);
177+
if (!outer_servers&& !outer_servers&&
178+
inner_mode==EXCH_SHUFFLE&&outer_mode==EXCH_SHUFFLE)
179+
continue;
180+
181+
out_child= (ExchangePath*)cstmSubPath1(outpath);
182+
Assert(out_child->mode==EXCH_GATHER);
183+
if (IsExchangeNode(out_child))
184+
out_child=create_exchange_path(root,outerrel,
185+
cstmSubPath1(out_child),outer_mode);
186+
else
187+
out_child=create_exchange_path(root,outerrel,
188+
(Path*)out_child,inner_mode);
189+
if (!distributedInner)
190+
set_exchange_altrel(EXCH_GATHER,inn_child,innerrel,NULL,
191+
NULL,outer_servers);
192+
}
193+
elseif (distributedInner&& !distributedOuter)
194+
{
195+
out_child=create_exchange_path(root,outerrel,outpath,EXCH_GATHER);
196+
set_exchange_altrel(EXCH_GATHER,out_child,outerrel,NULL,
197+
NULL,inner_servers);
198+
}
154199
else
155-
out_child=create_exchange_path(root,outerrel,
156-
(Path*)out_child,
157-
inner_mode);
200+
continue;
158201

159202
if (inner_mode==EXCH_SHUFFLE)
160203
{
@@ -204,16 +247,18 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
204247
{
205248
JoinPath*jp= (JoinPath*)path;
206249

250+
set_exchange_altrel(EXCH_BROADCAST,out_child,NULL,
251+
&((ExchangePath*)cstmSubPath1(out_child))->altrel,
252+
NIL,bms_union(inner_servers,outer_servers));
253+
inn_child->mode=EXCH_STEALTH;
254+
cost_exchange(root,outerrel,out_child);
255+
cost_exchange(root,innerrel,inn_child);
256+
207257
if (jp->innerjoinpath->pathtype!=T_Material)
208258
jp->innerjoinpath= (Path*)create_material_path(innerrel,
209259
jp->innerjoinpath);
210260
Assert(jp->innerjoinpath->pathtype==T_Material);
211-
212-
set_exchange_altrel(EXCH_BROADCAST,out_child,NULL,
213-
&((ExchangePath*)cstmSubPath1(out_child))->altrel,
214-
NIL,bms_union(inner_servers,outer_servers));
215261
cost_exchange(root,joinrel,out_child);
216-
inn_child->mode=EXCH_STEALTH;
217262
}
218263
elseif (inner_mode==EXCH_SHUFFLE)
219264
{

‎contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -572,12 +572,26 @@ localize_plan(Plan *node, lcontext *context)
572572
if (IsExchangePlanNode(node))
573573
{
574574
List*private= ((CustomScan*)node)->custom_private;
575-
575+
elog(LOG,"LOCALIZE: exchange");
576576
if (lnext(lnext(list_head(private))))
577577
context->indexinfo= (IndexOptInfo*)lthird(private);
578578
}
579579

580+
context->foreign_scans=NIL;
580581
plan_tree_walker(node,localize_plan,context);
582+
if (context->foreign_scans!=NIL)
583+
{
584+
CustomScan*css= (CustomScan*)node;
585+
//Index scanrelid = ((Scan *) cstmSubPlan1(node))->scanrelid;
586+
587+
Assert(list_length(context->foreign_scans)==1);
588+
css->custom_plans=list_delete_ptr(css->custom_plans,
589+
cstmSubPlan1(node));
590+
css->custom_plans=lappend(css->custom_plans,
591+
make_dummyscan(0));
592+
list_free(context->foreign_scans);
593+
context->foreign_scans=NIL;
594+
}
581595
context->indexinfo=NULL;
582596
break;
583597

@@ -602,6 +616,9 @@ localize_plan(Plan *node, lcontext *context)
602616
*plans=list_delete_ptr(*plans,lfirst(lc));
603617
*plans=lappend(*plans,make_dummyscan(scanrelid));
604618
}
619+
620+
list_free(context->foreign_scans);
621+
context->foreign_scans=NIL;
605622
}
606623
break;
607624

@@ -616,8 +633,8 @@ localize_plan(Plan *node, lcontext *context)
616633
Oidreloid;
617634

618635
reloid=getrelid(scan->scanrelid,context->pstmt->rtable);
619-
rel=heap_open(reloid,NoLock);
620-
if (rel->rd_rel->relkind==RELKIND_FOREIGN_TABLE)
636+
rel=try_relation_open(reloid,NoLock);
637+
if (rel&&rel->rd_rel->relkind==RELKIND_FOREIGN_TABLE)
621638
{
622639
Oidserverid;
623640

@@ -628,6 +645,11 @@ localize_plan(Plan *node, lcontext *context)
628645
relation_close(rel,NoLock);
629646
break;
630647
}
648+
elseif (!rel)
649+
{
650+
context->foreign_scans=lappend(context->foreign_scans,node);
651+
break;
652+
}
631653

632654
/* Need to localize scan */
633655
if (IsA(node,IndexScan)||IsA(node,IndexOnlyScan)||

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp