Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc2bb02b

Browse files
author
Etsuro Fujita
committed
Allow asynchronous execution in more cases.
In commit27e1f14, create_append_plan() only allowed the subplancreated from a given subpath to be executed asynchronously when it wasan async-capable ForeignPath. To extend coverage, this patch handlescases when the given subpath includes some other Path types as well thatcan be omitted in the plan processing, such as a ProjectionPath directlyatop an async-capable ForeignPath, allowing asynchronous execution inpartitioned-scan/partitioned-join queries with non-Var tlist expressionsand more UNION queries.Andrey Lepikhov and Etsuro Fujita, reviewed by Alexander Pyhalov andZhihong Yu.Discussion:https://postgr.es/m/659c37a8-3e71-0ff2-394c-f04428c76f08%40postgrespro.ru
1 parent376dc43 commitc2bb02b

File tree

9 files changed

+287
-15
lines changed

9 files changed

+287
-15
lines changed

‎contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10221,6 +10221,31 @@ SELECT * FROM result_tbl ORDER BY a;
1022110221
2505 | 505 | 0505
1022210222
(2 rows)
1022310223

10224+
DELETE FROM result_tbl;
10225+
EXPLAIN (VERBOSE, COSTS OFF)
10226+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
10227+
QUERY PLAN
10228+
---------------------------------------------------------------------------------
10229+
Insert on public.result_tbl
10230+
-> Append
10231+
-> Async Foreign Scan on public.async_p1 async_pt_1
10232+
Output: async_pt_1.a, async_pt_1.b, ('AAA'::text || async_pt_1.c)
10233+
Filter: (async_pt_1.b === 505)
10234+
Remote SQL: SELECT a, b, c FROM public.base_tbl1
10235+
-> Async Foreign Scan on public.async_p2 async_pt_2
10236+
Output: async_pt_2.a, async_pt_2.b, ('AAA'::text || async_pt_2.c)
10237+
Filter: (async_pt_2.b === 505)
10238+
Remote SQL: SELECT a, b, c FROM public.base_tbl2
10239+
(10 rows)
10240+
10241+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
10242+
SELECT * FROM result_tbl ORDER BY a;
10243+
a | b | c
10244+
------+-----+---------
10245+
1505 | 505 | AAA0505
10246+
2505 | 505 | AAA0505
10247+
(2 rows)
10248+
1022410249
DELETE FROM result_tbl;
1022510250
-- Check case where multiple partitions use the same connection
1022610251
CREATE TABLE base_tbl3 (a int, b int, c text);
@@ -10358,6 +10383,69 @@ SELECT * FROM join_tbl ORDER BY a1;
1035810383
3900 | 900 | 0900 | 3900 | 900 | 0900
1035910384
(30 rows)
1036010385

10386+
DELETE FROM join_tbl;
10387+
EXPLAIN (VERBOSE, COSTS OFF)
10388+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
10389+
QUERY PLAN
10390+
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
10391+
Insert on public.join_tbl
10392+
-> Append
10393+
-> Async Foreign Scan
10394+
Output: t1_1.a, t1_1.b, ('AAA'::text || t1_1.c), t2_1.a, t2_1.b, ('AAA'::text || t2_1.c)
10395+
Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
10396+
Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
10397+
-> Async Foreign Scan
10398+
Output: t1_2.a, t1_2.b, ('AAA'::text || t1_2.c), t2_2.a, t2_2.b, ('AAA'::text || t2_2.c)
10399+
Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
10400+
Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
10401+
-> Hash Join
10402+
Output: t1_3.a, t1_3.b, ('AAA'::text || t1_3.c), t2_3.a, t2_3.b, ('AAA'::text || t2_3.c)
10403+
Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
10404+
-> Seq Scan on public.async_p3 t2_3
10405+
Output: t2_3.a, t2_3.b, t2_3.c
10406+
-> Hash
10407+
Output: t1_3.a, t1_3.b, t1_3.c
10408+
-> Seq Scan on public.async_p3 t1_3
10409+
Output: t1_3.a, t1_3.b, t1_3.c
10410+
Filter: ((t1_3.b % 100) = 0)
10411+
(20 rows)
10412+
10413+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
10414+
SELECT * FROM join_tbl ORDER BY a1;
10415+
a1 | b1 | c1 | a2 | b2 | c2
10416+
------+-----+---------+------+-----+---------
10417+
1000 | 0 | AAA0000 | 1000 | 0 | AAA0000
10418+
1100 | 100 | AAA0100 | 1100 | 100 | AAA0100
10419+
1200 | 200 | AAA0200 | 1200 | 200 | AAA0200
10420+
1300 | 300 | AAA0300 | 1300 | 300 | AAA0300
10421+
1400 | 400 | AAA0400 | 1400 | 400 | AAA0400
10422+
1500 | 500 | AAA0500 | 1500 | 500 | AAA0500
10423+
1600 | 600 | AAA0600 | 1600 | 600 | AAA0600
10424+
1700 | 700 | AAA0700 | 1700 | 700 | AAA0700
10425+
1800 | 800 | AAA0800 | 1800 | 800 | AAA0800
10426+
1900 | 900 | AAA0900 | 1900 | 900 | AAA0900
10427+
2000 | 0 | AAA0000 | 2000 | 0 | AAA0000
10428+
2100 | 100 | AAA0100 | 2100 | 100 | AAA0100
10429+
2200 | 200 | AAA0200 | 2200 | 200 | AAA0200
10430+
2300 | 300 | AAA0300 | 2300 | 300 | AAA0300
10431+
2400 | 400 | AAA0400 | 2400 | 400 | AAA0400
10432+
2500 | 500 | AAA0500 | 2500 | 500 | AAA0500
10433+
2600 | 600 | AAA0600 | 2600 | 600 | AAA0600
10434+
2700 | 700 | AAA0700 | 2700 | 700 | AAA0700
10435+
2800 | 800 | AAA0800 | 2800 | 800 | AAA0800
10436+
2900 | 900 | AAA0900 | 2900 | 900 | AAA0900
10437+
3000 | 0 | AAA0000 | 3000 | 0 | AAA0000
10438+
3100 | 100 | AAA0100 | 3100 | 100 | AAA0100
10439+
3200 | 200 | AAA0200 | 3200 | 200 | AAA0200
10440+
3300 | 300 | AAA0300 | 3300 | 300 | AAA0300
10441+
3400 | 400 | AAA0400 | 3400 | 400 | AAA0400
10442+
3500 | 500 | AAA0500 | 3500 | 500 | AAA0500
10443+
3600 | 600 | AAA0600 | 3600 | 600 | AAA0600
10444+
3700 | 700 | AAA0700 | 3700 | 700 | AAA0700
10445+
3800 | 800 | AAA0800 | 3800 | 800 | AAA0800
10446+
3900 | 900 | AAA0900 | 3900 | 900 | AAA0900
10447+
(30 rows)
10448+
1036110449
DELETE FROM join_tbl;
1036210450
RESET enable_partitionwise_join;
1036310451
-- Test rescan of an async Append node with do_exec_prune=false
@@ -10536,6 +10624,88 @@ DROP TABLE local_tbl;
1053610624
DROP INDEX base_tbl1_idx;
1053710625
DROP INDEX base_tbl2_idx;
1053810626
DROP INDEX async_p3_idx;
10627+
-- UNION queries
10628+
EXPLAIN (VERBOSE, COSTS OFF)
10629+
INSERT INTO result_tbl
10630+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10631+
UNION
10632+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10633+
QUERY PLAN
10634+
-----------------------------------------------------------------------------------------------------------------
10635+
Insert on public.result_tbl
10636+
-> HashAggregate
10637+
Output: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
10638+
Group Key: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
10639+
-> Append
10640+
-> Async Foreign Scan on public.async_p1
10641+
Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
10642+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
10643+
-> Async Foreign Scan on public.async_p2
10644+
Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
10645+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
10646+
(11 rows)
10647+
10648+
INSERT INTO result_tbl
10649+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10650+
UNION
10651+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10652+
SELECT * FROM result_tbl ORDER BY a;
10653+
a | b | c
10654+
------+----+---------
10655+
1000 | 0 | AAA0000
10656+
1005 | 5 | AAA0005
10657+
1010 | 10 | AAA0010
10658+
1015 | 15 | AAA0015
10659+
1020 | 20 | AAA0020
10660+
1025 | 25 | AAA0025
10661+
1030 | 30 | AAA0030
10662+
1035 | 35 | AAA0035
10663+
1040 | 40 | AAA0040
10664+
1045 | 45 | AAA0045
10665+
2000 | 0 | AAA0000
10666+
2005 | 5 | AAA0005
10667+
(12 rows)
10668+
10669+
DELETE FROM result_tbl;
10670+
EXPLAIN (VERBOSE, COSTS OFF)
10671+
INSERT INTO result_tbl
10672+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10673+
UNION ALL
10674+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10675+
QUERY PLAN
10676+
-----------------------------------------------------------------------------------------------------------
10677+
Insert on public.result_tbl
10678+
-> Append
10679+
-> Async Foreign Scan on public.async_p1
10680+
Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
10681+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
10682+
-> Async Foreign Scan on public.async_p2
10683+
Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
10684+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
10685+
(8 rows)
10686+
10687+
INSERT INTO result_tbl
10688+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10689+
UNION ALL
10690+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10691+
SELECT * FROM result_tbl ORDER BY a;
10692+
a | b | c
10693+
------+----+---------
10694+
1000 | 0 | AAA0000
10695+
1005 | 5 | AAA0005
10696+
1010 | 10 | AAA0010
10697+
1015 | 15 | AAA0015
10698+
1020 | 20 | AAA0020
10699+
1025 | 25 | AAA0025
10700+
1030 | 30 | AAA0030
10701+
1035 | 35 | AAA0035
10702+
1040 | 40 | AAA0040
10703+
1045 | 45 | AAA0045
10704+
2000 | 0 | AAA0000
10705+
2005 | 5 | AAA0005
10706+
(12 rows)
10707+
10708+
DELETE FROM result_tbl;
1053910709
-- Test that pending requests are processed properly
1054010710
SET enable_mergejoin TO false;
1054110711
SET enable_hashjoin TO false;

‎contrib/postgres_fdw/sql/postgres_fdw.sql

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3245,6 +3245,13 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
32453245
SELECT*FROM result_tblORDER BY a;
32463246
DELETEFROM result_tbl;
32473247

3248+
EXPLAIN (VERBOSE, COSTS OFF)
3249+
INSERT INTO result_tblSELECT a, b,'AAA'|| cFROM async_ptWHERE b===505;
3250+
INSERT INTO result_tblSELECT a, b,'AAA'|| cFROM async_ptWHERE b===505;
3251+
3252+
SELECT*FROM result_tblORDER BY a;
3253+
DELETEFROM result_tbl;
3254+
32483255
-- Check case where multiple partitions use the same connection
32493256
CREATETABLEbase_tbl3 (aint, bint, ctext);
32503257
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FORVALUESFROM (3000) TO (4000)
@@ -3286,6 +3293,13 @@ INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AN
32863293
SELECT*FROM join_tblORDER BY a1;
32873294
DELETEFROM join_tbl;
32883295

3296+
EXPLAIN (VERBOSE, COSTS OFF)
3297+
INSERT INTO join_tblSELECTt1.a,t1.b,'AAA'||t1.c,t2.a,t2.b,'AAA'||t2.cFROM async_pt t1, async_pt t2WHEREt1.a=t2.aANDt1.b=t2.bANDt1.b %100=0;
3298+
INSERT INTO join_tblSELECTt1.a,t1.b,'AAA'||t1.c,t2.a,t2.b,'AAA'||t2.cFROM async_pt t1, async_pt t2WHEREt1.a=t2.aANDt1.b=t2.bANDt1.b %100=0;
3299+
3300+
SELECT*FROM join_tblORDER BY a1;
3301+
DELETEFROM join_tbl;
3302+
32893303
RESET enable_partitionwise_join;
32903304

32913305
-- Test rescan of an async Append node with do_exec_prune=false
@@ -3357,6 +3371,33 @@ DROP INDEX base_tbl1_idx;
33573371
DROPINDEX base_tbl2_idx;
33583372
DROPINDEX async_p3_idx;
33593373

3374+
-- UNION queries
3375+
EXPLAIN (VERBOSE, COSTS OFF)
3376+
INSERT INTO result_tbl
3377+
(SELECT a, b,'AAA'|| cFROM async_p1ORDER BY aLIMIT10)
3378+
UNION
3379+
(SELECT a, b,'AAA'|| cFROM async_p2WHERE b<10);
3380+
INSERT INTO result_tbl
3381+
(SELECT a, b,'AAA'|| cFROM async_p1ORDER BY aLIMIT10)
3382+
UNION
3383+
(SELECT a, b,'AAA'|| cFROM async_p2WHERE b<10);
3384+
3385+
SELECT*FROM result_tblORDER BY a;
3386+
DELETEFROM result_tbl;
3387+
3388+
EXPLAIN (VERBOSE, COSTS OFF)
3389+
INSERT INTO result_tbl
3390+
(SELECT a, b,'AAA'|| cFROM async_p1ORDER BY aLIMIT10)
3391+
UNION ALL
3392+
(SELECT a, b,'AAA'|| cFROM async_p2WHERE b<10);
3393+
INSERT INTO result_tbl
3394+
(SELECT a, b,'AAA'|| cFROM async_p1ORDER BY aLIMIT10)
3395+
UNION ALL
3396+
(SELECT a, b,'AAA'|| cFROM async_p2WHERE b<10);
3397+
3398+
SELECT*FROM result_tblORDER BY a;
3399+
DELETEFROM result_tbl;
3400+
33603401
-- Test that pending requests are processed properly
33613402
SET enable_mergejoin TO false;
33623403
SET enable_hashjoin TO false;

‎src/backend/nodes/copyfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ _copySubqueryScan(const SubqueryScan *from)
632632
* copy remainder of node
633633
*/
634634
COPY_NODE_FIELD(subplan);
635+
COPY_SCALAR_FIELD(scanstatus);
635636

636637
returnnewnode;
637638
}

‎src/backend/nodes/outfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ _outSubqueryScan(StringInfo str, const SubqueryScan *node)
638638
_outScanInfo(str, (constScan*)node);
639639

640640
WRITE_NODE_FIELD(subplan);
641+
WRITE_ENUM_FIELD(scanstatus,SubqueryScanStatus);
641642
}
642643

643644
staticvoid

‎src/backend/nodes/readfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2194,6 +2194,7 @@ _readSubqueryScan(void)
21942194
ReadCommonScan(&local_node->scan);
21952195

21962196
READ_NODE_FIELD(subplan);
2197+
READ_ENUM_FIELD(scanstatus,SubqueryScanStatus);
21972198

21982199
READ_DONE();
21992200
}

‎src/backend/optimizer/plan/createplan.c

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
8282
staticPlan*create_gating_plan(PlannerInfo*root,Path*path,Plan*plan,
8383
List*gating_quals);
8484
staticPlan*create_join_plan(PlannerInfo*root,JoinPath*best_path);
85-
staticboolis_async_capable_path(Path*path);
85+
staticboolmark_async_capable_plan(Plan*plan,Path*path);
8686
staticPlan*create_append_plan(PlannerInfo*root,AppendPath*best_path,
8787
intflags);
8888
staticPlan*create_merge_append_plan(PlannerInfo*root,MergeAppendPath*best_path,
@@ -1110,28 +1110,58 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
11101110
}
11111111

11121112
/*
1113-
* is_async_capable_path
1114-
*Check whether a given Path node is async-capable.
1113+
* mark_async_capable_plan
1114+
*Check whether a given Path node is async-capable, and if so, mark the
1115+
*Plan node created from it as such and return true, otherwise return
1116+
*false.
11151117
*/
11161118
staticbool
1117-
is_async_capable_path(Path*path)
1119+
mark_async_capable_plan(Plan*plan,Path*path)
11181120
{
11191121
switch (nodeTag(path))
11201122
{
1123+
caseT_SubqueryScanPath:
1124+
{
1125+
SubqueryScan*scan_plan= (SubqueryScan*)plan;
1126+
1127+
/*
1128+
* If a SubqueryScan node atop of an async-capable plan node
1129+
* is deletable, consider it as async-capable.
1130+
*/
1131+
if (trivial_subqueryscan(scan_plan)&&
1132+
mark_async_capable_plan(scan_plan->subplan,
1133+
((SubqueryScanPath*)path)->subpath))
1134+
break;
1135+
return false;
1136+
}
11211137
caseT_ForeignPath:
11221138
{
11231139
FdwRoutine*fdwroutine=path->parent->fdwroutine;
11241140

11251141
Assert(fdwroutine!=NULL);
11261142
if (fdwroutine->IsForeignPathAsyncCapable!=NULL&&
11271143
fdwroutine->IsForeignPathAsyncCapable((ForeignPath*)path))
1128-
return true;
1144+
break;
1145+
return false;
11291146
}
1130-
break;
1147+
caseT_ProjectionPath:
1148+
1149+
/*
1150+
* If the generated plan node doesn't include a Result node,
1151+
* consider it as async-capable if the subpath is async-capable.
1152+
*/
1153+
if (!IsA(plan,Result)&&
1154+
mark_async_capable_plan(plan,
1155+
((ProjectionPath*)path)->subpath))
1156+
return true;
1157+
return false;
11311158
default:
1132-
break;
1159+
return false;
11331160
}
1134-
return false;
1161+
1162+
plan->async_capable= true;
1163+
1164+
return true;
11351165
}
11361166

11371167
/*
@@ -1294,14 +1324,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
12941324
}
12951325
}
12961326

1297-
subplans=lappend(subplans,subplan);
1298-
1299-
/* Check to see if subplan can be executed asynchronously */
1300-
if (consider_async&&is_async_capable_path(subpath))
1327+
/* If needed, check to see if subplan can be executed asynchronously */
1328+
if (consider_async&&mark_async_capable_plan(subplan,subpath))
13011329
{
1302-
subplan->async_capable= true;
1330+
Assert(subplan->async_capable);
13031331
++nasyncplans;
13041332
}
1333+
1334+
subplans=lappend(subplans,subplan);
13051335
}
13061336

13071337
/*
@@ -5598,6 +5628,7 @@ make_subqueryscan(List *qptlist,
55985628
plan->righttree=NULL;
55995629
node->scan.scanrelid=scanrelid;
56005630
node->subplan=subplan;
5631+
node->scanstatus=SUBQUERY_SCAN_UNKNOWN;
56015632

56025633
returnnode;
56035634
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp