Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 72ba986

Browse files
committed
JOIN cascades support.
I tested this on two simple queries: 1. select * from rt, pt, st WHERE rt.id=pt.id and pt.id=st.id; 2. select * from rt, pt, st WHERE rt.id=pt.id and pt.id=st.payload;
1 parent 8af4fe0 commit 72ba986

File tree

10 files changed

+170
-60
lines changed

10 files changed

+170
-60
lines changed

‎contrib/pg_exchange/exchange.c‎

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
210210
ListCell*lc;
211211

212212
if (!rte->inh)
213-
/*
214-
* Relation is not contain any partitions.
215-
*/
213+
/* Relation does not contain any partitions. */
216214
return;
217215

218216
/* Traverse all possible paths and search for APPEND */
@@ -223,16 +221,31 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
223221
AppendPath*appendPath=NULL;
224222
ListCell*lc1;
225223
Bitmapset*servers=NULL;
226-
List*subpaths=NIL;
224+
List*subpaths=NIL;
225+
List*append_paths;
227226

228-
if (path->pathtype!=T_Append)
229-
continue;
227+
/*
228+
* In the case of a partitioned relation all paths end with an Append
229+
* or MergeAppend path node.
230+
*/
231+
switch (path->pathtype)
232+
{
233+
caseT_Append:
234+
append_paths= ((AppendPath*)path)->subpaths;
235+
break;
236+
caseT_MergeAppend:
237+
append_paths= ((MergeAppendPath*)path)->subpaths;
238+
break;
239+
default:
240+
elog(FATAL,"Unexpected node type %d, pathtype %d",path->type,
241+
path->pathtype);
242+
}
230243

231244
/*
232245
* Traverse all APPEND subpaths, check for scan-type and search for
233246
* foreign scans
234247
*/
235-
foreach(lc1,((AppendPath*)path)->subpaths)
248+
foreach(lc1,append_paths)
236249
{
237250
Path*subpath= (Path*)lfirst(lc1);
238251
Path*tmpPath;
@@ -279,6 +292,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
279292
((AppendPath*)path)->partitioned_rels,-1);
280293
path= (Path*)create_exchange_path(root,rel, (Path*)appendPath, true);
281294
path=create_distexec_path(root,rel,path,servers);
295+
//elog(LOG, "Path added");
282296
add_path(rel,path);
283297
}
284298
}
@@ -740,6 +754,8 @@ EXCHANGE_Execute(CustomScanState *node)
740754
returnslot;
741755
case1:
742756
state->activeRemotes--;
757+
elog(LOG,"[%s] GOT NULL. activeRemotes: %d",state->stream,
758+
state->activeRemotes);
743759
break;
744760
case2:/* Close EXCHANGE channel */
745761
break;

‎contrib/pg_exchange/hooks.c‎

Lines changed: 103 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
staticset_rel_pathlist_hook_typeprev_set_rel_pathlist_hook=NULL;
2424
staticshmem_startup_hook_typePreviousShmemStartupHook=NULL;
2525
staticset_join_pathlist_hook_typeprev_set_join_pathlist_hook=NULL;
26-
staticvoidsecond_stage_paths(PlannerInfo*root,List*firstStagePaths,RelOptInfo*joinrel,
26+
staticvoidsecond_stage_paths(PlannerInfo*root,RelOptInfo*joinrel,
2727
RelOptInfo*outerrel,RelOptInfo*innerrel,
2828
JoinTypejointype,JoinPathExtraData*extra);
2929

@@ -95,15 +95,56 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
9595
JoinPathExtraData*extra)
9696
{
9797
ListCell*lc;
98-
List*firstStagePaths=NIL;/* Trivial paths, made with exchange */
98+
inti=0;
99+
List*newpaths=NIL;
100+
List*delpaths=NIL;
101+
List*rpaths=NIL;
102+
intdistpaths=0;
99103

100104
if (prev_set_join_pathlist_hook)
101105
prev_set_join_pathlist_hook(root,joinrel,outerrel,innerrel,
102106
jointype,extra);
103107

104108
/*
105-
* At first, traverse all paths and search for the case with Exchanges at
106-
* the left or right subtree. We need to delete DistPlanExec nodes and
109+
* Search for distributed paths for children relations.
110+
*/
111+
foreach(lc,innerrel->pathlist)
112+
{
113+
Path*path=lfirst(lc);
114+
115+
if (IsDistExecNode(path))
116+
distpaths++;
117+
}
118+
119+
if (distpaths==0)
120+
/* Distributed query execution is not needed. */
121+
return;
122+
123+
/*
124+
* Force try to add hash joins into the pathlist. Collect any paths that
125+
* do not satisfy the exchange rules and delete them.
126+
*/
127+
foreach(lc,joinrel->pathlist)
128+
{
129+
Path*path=lfirst(lc);
130+
if ((path->pathtype!=T_HashJoin)&& !IsDistExecNode(path))
131+
rpaths=lappend(rpaths,path);
132+
}
133+
foreach(lc,rpaths)
134+
joinrel->pathlist=list_delete_ptr(joinrel->pathlist,lfirst(lc));
135+
136+
/*
137+
* Try to create hash join path.
138+
* XXX: Here we have a problem with path addition: a hash join may not be added
139+
* if it is not cheap. But the cost of this path includes the costs of its child paths.
140+
* Child paths will be tuned below and costs will be changed too.
141+
*/
142+
hash_inner_and_outer(root,joinrel,outerrel,innerrel,jointype,extra);
143+
Assert(list_length(joinrel->pathlist)>0);
144+
145+
/*
146+
* Traverse all paths and search for the case with EXCHANGE nodes
147+
* at the left or right subtree. We need to delete DistPlanExec nodes and
107148
* insert only one at the head of join.
108149
*/
109150
foreach(lc,joinrel->pathlist)
@@ -115,9 +156,15 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
115156
CustomPath*sub;
116157
Path*path=lfirst(lc);
117158

118-
if ((path->pathtype!=T_NestLoop)&&
119-
(path->pathtype!=T_MergeJoin)&&
120-
(path->pathtype!=T_HashJoin))
159+
/*
160+
* NestLoop and MergeJoin require changes to the EXCHANGE node logic and are
161+
* disabled for now.
162+
* To support them we need to introduce a remote PUSH or PULL operation to
163+
* force the transfer of tuples from instance to instance.
164+
*/
165+
Assert(path->pathtype!=T_NestLoop&&path->pathtype!=T_MergeJoin);
166+
167+
if (path->pathtype!=T_HashJoin)
121168
continue;
122169

123170
jp= (JoinPath*)path;
@@ -128,9 +175,7 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
128175
* If inner path contains DistExec node - save its servers list and
129176
* delete it from the path.
130177
*/
131-
if ((inner->pathtype==T_CustomScan)&&
132-
(strcmp(((CustomPath*)inner)->methods->CustomName,
133-
DISTEXECPATHNAME)==0))
178+
if (IsDistExecNode(inner))
134179
{
135180
ListCell*lc;
136181

@@ -145,15 +190,19 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
145190
}
146191
Assert(list_length(sub->custom_paths)==1);
147192
jp->innerjoinpath= (Path*)linitial(sub->custom_paths);
193+
}else
194+
{
195+
elog(LOG,"inner Path can't contain any FE nodes. JT=%d (innt=%d patht=%d), (outt=%d patht=%d) %d",
196+
jp->jointype,inner->type,inner->pathtype,
197+
outer->type,outer->pathtype,i++);
198+
148199
}
149200

150201
/*
151202
* If outer path contains DistExec node - save its servers list and
152203
* delete it from the path.
153204
*/
154-
if ((outer->pathtype==T_CustomScan)&&
155-
(strcmp(((CustomPath*)outer)->methods->CustomName,
156-
DISTEXECPATHNAME)==0))
205+
if (IsDistExecNode(outer))
157206
{
158207
ListCell*lc;
159208

@@ -177,25 +226,29 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
177226
path=create_distexec_path(root,joinrel,
178227
(Path*)copy_join_pathnode(jp),
179228
servers);
180-
add_path(joinrel,path);
229+
newpaths=lappend(newpaths,path);
230+
delpaths=lappend(delpaths,jp);
231+
}
181232

182-
/*
183-
* We need guarantee, that previous JOIN path was deleted. It was
184-
* incorrect.
185-
*/
186-
list_delete_ptr(joinrel->pathlist,jp);
233+
/*
234+
* We need to guarantee that the previous JOIN path is deleted from the path
235+
* list, because it was incorrect.
236+
*/
237+
foreach(lc,delpaths)
238+
{
239+
Path*path=lfirst(lc);
240+
joinrel->pathlist=list_delete_ptr(joinrel->pathlist,path);
241+
}
187242

188-
/* Save link to the path for future works. */
189-
firstStagePaths=lappend(firstStagePaths,path);
243+
foreach(lc,newpaths)
244+
{
245+
Path*path=lfirst(lc);
246+
add_path(joinrel,path);
190247
}
191248

192-
second_stage_paths(root,firstStagePaths,joinrel,outerrel,innerrel,jointype,
193-
extra);
249+
second_stage_paths(root,joinrel,outerrel,innerrel,jointype,extra);
194250
}
195251

196-
#defineIsDistExecNode(pathnode) ((pathnode->path.pathtype == T_CustomScan) && \
197-
(strcmp(((CustomPath *)pathnode)->methods->CustomName, DISTEXECPATHNAME) == 0))
198-
199252
staticCustomPath*
200253
duplicate_join_path(CustomPath*distExecPath)
201254
{
@@ -321,10 +374,7 @@ arrange_partitioning_attrs(RelOptInfo *rel1,
321374
part_scheme->partnatts++;
322375
ReleaseSysCache(opclasstup);
323376
}
324-
//elog(INFO, "arrange_partitioning_attrs: ");
325-
//elog(INFO, "->1: %s ", nodeToString(rel1->partexprs[0]));
326-
//elog(INFO, "->2: %s ", nodeToString(rel2->partexprs[0]));
327-
//elog(INFO, "restrictlist: %s", nodeToString(restrictlist));
377+
328378
/* Now we use hash partition only */
329379
Assert((rel1->part_scheme->strategy==PARTITION_STRATEGY_HASH)&&
330380
(rel1->part_scheme->strategy==rel2->part_scheme->strategy));
@@ -394,24 +444,38 @@ arrange_partitions(RelOptInfo *rel1,
394444
* Add Paths same as the case of partitionwise join.
395445
*/
396446
staticvoid
397-
second_stage_paths(PlannerInfo*root,List*firstStagePaths,RelOptInfo*joinrel,RelOptInfo*outerrel,
447+
second_stage_paths(PlannerInfo*root,RelOptInfo*joinrel,RelOptInfo*outerrel,
398448
RelOptInfo*innerrel,JoinTypejointype,JoinPathExtraData*extra)
399449
{
400450
ListCell*lc;
401451

402-
if (list_length(firstStagePaths)==0)
403-
return;
452+
elog(LOG,"List length=%d",list_length(joinrel->pathlist));
453+
foreach(lc,joinrel->pathlist)
454+
{
455+
Path*path=lfirst(lc);
456+
elog(LOG,"[%d] BEFORE SECOND STAGE: type=%d pathtype=%d",
457+
list_length(joinrel->pathlist),path->type,path->pathtype);
458+
if (path->type>300)
459+
Assert(0);
460+
}
404461

405-
foreach(lc,firstStagePaths)
462+
foreach(lc,joinrel->pathlist)
406463
{
407-
CustomPath*path= (CustomPath*)lfirst(lc);
464+
Path*pathhead= (Path*)lfirst(lc);
465+
CustomPath*path;
408466
JoinPath*jp;
409467
ExchangePath*innerex;
410468
ExchangePath*outerex;
411469
ExchangePath*expath;
412470
inti;
413471

414-
Assert(IsDistExecNode(path));
472+
if (!IsDistExecNode(pathhead))
473+
{
474+
elog(LOG,"NO second_stage_paths. type=%d, pathtype=%d",pathhead->type,pathhead->pathtype);
475+
continue;
476+
}
477+
478+
path= (CustomPath*)pathhead;
415479

416480
/*
417481
* Add gather-type EXCHANGE node into the head of the path.
@@ -420,7 +484,9 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
420484
if (!IsA(((Path*)linitial(path->custom_paths)),CustomScan))
421485
{
422486
jp= (JoinPath*)linitial(path->custom_paths);
423-
Assert(jp->path.pathtype==T_HashJoin);
487+
488+
if (jp->path.pathtype!=T_HashJoin)
489+
continue;
424490
expath=create_exchange_path(root,joinrel, (Path*)jp,GATHER_MODE);
425491
path->custom_paths=list_delete(path->custom_paths,jp);
426492
path->custom_paths=lappend(path->custom_paths,expath);
@@ -433,7 +499,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
433499
&innerex->altrel,extra->restrictlist,jointype))
434500
{
435501
/* Simple case like foreign-push-join case. */
436-
//elog(INFO, "--- MAKE SIMPLE PATH ---");
437502
innerex->mode=STEALTH_MODE;
438503
outerex->mode=STEALTH_MODE;
439504
}
@@ -442,7 +507,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
442507
CustomPath*newpath;
443508
boolres;
444509

445-
//elog(INFO, "--- MAKE SHUFFLE PATH ---");
446510
/* Get a copy of the simple path */
447511
newpath=duplicate_join_path(path);
448512
set_path_pointers(newpath,&jp,&expath,&outerex,&innerex);

‎contrib/pg_exchange/nodeDistPlanExec.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ EstablishDMQConnections(const lcontext *context, const char *serverName, PlanSta
238238
sprintf(connstr,"host=%s port=%d "
239239
"fallback_application_name=%s",
240240
host,port,senderName);
241-
elog(LOG,"Add destination: senderName=%s, receiverName=%s, connstr=%s",senderName,receiverName,connstr);
241+
//elog(LOG, "Add destination: senderName=%s, receiverName=%s, connstr=%s", senderName, receiverName, connstr);
242242
sub->dest_id=dmq_destination_add(connstr,senderName,receiverName,10);
243243
memcpy(sub->node,receiverName,strlen(receiverName)+1);
244244
}
@@ -337,7 +337,7 @@ ExecEndDistPlanExec(CustomScanState *node)
337337
PGresult*result;
338338

339339
while ((result=PQgetResult(dpe->conn[i]))!=NULL);
340-
elog(LOG,"ExecEndDistPlanExec: %d",PQresultStatus(result));
340+
//elog(LOG, "ExecEndDistPlanExec: %d", PQresultStatus(result));
341341
}
342342
if (dpe->conn)
343343
pfree(dpe->conn);

‎contrib/pg_exchange/nodeDistPlanExec.h‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ typedef struct
2828
externchardestsName[10];
2929
#defineDISTEXECPATHNAME"DistExecPath"
3030

31+
#defineIsDistExecNode(pathnode) ((((Path *) pathnode)->pathtype == T_CustomScan) && \
32+
(strcmp(((CustomPath *)pathnode)->methods->CustomName, DISTEXECPATHNAME) == 0))
33+
3134
externvoidDistExec_Init_methods(void);
3235
externCustomScan*make_distplanexec(List*custom_plans,List*tlist,List*private_data);
3336
externPath*create_distexec_path(PlannerInfo*root,RelOptInfo*rel,

‎contrib/pg_execplan/tests/init_node0.sql‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ CREATE TABLE rt_0 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 0);
2525
CREATE FOREIGN TABLE rt_1 PARTITION OF rt FORVALUES WITH (modulus3, remainder1) SERVER remote1;
2626
CREATE FOREIGN TABLE rt_2 PARTITION OF rt FORVALUES WITH (modulus3, remainder2) SERVER remote2;
2727

28+
DROPTABLE IF EXISTS st cascade;
29+
CREATETABLEst (
30+
idintegernot null,
31+
payloadinteger,
32+
testinteger
33+
) PARTITION BY hash (id);
34+
35+
CREATETABLEst_0 PARTITION OF st FORVALUES WITH (modulus3, remainder0);
36+
CREATE FOREIGN TABLE st_1 PARTITION OF st FORVALUES WITH (modulus3, remainder1) SERVER remote1;
37+
CREATE FOREIGN TABLE st_2 PARTITION OF st FORVALUES WITH (modulus3, remainder2) SERVER remote2;
38+
2839

2940
-- For local tests
3041
CREATETABLEa (

‎contrib/pg_execplan/tests/init_node1.sql‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,14 @@ CREATE TABLE rt (
2424
CREATE FOREIGN TABLE rt_0 PARTITION OF rt FORVALUES WITH (modulus3, remainder0) SERVER remote1;
2525
CREATETABLErt_1 PARTITION OF rt FORVALUES WITH (modulus3, remainder1);
2626
CREATE FOREIGN TABLE rt_2 PARTITION OF rt FORVALUES WITH (modulus3, remainder2) SERVER remote2;
27+
28+
DROPTABLE IF EXISTS st cascade;
29+
CREATETABLEst (
30+
idintegernot null,
31+
payloadinteger,
32+
testinteger
33+
) PARTITION BY hash (id);
34+
35+
CREATE FOREIGN TABLE st_0 PARTITION OF st FORVALUES WITH (modulus3, remainder0) SERVER remote1;
36+
CREATETABLEst_1 PARTITION OF st FORVALUES WITH (modulus3, remainder1);
37+
CREATE FOREIGN TABLE st_2 PARTITION OF st FORVALUES WITH (modulus3, remainder2) SERVER remote2;

‎contrib/pg_execplan/tests/init_node2.sql‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,14 @@ CREATE TABLE rt (
2424
CREATE FOREIGN TABLE rt_0 PARTITION OF rt FORVALUES WITH (modulus3, remainder0) SERVER remote1;
2525
CREATE FOREIGN TABLE rt_1 PARTITION OF rt FORVALUES WITH (modulus3, remainder1) SERVER remote2;
2626
CREATETABLErt_2 PARTITION OF rt FORVALUES WITH (modulus3, remainder2);
27+
28+
DROPTABLE IF EXISTS st cascade;
29+
CREATETABLEst (
30+
idintegernot null,
31+
payloadinteger,
32+
testinteger
33+
) PARTITION BY hash (id);
34+
35+
CREATE FOREIGN TABLE st_0 PARTITION OF st FORVALUES WITH (modulus3, remainder0) SERVER remote1;
36+
CREATE FOREIGN TABLE st_1 PARTITION OF st FORVALUES WITH (modulus3, remainder1) SERVER remote2;
37+
CREATETABLEst_2 PARTITION OF st FORVALUES WITH (modulus3, remainder2);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp