Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit27e1f14

Browse files
author
Etsuro Fujita
committed
Add support for asynchronous execution.
This implements asynchronous execution, which runs multiple parts of anon-parallel-aware Append concurrently rather than serially to improveperformance when possible. Currently, the only node type that can berun concurrently is a ForeignScan that is an immediate child of such anAppend. In the case where such ForeignScans access data on differentremote servers, this would run those ForeignScans concurrently, andoverlap the remote operations to be performed simultaneously, so it'llimprove the performance especially when the operations involvetime-consuming ones such as remote join and remote aggregation.We may extend this to other node types such as joins or aggregates overForeignScans in the future.This also adds the support for postgres_fdw, which is enabled by thetable-level/server-level option "async_capable". The default is false.Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself. This commitis mostly based on the patch proposed by Robert Haas, but also usesstuff from the patch proposed by Kyotaro Horiguchi and from the patchproposed by Thomas Munro. Reviewed by Kyotaro Horiguchi, KonstantinKnizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, andothers.Discussion:https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.comDiscussion:https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.comDiscussion:https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com
1 parent66392d3 commit27e1f14

39 files changed

+2068
-57
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
6262
Oidserverid;/* foreign server OID used to get server name */
6363
uint32server_hashvalue;/* hash value of foreign server OID */
6464
uint32mapping_hashvalue;/* hash value of user mapping OID */
65+
PgFdwConnStatestate;/* extra per-connection state */
6566
}ConnCacheEntry;
6667

6768
/*
@@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid);
115116
* will_prep_stmt must be true if caller intends to create any prepared
116117
* statements. Since those don't go away automatically at transaction end
117118
* (not even on error), we need this flag to cue manual cleanup.
119+
*
120+
* If state is not NULL, *state receives the per-connection state associated
121+
* with the PGconn.
118122
*/
119123
PGconn*
120-
GetConnection(UserMapping*user,boolwill_prep_stmt)
124+
GetConnection(UserMapping*user,boolwill_prep_stmt,PgFdwConnState**state)
121125
{
122126
boolfound;
123127
boolretry= false;
@@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
196200
*/
197201
PG_TRY();
198202
{
203+
/* Process a pending asynchronous request if any. */
204+
if (entry->state.pendingAreq)
205+
process_pending_request(entry->state.pendingAreq);
199206
/* Start a new transaction or subtransaction if needed. */
200207
begin_remote_xact(entry);
201208
}
@@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
264271
/* Remember if caller will prepare statements */
265272
entry->have_prep_stmt |=will_prep_stmt;
266273

274+
/* If caller needs access to the per-connection state, return it. */
275+
if (state)
276+
*state=&entry->state;
277+
267278
returnentry->conn;
268279
}
269280

@@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
291302
entry->mapping_hashvalue=
292303
GetSysCacheHashValue1(USERMAPPINGOID,
293304
ObjectIdGetDatum(user->umid));
305+
memset(&entry->state,0,sizeof(entry->state));
294306

295307
/* Now try to make the connection */
296308
entry->conn=connect_pg_server(server,user);
@@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn)
648660
* Caller is responsible for the error handling on the result.
649661
*/
650662
PGresult*
651-
pgfdw_exec_query(PGconn*conn,constchar*query)
663+
pgfdw_exec_query(PGconn*conn,constchar*query,PgFdwConnState*state)
652664
{
665+
/* First, process a pending asynchronous request, if any. */
666+
if (state&&state->pendingAreq)
667+
process_pending_request(state->pendingAreq);
668+
653669
/*
654670
* Submit a query. Since we don't use non-blocking mode, this also can
655671
* block. But its risk is relatively small, so we ignore that for now.
@@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
940956
{
941957
entry->have_prep_stmt= false;
942958
entry->have_error= false;
959+
/* Also reset per-connection state */
960+
memset(&entry->state,0,sizeof(entry->state));
943961
}
944962

945963
/* Disarm changing_xact_state if it all worked. */
@@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
11721190
* Cancel the currently-in-progress query (whose query text we do not have)
11731191
* and ignore the result. Returns true if we successfully cancel the query
11741192
* and discard any pending result, and false if not.
1193+
*
1194+
* XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1195+
* query text from the pendingAreq saved in the per-connection state, then
1196+
* report the query using it.
11751197
*/
11761198
staticbool
11771199
pgfdw_cancel_query(PGconn*conn)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp