Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit04e706d

Browse files
author
Etsuro Fujita
committed
postgres_fdw: Add support for parallel commit.
postgres_fdw commits remote (sub)transactions opened on remote server(s)in a local (sub)transaction one by one when the local (sub)transactioncommits. This patch allows it to commit the remote (sub)transactions inparallel to improve performance. This is enabled by the server option"parallel_commit". The default is false.Etsuro Fujita, reviewed by Fujii Masao and David Zhang.Discussion:http://postgr.es/m/CAPmGK17dAZCXvwnfpr1eTfknTGdt%3DhYTV9405Gt5SqPOX8K84w%40mail.gmail.com
1 parentcfb4e20 commit04e706d

File tree

5 files changed

+376
-19
lines changed

5 files changed

+376
-19
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 205 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
5858
boolhave_prep_stmt;/* have we prepared any stmts in this xact? */
5959
boolhave_error;/* have any subxacts aborted in this xact? */
6060
boolchanging_xact_state;/* xact state change in process */
61+
boolparallel_commit;/* do we commit (sub)xacts in parallel? */
6162
boolinvalidated;/* true if reconnect is pending */
6263
boolkeep_connections;/* setting value of keep_connections
6364
* server option */
@@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
9293
staticvoiddisconnect_pg_server(ConnCacheEntry*entry);
9394
staticvoidcheck_conn_params(constchar**keywords,constchar**values,UserMapping*user);
9495
staticvoidconfigure_remote_session(PGconn*conn);
96+
staticvoiddo_sql_command_begin(PGconn*conn,constchar*sql);
97+
staticvoiddo_sql_command_end(PGconn*conn,constchar*sql,
98+
boolconsume_input);
9599
staticvoidbegin_remote_xact(ConnCacheEntry*entry);
96100
staticvoidpgfdw_xact_callback(XactEventevent,void*arg);
97101
staticvoidpgfdw_subxact_callback(SubXactEventevent,
@@ -100,13 +104,17 @@ static void pgfdw_subxact_callback(SubXactEvent event,
100104
void*arg);
101105
staticvoidpgfdw_inval_callback(Datumarg,intcacheid,uint32hashvalue);
102106
staticvoidpgfdw_reject_incomplete_xact_state_change(ConnCacheEntry*entry);
107+
staticvoidpgfdw_reset_xact_state(ConnCacheEntry*entry,booltoplevel);
103108
staticboolpgfdw_cancel_query(PGconn*conn);
104109
staticboolpgfdw_exec_cleanup_query(PGconn*conn,constchar*query,
105110
boolignore_errors);
106111
staticboolpgfdw_get_cleanup_result(PGconn*conn,TimestampTzendtime,
107112
PGresult**result,bool*timed_out);
108113
staticvoidpgfdw_abort_cleanup(ConnCacheEntry*entry,constchar*sql,
109114
booltoplevel);
115+
staticvoidpgfdw_finish_pre_commit_cleanup(List*pending_entries);
116+
staticvoidpgfdw_finish_pre_subcommit_cleanup(List*pending_entries,
117+
intcurlevel);
110118
staticboolUserMappingPasswordRequired(UserMapping*user);
111119
staticbooldisconnect_cached_connections(Oidserverid);
112120

@@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
316324
* is changed will be closed and re-made later.
317325
*
318326
* By default, all the connections to any foreign servers are kept open.
327+
*
328+
* Also determine whether to commit (sub)transactions opened on the remote
329+
* server in parallel at (sub)transaction end.
319330
*/
320331
entry->keep_connections= true;
332+
entry->parallel_commit= false;
321333
foreach(lc,server->options)
322334
{
323335
DefElem*def= (DefElem*)lfirst(lc);
324336

325337
if (strcmp(def->defname,"keep_connections")==0)
326338
entry->keep_connections=defGetBoolean(def);
339+
elseif (strcmp(def->defname,"parallel_commit")==0)
340+
entry->parallel_commit=defGetBoolean(def);
327341
}
328342

329343
/* Now try to make the connection */
@@ -623,10 +637,30 @@ configure_remote_session(PGconn *conn)
623637
void
624638
do_sql_command(PGconn*conn,constchar*sql)
625639
{
626-
PGresult*res;
640+
do_sql_command_begin(conn,sql);
641+
do_sql_command_end(conn,sql, false);
642+
}
627643

644+
staticvoid
645+
do_sql_command_begin(PGconn*conn,constchar*sql)
646+
{
628647
if (!PQsendQuery(conn,sql))
629648
pgfdw_report_error(ERROR,NULL,conn, false,sql);
649+
}
650+
651+
staticvoid
652+
do_sql_command_end(PGconn*conn,constchar*sql,boolconsume_input)
653+
{
654+
PGresult*res;
655+
656+
/*
657+
* If requested, consume whatever data is available from the socket.
658+
* (Note that if all data is available, this allows pgfdw_get_result to
659+
* call PQgetResult without forcing the overhead of WaitLatchOrSocket,
660+
* which would be large compared to the overhead of PQconsumeInput.)
661+
*/
662+
if (consume_input&& !PQconsumeInput(conn))
663+
pgfdw_report_error(ERROR,NULL,conn, false,sql);
630664
res=pgfdw_get_result(conn,sql);
631665
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
632666
pgfdw_report_error(ERROR,res,conn, true,sql);
@@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
888922
{
889923
HASH_SEQ_STATUSscan;
890924
ConnCacheEntry*entry;
925+
List*pending_entries=NIL;
891926

892927
/* Quick exit if no connections were touched in this transaction. */
893928
if (!xact_got_connection)
@@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
925960

926961
/* Commit all remote transactions during pre-commit */
927962
entry->changing_xact_state= true;
963+
if (entry->parallel_commit)
964+
{
965+
do_sql_command_begin(entry->conn,"COMMIT TRANSACTION");
966+
pending_entries=lappend(pending_entries,entry);
967+
continue;
968+
}
928969
do_sql_command(entry->conn,"COMMIT TRANSACTION");
929970
entry->changing_xact_state= false;
930971

@@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9811022
}
9821023

9831024
/* Reset state to show we're out of a transaction */
984-
entry->xact_depth=0;
1025+
pgfdw_reset_xact_state(entry, true);
1026+
}
9851027

986-
/*
987-
* If the connection isn't in a good idle state, it is marked as
988-
* invalid or keep_connections option of its server is disabled, then
989-
* discard it to recover. Next GetConnection will open a new
990-
* connection.
991-
*/
992-
if (PQstatus(entry->conn)!=CONNECTION_OK||
993-
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE||
994-
entry->changing_xact_state||
995-
entry->invalidated||
996-
!entry->keep_connections)
997-
{
998-
elog(DEBUG3,"discarding connection %p",entry->conn);
999-
disconnect_pg_server(entry);
1000-
}
1028+
/* If there are any pending connections, finish cleaning them up */
1029+
if (pending_entries)
1030+
{
1031+
Assert(event==XACT_EVENT_PARALLEL_PRE_COMMIT||
1032+
event==XACT_EVENT_PRE_COMMIT);
1033+
pgfdw_finish_pre_commit_cleanup(pending_entries);
10011034
}
10021035

10031036
/*
@@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10211054
HASH_SEQ_STATUSscan;
10221055
ConnCacheEntry*entry;
10231056
intcurlevel;
1057+
List*pending_entries=NIL;
10241058

10251059
/* Nothing to do at subxact start, nor after commit. */
10261060
if (!(event==SUBXACT_EVENT_PRE_COMMIT_SUB||
@@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10631097
/* Commit all remote subtransactions during pre-commit */
10641098
snprintf(sql,sizeof(sql),"RELEASE SAVEPOINT s%d",curlevel);
10651099
entry->changing_xact_state= true;
1100+
if (entry->parallel_commit)
1101+
{
1102+
do_sql_command_begin(entry->conn,sql);
1103+
pending_entries=lappend(pending_entries,entry);
1104+
continue;
1105+
}
10661106
do_sql_command(entry->conn,sql);
10671107
entry->changing_xact_state= false;
10681108
}
@@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10761116
}
10771117

10781118
/* OK, we're outta that level of subtransaction */
1079-
entry->xact_depth--;
1119+
pgfdw_reset_xact_state(entry, false);
1120+
}
1121+
1122+
/* If there are any pending connections, finish cleaning them up */
1123+
if (pending_entries)
1124+
{
1125+
Assert(event==SUBXACT_EVENT_PRE_COMMIT_SUB);
1126+
pgfdw_finish_pre_subcommit_cleanup(pending_entries,curlevel);
10801127
}
10811128
}
10821129

@@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
11691216
server->servername)));
11701217
}
11711218

1219+
/*
1220+
* Reset state to show we're out of a (sub)transaction.
1221+
*/
1222+
staticvoid
1223+
pgfdw_reset_xact_state(ConnCacheEntry*entry,booltoplevel)
1224+
{
1225+
if (toplevel)
1226+
{
1227+
/* Reset state to show we're out of a transaction */
1228+
entry->xact_depth=0;
1229+
1230+
/*
1231+
* If the connection isn't in a good idle state, it is marked as
1232+
* invalid or keep_connections option of its server is disabled, then
1233+
* discard it to recover. Next GetConnection will open a new
1234+
* connection.
1235+
*/
1236+
if (PQstatus(entry->conn)!=CONNECTION_OK||
1237+
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE||
1238+
entry->changing_xact_state||
1239+
entry->invalidated||
1240+
!entry->keep_connections)
1241+
{
1242+
elog(DEBUG3,"discarding connection %p",entry->conn);
1243+
disconnect_pg_server(entry);
1244+
}
1245+
}
1246+
else
1247+
{
1248+
/* Reset state to show we're out of a subtransaction */
1249+
entry->xact_depth--;
1250+
}
1251+
}
1252+
11721253
/*
11731254
* Cancel the currently-in-progress query (whose query text we do not have)
11741255
* and ignore the result. Returns true if we successfully cancel the query
@@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
14561537
entry->changing_xact_state= false;
14571538
}
14581539

1540+
/*
1541+
* Finish pre-commit cleanup of connections on each of which we've sent a
1542+
* COMMIT command to the remote server.
1543+
*/
1544+
staticvoid
1545+
pgfdw_finish_pre_commit_cleanup(List*pending_entries)
1546+
{
1547+
ConnCacheEntry*entry;
1548+
List*pending_deallocs=NIL;
1549+
ListCell*lc;
1550+
1551+
Assert(pending_entries);
1552+
1553+
/*
1554+
* Get the result of the COMMIT command for each of the pending entries
1555+
*/
1556+
foreach(lc,pending_entries)
1557+
{
1558+
entry= (ConnCacheEntry*)lfirst(lc);
1559+
1560+
Assert(entry->changing_xact_state);
1561+
/*
1562+
* We might already have received the result on the socket, so pass
1563+
* consume_input=true to try to consume it first
1564+
*/
1565+
do_sql_command_end(entry->conn,"COMMIT TRANSACTION", true);
1566+
entry->changing_xact_state= false;
1567+
1568+
/* Do a DEALLOCATE ALL in parallel if needed */
1569+
if (entry->have_prep_stmt&&entry->have_error)
1570+
{
1571+
/* Ignore errors (see notes in pgfdw_xact_callback) */
1572+
if (PQsendQuery(entry->conn,"DEALLOCATE ALL"))
1573+
{
1574+
pending_deallocs=lappend(pending_deallocs,entry);
1575+
continue;
1576+
}
1577+
}
1578+
entry->have_prep_stmt= false;
1579+
entry->have_error= false;
1580+
1581+
pgfdw_reset_xact_state(entry, true);
1582+
}
1583+
1584+
/* No further work if no pending entries */
1585+
if (!pending_deallocs)
1586+
return;
1587+
1588+
/*
1589+
* Get the result of the DEALLOCATE command for each of the pending
1590+
* entries
1591+
*/
1592+
foreach(lc,pending_deallocs)
1593+
{
1594+
PGresult*res;
1595+
1596+
entry= (ConnCacheEntry*)lfirst(lc);
1597+
1598+
/* Ignore errors (see notes in pgfdw_xact_callback) */
1599+
while ((res=PQgetResult(entry->conn))!=NULL)
1600+
{
1601+
PQclear(res);
1602+
/* Stop if the connection is lost (else we'll loop infinitely) */
1603+
if (PQstatus(entry->conn)==CONNECTION_BAD)
1604+
break;
1605+
}
1606+
entry->have_prep_stmt= false;
1607+
entry->have_error= false;
1608+
1609+
pgfdw_reset_xact_state(entry, true);
1610+
}
1611+
}
1612+
1613+
/*
1614+
* Finish pre-subcommit cleanup of connections on each of which we've sent a
1615+
* RELEASE command to the remote server.
1616+
*/
1617+
staticvoid
1618+
pgfdw_finish_pre_subcommit_cleanup(List*pending_entries,intcurlevel)
1619+
{
1620+
ConnCacheEntry*entry;
1621+
charsql[100];
1622+
ListCell*lc;
1623+
1624+
Assert(pending_entries);
1625+
1626+
/*
1627+
* Get the result of the RELEASE command for each of the pending entries
1628+
*/
1629+
snprintf(sql,sizeof(sql),"RELEASE SAVEPOINT s%d",curlevel);
1630+
foreach(lc,pending_entries)
1631+
{
1632+
entry= (ConnCacheEntry*)lfirst(lc);
1633+
1634+
Assert(entry->changing_xact_state);
1635+
/*
1636+
* We might already have received the result on the socket, so pass
1637+
* consume_input=true to try to consume it first
1638+
*/
1639+
do_sql_command_end(entry->conn,sql, true);
1640+
entry->changing_xact_state= false;
1641+
1642+
pgfdw_reset_xact_state(entry, false);
1643+
}
1644+
}
1645+
14591646
/*
14601647
* List active foreign server connections.
14611648
*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp