Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9645d73

Browse files
committed
local node as participant
1 parent2c64afe commit9645d73

File tree

3 files changed

+167
-40
lines changed

3 files changed

+167
-40
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ begin_remote_xact(ConnCacheEntry *entry)
470470
if (current_global_cid==0)
471471
{
472472
MemoryContextoldcxt;
473-
char*resp;
474473

475474
/*
476475
* This is the first remote participant, create global
@@ -484,38 +483,20 @@ begin_remote_xact(ConnCacheEntry *entry)
484483
++two_phase_xact_count);
485484
MemoryContextSwitchTo(oldcxt);
486485

487-
488-
// res = PQexec(entry->conn, psprintf("SELECT pg_global_snaphot_create('%s')",
489-
// two_phase_xact_gid));
490-
491-
// if (PQresultStatus(res) != PGRES_TUPLES_OK)
492-
// {
493-
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
494-
// }
495-
// resp = PQgetvalue(res, 0, 0);
496-
// if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%ld", &current_global_cid) != 1)
497-
// {
498-
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
499-
// }
500-
// PQclear(res);
486+
current_global_cid=DtmLocalExtend(two_phase_xact_gid);
487+
}
501488

489+
Assert(two_phase_xact_gid);
490+
/* join the new participant */
491+
res=PQexec(entry->conn,
492+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
493+
current_global_cid,two_phase_xact_gid));
502494

503-
current_global_cid=DtmLocalExtend(two_phase_xact_gid);
495+
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
496+
{
497+
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
504498
}
505-
// else
506-
// {
507-
Assert(two_phase_xact_gid);
508-
/* join the new participant */
509-
res=PQexec(entry->conn,
510-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
511-
current_global_cid,two_phase_xact_gid));
512-
513-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
514-
{
515-
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
516-
}
517-
PQclear(res);
518-
// }
499+
PQclear(res);
519500
}
520501

521502
/* A new potential participant for 2PC */
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use strict;
2+
use warnings;
3+
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 1;
7+
8+
# my $master = get_new_node("master");
9+
# $master->init;
10+
# $master->append_conf('postgresql.conf', qq(
11+
# max_prepared_transactions = 30
12+
# log_checkpoints = true
13+
# postgres_fdw.use_tsdtm = on
14+
# ));
15+
# $master->start;
16+
17+
my$shard1 = get_new_node("shard1");
18+
$shard1->init;
19+
$shard1->append_conf('postgresql.conf',qq(
20+
max_prepared_transactions = 30
21+
postgres_fdw.use_tsdtm = on
22+
));
23+
$shard1->start;
24+
25+
my$shard2 = get_new_node("shard2");
26+
$shard2->init;
27+
$shard2->append_conf('postgresql.conf',qq(
28+
max_prepared_transactions = 30
29+
postgres_fdw.use_tsdtm = on
30+
));
31+
$shard2->start;
32+
33+
###############################################################################
34+
# Prepare nodes
35+
###############################################################################
36+
37+
my@shards = ($shard1,$shard2);
38+
39+
foreachmy$node (@shards)
40+
{
41+
$node->safe_psql('postgres',"CREATE EXTENSION postgres_fdw");
42+
$node->safe_psql('postgres',"CREATE TABLE accounts(id integer primary key, amount integer)");
43+
$node->safe_psql('postgres',"CREATE TABLE accounts_local() inherits(accounts)");
44+
$node->safe_psql('postgres',"CREATE TABLE global_transactions(tx_time timestamp)");
45+
$node->safe_psql('postgres',"CREATE TABLE local_transactions(tx_time timestamp)");
46+
47+
foreachmy$neighbor (@shards)
48+
{
49+
nextif ($neighboreq$node);
50+
51+
my$port =$neighbor->port;
52+
my$host =$neighbor->host;
53+
54+
$node->safe_psql('postgres',"CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port')");
55+
$node->safe_psql('postgres',"CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts_local')");
56+
$node->safe_psql('postgres',"CREATE USER MAPPING for stas SERVER shard_$port options (user 'stas')");
57+
}
58+
59+
}
60+
61+
diag("\n");
62+
diag($shard1->connstr('postgres'),"\n" );
63+
diag($shard2->connstr('postgres'),"\n" );
64+
65+
$shard1->psql('postgres',"insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
66+
$shard2->psql('postgres',"insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;");
67+
68+
diag("\n");
69+
diag($shard1->connstr('postgres'),"\n" );
70+
diag($shard2->connstr('postgres'),"\n" );
71+
72+
#sleep(6000);
73+
74+
$shard1->pgbench(-n,-c=> 20,-t=> 30,-f=>"$TestLib::log_path/../../t/bank.sql",'postgres' );
75+
$shard2->pgbench(-n,-c=> 20,-t=> 30,-f=>"$TestLib::log_path/../../t/bank.sql",'postgres' );
76+
77+
diag("\n");
78+
diag($shard1->connstr('postgres'),"\n" );
79+
diag($shard2->connstr('postgres'),"\n" );
80+
# sleep(3600);
81+
82+
###############################################################################
83+
# Helpers
84+
###############################################################################
85+
86+
subcount_and_delete_rows
87+
{
88+
my ($node,$table) =@_;
89+
my ($rc,$count,$err);
90+
91+
($rc,$count,$err) =$node->psql('postgres',"select count(*) from$table",
92+
on_error_die=> 1);
93+
94+
die"count_rows:$err"if ($errne'');
95+
96+
$node->psql('postgres',"delete from$table",on_error_die=> 1);
97+
98+
diag($node->name,": completed$count transactions");
99+
100+
return$count;
101+
}
102+
103+
###############################################################################
104+
# Concurrent global transactions
105+
###############################################################################
106+
107+
my ($err,$rc);
108+
my$started;
109+
my$seconds = 30;
110+
my$selects;
111+
my$total ='0';
112+
my$oldtotal ='0';
113+
my$isolation_errors = 0;
114+
115+
116+
my ($pgb_handle1,$pgb_handle2);
117+
118+
$pgb_handle1 =$shard1->pgbench_async(-n,-c=> 5,-T=>$seconds,-f=>"$TestLib::log_path/../../t/bank.sql",'postgres' );
119+
$pgb_handle2 =$shard2->pgbench_async(-n,-c=> 5,-T=>$seconds,-f=>"$TestLib::log_path/../../t/bank.sql",'postgres' );
120+
121+
$started =time();
122+
$selects = 0;
123+
my$i = 1;
124+
while (time() -$started <$seconds)
125+
{
126+
my$shard =$shard1;
127+
foreachmy$shard (@shards)
128+
{
129+
$total =$shard->safe_psql('postgres',"select sum(amount) from accounts");
130+
if ( ($totalne$oldtotal)and ($totalne'') )
131+
{
132+
$isolation_errors++;
133+
$oldtotal =$total;
134+
diag("$i: Isolation error. Total =$total");
135+
}
136+
if ($totalne'') {$selects++; }
137+
$i++;
138+
}
139+
}
140+
141+
$shard1->pgbench_await($pgb_handle1);
142+
$shard2->pgbench_await($pgb_handle2);
143+
144+
# sanity check
145+
diag("completed$selects selects");
146+
die"no actual transactions happend"unless ($selects > 0 &&
147+
count_and_delete_rows($shard1,'global_transactions') > 0 &&
148+
count_and_delete_rows($shard2,'global_transactions') > 0);
149+
150+
is($isolation_errors, 0,'isolation between concurrent global transaction');
151+

‎src/backend/access/transam/global_snapshot.c

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
392392
SpinLockRelease(&local->lock);
393393
return true;
394394
}
395-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS)
395+
if (ts->status==TRANSACTION_STATUS_UNKNOWN)
396396
{
397397
DTM_TRACE((stderr,"%d: wait for in-doubt transaction %u in snapshot %lu\n",getpid(),xid,dtm_tx.snapshot));
398398
SpinLockRelease(&local->lock);
@@ -431,22 +431,17 @@ DtmInitialize()
431431

432432
info.keysize=sizeof(TransactionId);
433433
info.entrysize=sizeof(DtmTransStatus);
434-
info.hash=dtm_xid_hash_fn;
435-
info.match=dtm_xid_match_fn;
436434
xid2status=ShmemInitHash("xid2status",
437435
DTM_HASH_INIT_SIZE,DTM_HASH_INIT_SIZE,
438436
&info,
439-
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE);
437+
HASH_ELEM |HASH_BLOBS);
440438

441439
info.keysize=MAX_GTID_SIZE;
442440
info.entrysize=sizeof(DtmTransId);
443-
info.hash=dtm_gtid_hash_fn;
444-
info.match=dtm_gtid_match_fn;
445-
info.keycopy=dtm_gtid_keycopy_fn;
446441
gtid2xid=ShmemInitHash("gtid2xid",
447442
DTM_HASH_INIT_SIZE,DTM_HASH_INIT_SIZE,
448443
&info,
449-
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE |HASH_KEYCOPY);
444+
HASH_ELEM);
450445

451446
TM=&DtmTM;
452447

@@ -526,7 +521,7 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
526521
{
527522
DtmTransId*id= (DtmTransId*)hash_search(gtid2xid,gtid,HASH_ENTER,NULL);
528523

529-
id->xid=x->xid;
524+
id->xid=GetCurrentTransactionId();
530525
id->nSubxids=0;
531526
id->subxids=0;
532527
}
@@ -559,7 +554,7 @@ DtmLocalBeginPrepare(GlobalTransactionId gtid)
559554
Assert(id!=NULL);
560555
Assert(TransactionIdIsValid(id->xid));
561556
ts= (DtmTransStatus*)hash_search(xid2status,&id->xid,HASH_ENTER,NULL);
562-
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
557+
ts->status=TRANSACTION_STATUS_UNKNOWN;
563558
ts->cid=dtm_get_cid();
564559
ts->nSubxids=id->nSubxids;
565560
DtmTransactionListAppend(ts);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp