Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8ed9197

Browse files
committed
Add active snapshot
2 parents99affcb +eeb95fe commit8ed9197

File tree

2 files changed

+67
-26
lines changed

2 files changed

+67
-26
lines changed

‎contrib/pg_xtm/dtmd/src/main.c‎

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,17 @@ static void gen_snapshot(Transaction *t) {
227227

228228
staticxid_tget_global_xmin() {
229229
inti,j;
230-
xid_txmin=MAX_XID;
230+
xid_txmin=INVALID_XID;
231231
Transaction*t;
232232
for (i=0;i<transactions_count;i++) {
233233
t=transactions+i;
234-
j=t->snapshots_count>MAX_SNAPSHOTS_PER_TRANS ?MAX_SNAPSHOTS_PER_TRANS :t->snapshots_count;
235-
while (--j >=0) {
236-
Snapshot*s=transaction_snapshot(t,j);
237-
if(s->xmin<xmin) {
238-
xmin=s->xmin;
239-
}
240-
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
234+
j=t->snapshots_count>MAX_SNAPSHOTS_PER_TRANS ?MAX_SNAPSHOTS_PER_TRANS :t->snapshots_count;
235+
while (--j >=0) {
236+
Snapshot*s=transaction_snapshot(t,j);
237+
if ((xmin==INVALID_XID)||(s->xmin<xmin)) {
238+
xmin=s->xmin;
239+
}
240+
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
241241
}
242242
}
243243
returnxmin;

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const (
1515
N_ACCOUNTS=TRANSFER_CONNECTIONS//100000
1616
//ISOLATION_LEVEL = "repeatable read"
1717
ISOLATION_LEVEL="read committed"
18+
GLOBAL_UPDATES=true
19+
LOCAL_UPDATES=false
1820
)
1921

2022

@@ -115,13 +117,15 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
115117
varnCommits=0
116118
varmyCommits=0
117119

118-
conn1,err:=pgx.Connect(cfg1)
120+
varconn [2]*pgx.Conn
121+
122+
conn[0],err=pgx.Connect(cfg1)
119123
checkErr(err)
120-
deferconn1.Close()
124+
deferconn[0].Close()
121125

122-
conn2,err:=pgx.Connect(cfg2)
126+
conn[1],err=pgx.Connect(cfg2)
123127
checkErr(err)
124-
deferconn2.Close()
128+
deferconn[1].Close()
125129

126130
start:=time.Now()
127131
formyCommits<N_ITERATIONS {
@@ -130,23 +134,57 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130134
account1:=rand.Intn(N_ACCOUNTS)
131135
account2:=rand.Intn(N_ACCOUNTS)
132136

133-
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
134-
exec(conn2,"select dtm_join_transaction($1)",xid)
137+
if (account1>=account2) {
138+
continue
139+
}
140+
141+
src:=conn[rand.Intn(2)]
142+
dst:=conn[rand.Intn(2)]
135143

136-
// start transaction
137-
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
138-
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
144+
ifsrc==dst {
145+
// local update
146+
if!LOCAL_UPDATES {
147+
// which we do not want
148+
continue
149+
}
139150

140-
ok1:=execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)
141-
ok2:=execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2)
142-
if!ok1||!ok2 {
143-
exec(conn1,"rollback")
144-
exec(conn2,"rollback")
145-
nAborts+=1
151+
exec(src,"begin transaction isolation level "+ISOLATION_LEVEL)
152+
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,account1)
153+
ok2:=execUpdate(src,"update t set v = v + $1 where u=$2",amount,account2)
154+
if!ok1||!ok2 {
155+
exec(src,"rollback")
156+
nAborts+=1
157+
}else {
158+
exec(src,"commit")
159+
nCommits+=1
160+
myCommits+=1
161+
}
146162
}else {
147-
commit(conn1,conn2)
148-
nCommits+=1
149-
myCommits+=1
163+
// global update
164+
if!GLOBAL_UPDATES {
165+
// which we do not want
166+
continue
167+
}
168+
169+
xid=execQuery(src,"select dtm_begin_transaction(2)")
170+
exec(dst,"select dtm_join_transaction($1)",xid)
171+
172+
// start transaction
173+
exec(src,"begin transaction isolation level "+ISOLATION_LEVEL)
174+
exec(dst,"begin transaction isolation level "+ISOLATION_LEVEL)
175+
176+
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,account1)
177+
ok2:=execUpdate(dst,"update t set v = v + $1 where u=$2",amount,account2)
178+
179+
if!ok1||!ok2 {
180+
exec(src,"rollback")
181+
exec(dst,"rollback")
182+
nAborts+=1
183+
}else {
184+
commit(src,dst)
185+
nCommits+=1
186+
myCommits+=1
187+
}
150188
}
151189

152190
iftime.Since(start).Seconds()>1 {
@@ -241,6 +279,9 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
241279
varerrerror
242280
// fmt.Println(stmt)
243281
_,err=conn.Exec(stmt,arguments... )
282+
iferr!=nil {
283+
fmt.Println(err)
284+
}
244285
returnerr==nil
245286
}
246287

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp