Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcfe0454

Browse files
committed
Use repeatable read isolation level
1 parent8a7dfb1 commitcfe0454

File tree

2 files changed

+40
-24
lines changed

2 files changed

+40
-24
lines changed

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,20 @@ static void DtmUpdateRecentXmin(void)
209209

210210
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
211211
{
212-
XTM_TRACE("XTM: DtmGetSnapshot \n");
213-
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
214-
DtmHasSnapshot= true;
215-
DtmEnsureConnection();
216-
DtmGlobalGetSnapshot(DtmConn,DtmNodeId,GetCurrentTransactionId(),&DtmSnapshot);
217-
}
218-
snapshot=GetLocalSnapshotData(snapshot);
219-
if (DtmHasSnapshot) {
220-
DtmCopySnapshot(snapshot,&DtmSnapshot);
221-
DtmUpdateRecentXmin();
212+
if (!IsMVCCSnapshot(snapshot)||snapshot==&CatalogSnapshotData) {
213+
snapshot=GetLocalSnapshotData(snapshot);
214+
}else {
215+
XTM_TRACE("XTM: DtmGetSnapshot \n");
216+
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
217+
DtmHasSnapshot= true;
218+
DtmEnsureConnection();
219+
DtmGlobalGetSnapshot(DtmConn,DtmNodeId,GetCurrentTransactionId(),&DtmSnapshot);
220+
}
221+
snapshot=GetLocalSnapshotData(snapshot);
222+
if (DtmHasSnapshot) {
223+
DtmCopySnapshot(snapshot,&DtmSnapshot);
224+
DtmUpdateRecentXmin();
225+
}
222226
}
223227
returnsnapshot;
224228
}

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
"math/rand"
6+
// "math/rand"
77
"github.com/jackc/pgx"
88
)
99

@@ -94,6 +94,7 @@ func max(a, b int64) int64 {
9494
functransfer(idint,wg*sync.WaitGroup) {
9595
varerrerror
9696
varxids []int32=make([]int32,2)
97+
varnConflicts=0
9798

9899
conn1,err:=pgx.Connect(cfg1)
99100
checkErr(err)
@@ -106,12 +107,12 @@ func transfer(id int, wg *sync.WaitGroup) {
106107
fori:=0;i<N_ITERATIONS;i++ {
107108
//amount := 2*rand.Intn(2) - 1
108109
amount:=1
109-
account1:=rand.Intn(N_ACCOUNTS)
110-
account2:=rand.Intn(N_ACCOUNTS)
110+
account1:=id//rand.Intn(N_ACCOUNTS)
111+
account2:=id//rand.Intn(N_ACCOUNTS)
111112

112113
// strt transaction
113-
exec(conn1,"begin")
114-
exec(conn2,"begin")
114+
exec(conn1,"begin transaction isolation level repeatable read")
115+
exec(conn2,"begin transaction isolation level repeatable read")
115116

116117
// obtain XIDs of paticipants
117118
xids[0]=execQuery(conn1,"select txid_current()")
@@ -121,13 +122,17 @@ func transfer(id int, wg *sync.WaitGroup) {
121122
exec(conn1,"select dtm_begin_transaction($1, $2)",nodes,xids)
122123
exec(conn2,"select dtm_begin_transaction($1, $2)",nodes,xids)
123124

124-
exec(conn1,"update t set v = v + $1 where u=$2",amount,account1)
125-
exec(conn2,"update t set v = v - $1 where u=$2",amount,account2)
126-
127-
commit(conn1,conn2)
125+
if!execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)||
126+
!execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2) {
127+
exec(conn1,"rollback")
128+
exec(conn2,"rollback")
129+
nConflicts+=1
130+
i-=1
131+
}else {
132+
commit(conn1,conn2)
133+
}
128134
}
129-
130-
fmt.Println("Test completed")
135+
fmt.Println("Test completed with ",nConflicts," conflicts")
131136
wg.Done()
132137
}
133138

@@ -143,8 +148,8 @@ func inspect(wg *sync.WaitGroup) {
143148
conn2,err:=pgx.Connect(cfg2)
144149
checkErr(err)
145150

146-
exec(conn1,"begin")
147-
exec(conn2,"begin")
151+
exec(conn1,"begin transaction isolation level repeatable read")
152+
exec(conn2,"begin transaction isolation level repeatable read")
148153

149154
// obtain XIDs of paticipants
150155
xids[0]=execQuery(conn1,"select txid_current()")
@@ -192,10 +197,17 @@ func main() {
192197
funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) {
193198
varerrerror
194199
// fmt.Println(stmt)
195-
_,_=conn.Exec(stmt,arguments... )
200+
_,err=conn.Exec(stmt,arguments... )
196201
checkErr(err)
197202
}
198203

204+
funcexecUpdate(conn*pgx.Conn,stmtstring,arguments...interface{})bool {
205+
varerrerror
206+
// fmt.Println(stmt)
207+
_,err=conn.Exec(stmt,arguments... )
208+
returnerr==nil
209+
}
210+
199211
funcexecQuery(conn*pgx.Conn,stmtstring,arguments...interface{})int32 {
200212
varerrerror
201213
varresultint64

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp