Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitae0b9d0

Browse files
committed
Use prepared statements for DirectModify.
We keep per-connection hashtable sql->prep statement name for checking whetherwe have prepared it already or not. This is quick-and-dirty solution, because itwill lead to hashtable bloat if custom plans are used -- in this case we preparequeries with substituted param values.
1 parent285ec6d commitae0b9d0

File tree

3 files changed

+158
-50
lines changed

3 files changed

+158
-50
lines changed

‎contrib/postgres_fdw/connection.c‎

Lines changed: 65 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include"access/htup_details.h"
1818
#include"catalog/pg_user_mapping.h"
1919
#include"access/global_snapshot.h"
20+
#include"access/hash.h"
2021
#include"access/xact.h"
2122
#include"access/xtm.h"
2223
#include"access/transam.h"
@@ -33,41 +34,6 @@
3334
#include"utils/memutils.h"
3435
#include"utils/syscache.h"
3536

36-
/*
37-
* Connection cache hash table entry
38-
*
39-
* The lookup key in this hash table is the user mapping OID. We use just one
40-
* connection per user mapping ID, which ensures that all the scans use the
41-
* same snapshot during a query. Using the user mapping OID rather than
42-
* the foreign server OID + user OID avoids creating multiple connections when
43-
* the public user mapping applies to all user OIDs.
44-
*
45-
* The "conn" pointer can be NULL if we don't currently have a live connection.
46-
* When we do have a connection, xact_depth tracks the current depth of
47-
* transactions and subtransactions open on the remote side. We need to issue
48-
* commands at the same nesting depth on the remote as we're executing at
49-
* ourselves, so that rolling back a subtransaction will kill the right
50-
* queries and not the wrong ones.
51-
*/
52-
typedefOidConnCacheKey;
53-
54-
structConnCacheEntry
55-
{
56-
ConnCacheKeykey;/* hash key (must be first) */
57-
PGconn*conn;/* connection to foreign server, or NULL */
58-
WaitEventSet*wait_set;/* for data from server ready notifications */
59-
/* Remaining fields are invalid when conn is NULL: */
60-
intxact_depth;/* 0 = no xact open, 1 = main xact open, 2 =
61-
* one level of subxact open, etc */
62-
boolhave_prep_stmt;/* have we prepared any stmts in this xact? */
63-
boolhave_error;/* have any subxacts aborted in this xact? */
64-
boolchanging_xact_state;/* xact state change in process */
65-
boolinvalidated;/* true if reconnect is pending */
66-
uint32server_hashvalue;/* hash value of foreign server OID */
67-
uint32mapping_hashvalue;/* hash value of user mapping OID */
68-
boolcopy_from_started;/* COPY FROM in progress on this conn */
69-
};
70-
7137
/*
7238
* Connection cache (initialized on first use)
7339
*/
@@ -117,6 +83,35 @@ static bool pgfdw_exec_cleanup_query(ConnCacheEntry *entry, const char *query,
11783
boolignore_errors);
11884
staticboolpgfdw_get_cleanup_result(ConnCacheEntry*entry,TimestampTzendtime,
11985
PGresult**result);
86+
staticvoidcleanup_dm_prepared(ConnCacheEntry*entry);
87+
88+
/* Adapted from string_hash */
89+
staticuint32
90+
char_ptr_hash_fn(constvoid*key,Sizekeysize)
91+
{
92+
char*const*keyptr=key;
93+
returnDatumGetUInt32(hash_any((constunsignedchar*) (*keyptr),strlen(*keyptr)));
94+
}
95+
96+
staticint
97+
char_ptr_match_fn(constvoid*key1,constvoid*key2,Sizekeysize)
98+
{
99+
char*const*keyptr1=key1;
100+
char*const*keyptr2=key2;
101+
returnstrcmp(*keyptr1,*keyptr2);
102+
}
103+
104+
/* Allocate always from top-level, where hashtable lives */
105+
staticvoid*
106+
char_ptr_keycopy_fn(void*dest,constvoid*src,Sizekeysize)
107+
{
108+
char**destptr=dest;
109+
char*const*srcptr=src;
110+
111+
*destptr=MemoryContextStrdup(CacheMemoryContext,*srcptr);
112+
returnNULL;/* not used */
113+
}
114+
120115
/*
121116
* Get a ConnCacheEntry which can be used to execute queries on the remote PostgreSQL
122117
* server with the user's authorization. A new connection is established
@@ -208,6 +203,7 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
208203
if (entry->conn==NULL)
209204
{
210205
ForeignServer*server=GetForeignServer(user->serverid);
206+
HASHCTLctl;
211207

212208
/* Reset all transient state fields, to be sure all are clean */
213209
entry->xact_depth=0;
@@ -226,6 +222,19 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
226222
/* Now try to make the connection */
227223
connect_pg_server(entry,server,user);
228224

225+
/* Create hash table of prepared statemetns for DirectModify */
226+
ctl.keysize=sizeof(char*);
227+
ctl.entrysize=sizeof(DirectModifyPrepStmtHashEnt);
228+
ctl.hash=char_ptr_hash_fn;
229+
ctl.match=char_ptr_match_fn;
230+
ctl.keycopy=char_ptr_keycopy_fn;
231+
ctl.hcxt=CacheMemoryContext;
232+
233+
entry->dm_prepared=hash_create("DirectModify prepared stmts",
234+
16,&ctl,
235+
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE |
236+
HASH_KEYCOPY |HASH_CONTEXT);
237+
229238
elog(DEBUG3,"new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
230239
entry->conn,server->servername,user->umid,user->userid);
231240
}
@@ -364,6 +373,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
364373
{
365374
if (entry->conn!=NULL)
366375
{
376+
cleanup_dm_prepared(entry);
377+
hash_destroy(entry->dm_prepared);
367378
Assert(entry->wait_set);
368379
FreeWaitEventSet(entry->wait_set);
369380
entry->wait_set=NULL;
@@ -1021,6 +1032,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10211032
entry->have_error= false;
10221033
}
10231034

1035+
/* We have deallocated all prepared statements */
1036+
cleanup_dm_prepared(entry);
1037+
10241038
/* Disarm changing_xact_state if it all worked. */
10251039
entry->changing_xact_state=abort_cleanup_failure;
10261040
break;
@@ -1118,11 +1132,27 @@ deallocate_prepared_stmts(ConnCacheEntry *entry)
11181132
{
11191133
res=PQexec(entry->conn,"DEALLOCATE ALL");
11201134
PQclear(res);
1135+
cleanup_dm_prepared(entry);
11211136
}
11221137
entry->have_prep_stmt= false;
11231138
entry->have_error= false;
11241139
}
11251140

1141+
staticvoidcleanup_dm_prepared(ConnCacheEntry*entry)
1142+
{
1143+
HASH_SEQ_STATUSscan;
1144+
DirectModifyPrepStmtHashEnt*prep_stmt_entry;
1145+
1146+
hash_seq_init(&scan,entry->dm_prepared);
1147+
while ((prep_stmt_entry= (DirectModifyPrepStmtHashEnt*)hash_seq_search(&scan)))
1148+
{
1149+
/* save the key to free it */
1150+
char*sql=prep_stmt_entry->sql;
1151+
hash_search(entry->dm_prepared,&prep_stmt_entry->sql,HASH_REMOVE,NULL);
1152+
pfree(sql);
1153+
}
1154+
}
1155+
11261156
/*
11271157
* pgfdw_subxact_callback --- cleanup at subtransaction end.
11281158
*/

‎contrib/postgres_fdw/postgres_fdw.c‎

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,7 +2337,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
23372337
* Get connection to the foreign server. Connection manager will
23382338
* establish new connection if necessary.
23392339
*/
2340-
dmstate->conn_entry=GetConnection(user,false);
2340+
dmstate->conn_entry=GetConnection(user,true);
23412341

23422342
/* Initialize state variable */
23432343
dmstate->num_tuples=-1;/* -1 means not set yet */
@@ -3323,6 +3323,8 @@ execute_dml_stmt(ForeignScanState *node)
33233323
ExprContext*econtext=node->ss.ps.ps_ExprContext;
33243324
intnumParams=dmstate->numParams;
33253325
constchar**values=dmstate->param_values;
3326+
DirectModifyPrepStmtHashEnt*prep_stmt_entry;
3327+
boolfound;
33263328

33273329
/*
33283330
* Construct array of query parameter values in text format.
@@ -3334,22 +3336,59 @@ execute_dml_stmt(ForeignScanState *node)
33343336
dmstate->param_exprs,
33353337
values);
33363338

3339+
/*
3340+
* Prepare the statement, if we have never seen it before.
3341+
*/
3342+
prep_stmt_entry=hash_search(entry->dm_prepared,&dmstate->query,
3343+
HASH_FIND,&found);
3344+
if (!found)
3345+
{
3346+
charp_name[NAMEDATALEN];
3347+
longnum=hash_get_num_entries(entry->dm_prepared)+1;
3348+
PGresult*res;
3349+
3350+
snprintf(p_name,NAMEDATALEN,"postgres_fdw:%d:%ld",MyProcPid,num);
3351+
3352+
if (!PQsendPrepare(conn,
3353+
p_name,
3354+
dmstate->query,
3355+
0,
3356+
NULL))
3357+
pgfdw_report_error(ERROR,NULL,conn, false,dmstate->query);
3358+
3359+
/*
3360+
* Get the result, and check for success.
3361+
*
3362+
* We don't use a PG_TRY block here, so be careful not to throw error
3363+
* without releasing the PGresult.
3364+
*/
3365+
res=pgfdw_get_result(entry,dmstate->query);
3366+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
3367+
pgfdw_report_error(ERROR,res,conn, true,dmstate->query);
3368+
PQclear(res);
3369+
3370+
/* Now, when it is successfully prepared, add it to the hashtable */
3371+
prep_stmt_entry=hash_search(entry->dm_prepared,&dmstate->query,
3372+
HASH_ENTER,&found);
3373+
strlcpy(prep_stmt_entry->prep_name,p_name,NAMEDATALEN);
3374+
}
3375+
33373376
/*
33383377
* Notice that we pass NULL for paramTypes, thus forcing the remote server
33393378
* to infer types for all parameters. Since we explicitly cast every
33403379
* parameter (see deparse.c), the "inference" is trivial and will produce
33413380
* the desired result. This allows us to avoid assuming that the remote
33423381
* server has the same OIDs we do for the parameters' types.
33433382
*/
3344-
if (!PQsendQueryParams(conn,dmstate->query,numParams,
3345-
NULL,values,NULL,NULL,0))
3383+
3384+
if (!PQsendQueryPrepared(conn,
3385+
prep_stmt_entry->prep_name,
3386+
numParams,
3387+
values,
3388+
NULL,
3389+
NULL,
3390+
0))
33463391
pgfdw_report_error(ERROR,NULL,conn, false,dmstate->query);
3347-
// }
3348-
// else
3349-
// {
3350-
// if (!PQsendQuery(dmstate->conn, dmstate->query))
3351-
// pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3352-
// }
33533392

33543393
/*
33553394
* Get the result, and check for success.

‎contrib/postgres_fdw/postgres_fdw.h‎

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,11 @@
1717
#include"commands/copy.h"
1818
#include"lib/stringinfo.h"
1919
#include"nodes/relation.h"
20+
#include"storage/latch.h"
2021
#include"utils/relcache.h"
2122

2223
#include"libpq-fe.h"
2324

24-
/*
25-
* Encapsulates connection to foreign server. Contents should be unknown
26-
* outside connection.c
27-
*/
28-
typedefstructConnCacheEntryConnCacheEntry;
29-
3025
/*
3126
* FDW-specific planner information kept in RelOptInfo.fdw_private for a
3227
* postgres_fdw foreign table. For a baserel, this struct is created by
@@ -117,6 +112,50 @@ typedef struct PgFdwRelationInfo
117112
intrelation_index;
118113
}PgFdwRelationInfo;
119114

115+
/*
116+
* Connection cache hash table entry
117+
*
118+
* The lookup key in this hash table is the user mapping OID. We use just one
119+
* connection per user mapping ID, which ensures that all the scans use the
120+
* same snapshot during a query. Using the user mapping OID rather than
121+
* the foreign server OID + user OID avoids creating multiple connections when
122+
* the public user mapping applies to all user OIDs.
123+
*
124+
* The "conn" pointer can be NULL if we don't currently have a live connection.
125+
* When we do have a connection, xact_depth tracks the current depth of
126+
* transactions and subtransactions open on the remote side. We need to issue
127+
* commands at the same nesting depth on the remote as we're executing at
128+
* ourselves, so that rolling back a subtransaction will kill the right
129+
* queries and not the wrong ones.
130+
*/
131+
typedefOidConnCacheKey;
132+
133+
typedefstructConnCacheEntryConnCacheEntry;
134+
structConnCacheEntry
135+
{
136+
ConnCacheKeykey;/* hash key (must be first) */
137+
PGconn*conn;/* connection to foreign server, or NULL */
138+
WaitEventSet*wait_set;/* for data from server ready notifications */
139+
/* Remaining fields are invalid when conn is NULL: */
140+
intxact_depth;/* 0 = no xact open, 1 = main xact open, 2 =
141+
* one level of subxact open, etc */
142+
boolhave_prep_stmt;/* have we prepared any stmts in this xact? */
143+
boolhave_error;/* have any subxacts aborted in this xact? */
144+
boolchanging_xact_state;/* xact state change in process */
145+
boolinvalidated;/* true if reconnect is pending */
146+
uint32server_hashvalue;/* hash value of foreign server OID */
147+
uint32mapping_hashvalue;/* hash value of user mapping OID */
148+
boolcopy_from_started;/* COPY FROM in progress on this conn */
149+
HTAB*dm_prepared;/* prepared statements for DirectModify */
150+
};
151+
152+
/* sql -> prepared statement hashtable */
153+
typedefstructDirectModifyPrepStmtHashEnt
154+
{
155+
char*sql;/* SQL of the statement, the key; arbitrary size */
156+
charprep_name[NAMEDATALEN];/* name of prepared statement */
157+
}DirectModifyPrepStmtHashEnt;
158+
120159
/* in postgres_fdw.c */
121160
externintset_transmission_modes(void);
122161
externvoidreset_transmission_modes(intnestlevel);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp