Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit316e377

Browse files
committed
Corectly handle rollback of remote transaction
1 parent39d8402 commit316e377

File tree

2 files changed

+254
-8
lines changed

2 files changed

+254
-8
lines changed

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static Snapshot CurrentTransactionSnapshot;
8888
staticTransactionIdDtmNextXid;
8989
staticSnapshotDataDtmSnapshot= {HeapTupleSatisfiesMVCC };
9090
staticboolDtmHasGlobalSnapshot;
91-
staticboolDtmIsGlobalTransaction;
91+
staticboolDtmGlobalXidAssigned;
9292
staticintDtmLocalXidReserve;
9393
staticintDtmCurcid;
9494
staticSnapshotDtmLastSnapshot;
@@ -329,6 +329,7 @@ DtmGetNewTransactionId(bool isSubXact)
329329
TransactionIdxid;
330330

331331
XTM_INFO("%d: GetNewTransactionId\n",getpid());
332+
Assert(!DtmGlobalXidAssigned);
332333

333334
/*
334335
* Workers synchronize transaction state at the beginning of each parallel
@@ -550,6 +551,9 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid)
550551

551552
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
552553
{
554+
if (DtmGlobalXidAssigned) {
555+
returnGetLocalSnapshotData(snapshot);
556+
}
553557
if (TransactionIdIsValid(DtmNextXid)/*&& IsMVCCSnapshot(snapshot)*/&&snapshot!=&CatalogSnapshotData) {
554558
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid)) {
555559
DtmGlobalGetSnapshot(DtmNextXid,&DtmSnapshot,&dtm->minXid);
@@ -584,7 +588,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
584588
{
585589
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
586590
if (!RecoveryInProgress()) {
587-
if (!DtmIsGlobalTransaction&&TransactionIdIsValid(DtmNextXid)) {
591+
if (!DtmGlobalXidAssigned&&TransactionIdIsValid(DtmNextXid)) {
588592
/* Already should be IN_PROGRESS */
589593
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
590594
CurrentTransactionSnapshot=NULL;
@@ -664,15 +668,19 @@ static void
664668
DtmXactCallback(XactEventevent,void*arg)
665669
{
666670
if (event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT) {
667-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmIsGlobalTransaction,DtmNextXid);
668-
if (DtmIsGlobalTransaction) {
669-
DtmIsGlobalTransaction= false;
671+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
672+
if (DtmGlobalXidAssigned) {
673+
DtmGlobalXidAssigned= false;
670674
}elseif (TransactionIdIsValid(DtmNextXid)) {
671675
if (event==XACT_EVENT_COMMIT) {
672676
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
673677
hash_search(xid_in_doubt,&DtmNextXid,HASH_REMOVE,NULL);
674678
LWLockRelease(dtm->hashLock);
675-
}
679+
}else {
680+
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
681+
DtmGlobalSetTransStatus(DtmNextXid,TRANSACTION_STATUS_ABORTED, false);
682+
}
683+
}
676684
DtmNextXid=InvalidTransactionId;
677685
DtmLastSnapshot=NULL;
678686
}
@@ -780,7 +788,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
780788
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n",getpid(),DtmNextXid,dtm->minXid);
781789

782790
DtmHasGlobalSnapshot= true;
783-
DtmIsGlobalTransaction= true;
791+
DtmGlobalXidAssigned= true;
784792
DtmLastSnapshot=NULL;
785793

786794
PG_RETURN_INT32(DtmNextXid);
@@ -796,7 +804,7 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
796804
XTM_INFO("%d: Join global transaction %d, dtm->minXid=%d\n",getpid(),DtmNextXid,dtm->minXid);
797805

798806
DtmHasGlobalSnapshot= true;
799-
DtmIsGlobalTransaction= true;
807+
DtmGlobalXidAssigned= true;
800808
DtmLastSnapshot=NULL;
801809

802810
PG_RETURN_VOID();
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"strconv"
7+
"math/rand"
8+
"time"
9+
"github.com/jackc/pgx"
10+
)
11+
12+
const (
13+
TRANSFER_CONNECTIONS=2
14+
INIT_AMOUNT=10000
15+
N_ITERATIONS=10000
16+
N_ACCOUNTS=TRANSFER_CONNECTIONS//100000
17+
ISOLATION_LEVEL="repeatable read"
18+
//ISOLATION_LEVEL = "read committed"
19+
)
20+
21+
22+
varcfg1= pgx.ConnConfig{
23+
Host:"127.0.0.1",
24+
Port:5432,
25+
Database:"postgres",
26+
}
27+
28+
varcfg2= pgx.ConnConfig{
29+
Host:"127.0.0.1",
30+
Port:5433,
31+
Database:"postgres",
32+
}
33+
34+
35+
varrunning=false
36+
37+
funcprepare_db() {
38+
// var xid int32
39+
40+
conn1,err:=pgx.Connect(cfg1)
41+
checkErr(err)
42+
deferconn1.Close()
43+
44+
conn2,err:=pgx.Connect(cfg2)
45+
checkErr(err)
46+
deferconn2.Close()
47+
48+
exec(conn1,"drop extension if exists pg_dtm")
49+
exec(conn1,"create extension pg_dtm")
50+
exec(conn1,"drop table if exists t")
51+
exec(conn1,"create table t(u int primary key, v int)")
52+
53+
exec(conn2,"drop extension if exists pg_dtm")
54+
exec(conn2,"create extension pg_dtm")
55+
exec(conn2,"drop table if exists t")
56+
exec(conn2,"create table t(u int primary key, v int)")
57+
58+
exec(conn1,"CREATE EXTENSION postgres_fdw");
59+
exec(conn1,"CREATE SERVER dtm FOREIGN DATA WRAPPER postgres_fdw options (dbname 'postgres', host '127.0.0.1', port '5433')");
60+
exec(conn1,"CREATE FOREIGN TABLE t_fdw() inherits (t) server dtm options(table_name 't')");
61+
exec(conn1,"CREATE USER MAPPING for knizhnik SERVER dtm options (user 'knizhnik')");
62+
63+
// start transaction
64+
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
65+
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
66+
67+
fori:=0;i<N_ACCOUNTS;i++ {
68+
exec(conn1,"insert into t values($1, $2)",i,INIT_AMOUNT)
69+
exec(conn2,"insert into t values($1, $2)",-i,INIT_AMOUNT)
70+
}
71+
72+
exec(conn1,"commit")
73+
exec(conn2,"commit")
74+
}
75+
76+
funcprogress(totalint,cCommitschanint,cAbortschanint) {
77+
commits:=0
78+
aborts:=0
79+
start:=time.Now()
80+
fornewcommits:=rangecCommits {
81+
newaborts:=<-cAborts
82+
commits+=newcommits
83+
aborts+=newaborts
84+
iftime.Since(start).Seconds()>10 {
85+
fmt.Printf(
86+
"progress %0.2f%%: %d commits, %d aborts\n",
87+
float32(commits)*100.0/float32(total),commits,aborts,
88+
)
89+
start=time.Now()
90+
}
91+
}
92+
}
93+
94+
functransfer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
95+
varerrerror
96+
varxidint32
97+
varnAborts=0
98+
varnCommits=0
99+
varmyCommits=0
100+
101+
conn,err:=pgx.Connect(cfg1)
102+
checkErr(err)
103+
deferconn.Close()
104+
105+
start:=time.Now()
106+
formyCommits<N_ITERATIONS {
107+
amount:=1
108+
account1:=rand.Intn(N_ACCOUNTS)
109+
account2:=-rand.Intn(N_ACCOUNTS)
110+
111+
exec(conn,"begin")
112+
xid=execQuery(conn,"select dtm_begin_transaction(2)")
113+
exec(conn,"select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction("+strconv.Itoa(int(xid))+")')")
114+
exec(conn,"commit")
115+
116+
exec(conn,"begin transaction isolation level "+ISOLATION_LEVEL)
117+
118+
ok1:=execUpdate(conn,"update t set v = v - $1 where u=$2",amount,account1)
119+
ok2:=execUpdate(conn,"update t set v = v + $1 where u=$2",amount,account2)
120+
121+
if!ok1||!ok2 {
122+
exec(conn,"rollback")
123+
nAborts+=1
124+
}else {
125+
exec(conn,"commit")
126+
nCommits+=1
127+
myCommits+=1
128+
}
129+
130+
iftime.Since(start).Seconds()>10 {
131+
cCommits<-nCommits
132+
cAborts<-nAborts
133+
nCommits=0
134+
nAborts=0
135+
start=time.Now()
136+
}
137+
}
138+
cCommits<-nCommits
139+
cAborts<-nAborts
140+
wg.Done()
141+
}
142+
143+
funcinspect(wg*sync.WaitGroup) {
144+
varsumint64
145+
varprevSumint64=0
146+
varxidint32
147+
148+
{
149+
conn,err:=pgx.Connect(cfg1)
150+
checkErr(err)
151+
152+
forrunning {
153+
exec(conn,"begin")
154+
xid=execQuery(conn,"select dtm_begin_transaction(2)")
155+
exec(conn,"select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction("+strconv.Itoa(int(xid))+")')")
156+
exec(conn,"commit")
157+
158+
exec(conn,"begin transaction isolation level "+ISOLATION_LEVEL)
159+
160+
sum=execQuery64(conn,"select sum(v) from t")
161+
162+
if (sum!=prevSum) {
163+
fmt.Printf("Total=%d xid=%d\n",sum,xid)
164+
prevSum=sum
165+
}
166+
167+
exec(conn,"commit")
168+
}
169+
conn.Close()
170+
}
171+
wg.Done()
172+
}
173+
174+
funcmain() {
175+
vartransferWg sync.WaitGroup
176+
varinspectWg sync.WaitGroup
177+
178+
prepare_db()
179+
180+
cCommits:=make(chanint)
181+
cAborts:=make(chanint)
182+
goprogress(TRANSFER_CONNECTIONS*N_ITERATIONS,cCommits,cAborts)
183+
184+
transferWg.Add(TRANSFER_CONNECTIONS)
185+
fori:=0;i<TRANSFER_CONNECTIONS;i++ {
186+
gotransfer(i,cCommits,cAborts,&transferWg)
187+
}
188+
running=true
189+
inspectWg.Add(1)
190+
goinspect(&inspectWg)
191+
192+
transferWg.Wait()
193+
running=false
194+
inspectWg.Wait()
195+
196+
fmt.Printf("done\n")
197+
}
198+
199+
funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) {
200+
varerrerror
201+
// fmt.Println(stmt)
202+
_,err=conn.Exec(stmt,arguments... )
203+
checkErr(err)
204+
}
205+
206+
funcexecUpdate(conn*pgx.Conn,stmtstring,arguments...interface{})bool {
207+
varerrerror
208+
// fmt.Println(stmt)
209+
_,err=conn.Exec(stmt,arguments... )
210+
//if err != nil {
211+
// fmt.Println(err)
212+
//}
213+
returnerr==nil
214+
}
215+
216+
funcexecQuery(conn*pgx.Conn,stmtstring,arguments...interface{})int32 {
217+
varerrerror
218+
varresultint32
219+
err=conn.QueryRow(stmt,arguments...).Scan(&result)
220+
checkErr(err)
221+
returnresult
222+
}
223+
224+
funcexecQuery64(conn*pgx.Conn,stmtstring,arguments...interface{})int64 {
225+
varerrerror
226+
varresultint64
227+
err=conn.QueryRow(stmt,arguments...).Scan(&result)
228+
checkErr(err)
229+
returnresult
230+
}
231+
232+
funccheckErr(errerror) {
233+
iferr!=nil {
234+
panic(err)
235+
}
236+
}
237+
238+
// vim: expandtab ts=4 sts=4 sw=4

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp