Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4596c34

Browse files
committed
Optimized version of postgres_fdw: use 2pc only when needed
1 parent0e1ff43 commit4596c34

File tree

9 files changed

+1462
-37
lines changed

9 files changed

+1462
-37
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ static bool xact_got_connection = false;
6767
typedeflong longcsn_t;
6868
staticcsn_tcurrentGlobalTransactionId=0;
6969
staticintcurrentLocalTransactionId=0;
70+
staticPGconn*currentConnection=NULL;
71+
7072

7173
/* prototypes of private functions */
7274
staticPGconn*connect_pg_server(ForeignServer*server,UserMapping*user);
@@ -406,6 +408,8 @@ static void
406408
begin_remote_xact(ConnCacheEntry*entry)
407409
{
408410
intcurlevel=GetCurrentTransactionNestLevel();
411+
PGresult*res;
412+
409413

410414
/* Start main transaction if we haven't yet */
411415
if (entry->xact_depth <=0)
@@ -419,8 +423,6 @@ begin_remote_xact(ConnCacheEntry *entry)
419423
if (TransactionIdIsValid(gxid))
420424
{
421425
charstmt[64];
422-
PGresult*res;
423-
424426
snprintf(stmt,sizeof(stmt),"select public.dtm_join_transaction(%d)",gxid);
425427
res=PQexec(entry->conn,stmt);
426428
PQclear(res);
@@ -434,26 +436,30 @@ begin_remote_xact(ConnCacheEntry *entry)
434436
entry->xact_depth=1;
435437
if (UseTsDtmTransactions)
436438
{
437-
if (!currentGlobalTransactionId)
439+
if (currentConnection==NULL)
438440
{
439-
PGresult*res=PQexec(entry->conn,psprintf("SELECT public.dtm_extend('%d.%d')",
440-
MyProcPid,++currentLocalTransactionId));
441-
char*resp;
442-
443-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
444-
{
445-
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
446-
}
447-
resp=PQgetvalue(res,0,0);
448-
if (resp==NULL|| (*resp)=='\0'||sscanf(resp,"%lld",&currentGlobalTransactionId)!=1)
449-
{
450-
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
451-
}
452-
PQclear(res);
453-
}
454-
else
441+
currentConnection=entry->conn;
442+
}
443+
elseif (entry->conn!=currentConnection)
455444
{
456-
PGresult*res=PQexec(entry->conn,psprintf("SELECT public.dtm_access(%llu, '%d.%d')",currentGlobalTransactionId,MyProcPid,currentLocalTransactionId));
445+
if (!currentGlobalTransactionId)
446+
{
447+
char*resp;
448+
res=PQexec(currentConnection,psprintf("SELECT public.dtm_extend('%d.%d')",
449+
MyProcPid,++currentLocalTransactionId));
450+
451+
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
452+
{
453+
pgfdw_report_error(ERROR,res,currentConnection, true,sql);
454+
}
455+
resp=PQgetvalue(res,0,0);
456+
if (resp==NULL|| (*resp)=='\0'||sscanf(resp,"%lld",&currentGlobalTransactionId)!=1)
457+
{
458+
pgfdw_report_error(ERROR,res,currentConnection, true,sql);
459+
}
460+
PQclear(res);
461+
}
462+
res=PQexec(entry->conn,psprintf("SELECT public.dtm_access(%llu, '%d.%d')",currentGlobalTransactionId,MyProcPid,currentLocalTransactionId));
457463

458464
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
459465
{
@@ -954,6 +960,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
954960
cursor_number=0;
955961

956962
currentGlobalTransactionId=0;
963+
currentConnection=NULL;
957964
}
958965
}
959966

‎contrib/postgres_fdw/postgres_fdw.c

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3179,22 +3179,29 @@ execute_dml_stmt(ForeignScanState *node)
31793179
/*
31803180
* Construct array of query parameter values in text format.
31813181
*/
3182-
if (numParams>0)
3182+
if (numParams>0)
3183+
{
31833184
process_query_params(econtext,
31843185
dmstate->param_flinfo,
31853186
dmstate->param_exprs,
31863187
values);
3187-
3188-
/*
3189-
* Notice that we pass NULL for paramTypes, thus forcing the remote server
3190-
* to infer types for all parameters. Since we explicitly cast every
3191-
* parameter (see deparse.c), the "inference" is trivial and will produce
3192-
* the desired result. This allows us to avoid assuming that the remote
3193-
* server has the same OIDs we do for the parameters' types.
3194-
*/
3195-
if (!PQsendQueryParams(dmstate->conn,dmstate->query,numParams,
3196-
NULL,values,NULL,NULL,0))
3197-
pgfdw_report_error(ERROR,NULL,dmstate->conn, false,dmstate->query);
3188+
3189+
/*
3190+
* Notice that we pass NULL for paramTypes, thus forcing the remote server
3191+
* to infer types for all parameters. Since we explicitly cast every
3192+
* parameter (see deparse.c), the "inference" is trivial and will produce
3193+
* the desired result. This allows us to avoid assuming that the remote
3194+
* server has the same OIDs we do for the parameters' types.
3195+
*/
3196+
if (!PQsendQueryParams(dmstate->conn,dmstate->query,numParams,
3197+
NULL,values,NULL,NULL,0))
3198+
pgfdw_report_error(ERROR,NULL,dmstate->conn, false,dmstate->query);
3199+
}
3200+
else
3201+
{
3202+
if (!PQsendQuery(dmstate->conn,dmstate->query))
3203+
pgfdw_report_error(ERROR,NULL,dmstate->conn, false,dmstate->query);
3204+
}
31983205

31993206
/*
32003207
* Get the result, and check for success.

‎contrib/postgres_fdw/tests/dtmbench.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ struct config
5353
int nShards;
5454
string connection;
5555
bool prepared;
56+
bool local;
57+
bool pathman_sharding;
5658

5759
config() {
5860
nShards =1;
@@ -62,6 +64,8 @@ struct config
6264
nAccounts =10000;
6365
updatePercent =100;
6466
prepared =false;
67+
local =false;
68+
pathman_sharding =false;
6569
}
6670
};
6771

@@ -125,14 +129,17 @@ void* writer(void* arg)
125129
{
126130
thread& t = *(thread*)arg;
127131
connectionconn(cfg.connection);
132+
128133
if (cfg.prepared) {
129134
conn.prepare("transfer","update t set v = v + $1 where u=$2");
130135
}
136+
131137
for (int i =0; i < cfg.nIterations; i++)
132138
{
133139
worktxn(conn);
134140
int srcAcc =random() % cfg.nAccounts;
135-
int dstAcc =random() % cfg.nAccounts;
141+
int dstAcc = (cfg.local ? srcAcc +1 :random()) % cfg.nAccounts;
142+
136143
try {
137144
if (random() %100 < cfg.updatePercent) {
138145
int rc = cfg.prepared
@@ -180,8 +187,17 @@ void initializeDatabase()
180187
for (int i =0; i < cfg.nShards; i++)
181188
{
182189
worktxn(conn);
183-
exec(txn,"alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
190+
191+
if (!cfg.pathman_sharding) {
192+
exec(txn,"alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
193+
}else {
194+
//exec(txn, "SELECT add_range_partition('t', %d::int, %d, 't_fdw%i')",
195+
exec(txn,"SELECT add_foreign_range_partition('t', %d::int, %d, 't_fdw%i', 'shard%i')",
196+
accountsPerShard*i, accountsPerShard*(i+1), i+1, i %3 +1);
197+
}
198+
184199
exec(txn,"insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1,0);
200+
185201
txn.commit();
186202
}
187203
}
@@ -221,9 +237,15 @@ int main (int argc, char* argv[])
221237
initialize =true;
222238
cfg.nShards =atoi(argv[++i]);
223239
continue;
240+
case'l':
241+
cfg.local =true;
242+
continue;
224243
case'P':
225244
cfg.prepared =true;
226245
continue;
246+
case'S':
247+
cfg.pathman_sharding =true;
248+
continue;
227249
}
228250
}
229251
printf("Options:\n"
@@ -234,7 +256,9 @@ int main (int argc, char* argv[])
234256
"\t-p N\tupdate percent (100)\n"
235257
"\t-c STR\tdatabase connection string\n"
236258
"\t-i N\tinitialize N shards\n"
237-
"\t-P\tuse prepared statements\n");
259+
"\t-l\ttlocal tranfers\n"
260+
"\t-P\tuse prepared statements\n"
261+
"\t-S\tuse pathman_sharding\n");
238262
return1;
239263
}
240264

@@ -282,7 +306,8 @@ int main (int argc, char* argv[])
282306
printf(
283307
"{\"tps\":%f,\"transactions\":%ld,"
284308
"\"selects\":%ld,\"updates\":%ld,\"aborts\":%ld,\"abort_percent\": %d,"
285-
"\"readers\":%d,\"writers\":%d,\"update_percent\":%d,\"accounts\":%d,\"iterations\":%d ,\"shards\":%d,\"prepared\":%d}\n",
309+
"\"readers\":%d,\"writers\":%d,\"update_percent\":%d,\"accounts\":%d,"
310+
"\"iterations\":%d ,\"shards\":%d,\"prepared\":%d,\"pathman_sharding\":%d,\"local\":%d}\n",
286311
(double)(nTransactions*USEC)/elapsed,
287312
nTransactions,
288313
nSelects,
@@ -295,7 +320,9 @@ int main (int argc, char* argv[])
295320
cfg.nAccounts,
296321
cfg.nIterations,
297322
cfg.nShards,
298-
cfg.prepared);
323+
cfg.prepared,
324+
cfg.pathman_sharding,
325+
cfg.local);
299326

300327
return0;
301328
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp