Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7a43e43

Browse files
committed
Add progress indication to the transfers test.
1 parent2b560c2 commit7a43e43

File tree

2 files changed

+90
-45
lines changed

2 files changed

+90
-45
lines changed

‎contrib/pg_xtm/tests/daemons.go‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
"-D",datadir,
9292
"-p",strconv.Itoa(port),
9393
"-c","dtm.node_id="+strconv.Itoa(nodeid),
94+
"-c","autovacuum=off",
95+
"-c","fsync=off",
96+
"-c","synchronous_commit=off",
9497
}
9598
name:="postgres "+datadir
9699
c:=make(chanstring)

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 87 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sync"
66
"math/rand"
7+
"time"
78
"github.com/jackc/pgx"
89
)
910

@@ -34,10 +35,10 @@ var running = false
3435
varnodes []int32= []int32{0,1}
3536

3637
funcasyncCommit(conn*pgx.Conn,wg*sync.WaitGroup) {
37-
exec(conn,"commit")
38+
exec(conn,"commit")
3839
wg.Done()
3940
}
40-
41+
4142
funccommit(conn1,conn2*pgx.Conn) {
4243
varwg sync.WaitGroup
4344
wg.Add(2)
@@ -66,33 +67,52 @@ func prepare_db() {
6667
exec(conn2,"create extension pg_dtm")
6768
exec(conn2,"drop table if exists t")
6869
exec(conn2,"create table t(u int primary key, v int)")
69-
70+
7071
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
7172
// exec(conn2, "select dtm_join_transaction($1)", xid)
7273

7374
// strt transaction
7475
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
7576
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
76-
77+
7778
fori:=0;i<N_ACCOUNTS;i++ {
7879
exec(conn1,"insert into t values($1, $2)",i,INIT_AMOUNT)
7980
exec(conn2,"insert into t values($1, $2)",i,INIT_AMOUNT)
8081
}
81-
82+
8283
commit(conn1,conn2)
8384
}
8485

8586
funcmax(a,bint64)int64 {
8687
ifa>=b {
8788
returna
88-
}
89+
}
8990
returnb
9091
}
9192

92-
functransfer(idint,wg*sync.WaitGroup) {
93+
funcprogress(totalint,cCommitschanint,cAbortschanint) {
94+
commits:=0
95+
aborts:=0
96+
start:=time.Now()
97+
fornewcommits:=rangecCommits {
98+
newaborts:=<-cAborts
99+
commits+=newcommits
100+
aborts+=newaborts
101+
iftime.Since(start).Seconds()>1 {
102+
fmt.Printf(
103+
"progress %0.2f%%: %d commits, %d aborts\n",
104+
float32(commits)*100.0/float32(total),commits,aborts,
105+
)
106+
start=time.Now()
107+
}
108+
}
109+
}
110+
111+
functransfer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
93112
varerrerror
94113
varxidint32
95-
varnConflicts=0
114+
varnAborts=0
115+
varnCommits=0
96116

97117
conn1,err:=pgx.Connect(cfg1)
98118
checkErr(err)
@@ -102,10 +122,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102122
checkErr(err)
103123
deferconn2.Close()
104124

105-
fori:=0;i<N_ITERATIONS;i++ {
106-
//amount := 2*rand.Intn(2) - 1
107-
amount:=1
108-
account1:=rand.Intn(N_ACCOUNTS)
125+
start:=time.Now()
126+
fornCommits<N_ITERATIONS {
127+
amount:=2*rand.Intn(2000)-1
128+
//amount := 1
129+
account1:=rand.Intn(N_ACCOUNTS)
109130
account2:=rand.Intn(N_ACCOUNTS)
110131

111132
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
@@ -114,25 +135,34 @@ func transfer(id int, wg *sync.WaitGroup) {
114135
// start transaction
115136
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
116137
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
117-
118-
ok1:=execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)
119-
ok2:=execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2)
120-
if!ok1||!ok2 {
138+
139+
ok1:=execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)
140+
ok2:=execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2)
141+
if!ok1||!ok2 {
121142
exec(conn1,"rollback")
122143
exec(conn2,"rollback")
123-
nConflicts+=1
124-
i-=1
125-
}else {
144+
nAborts+=1
145+
}else {
126146
commit(conn1,conn2)
127-
}
147+
nCommits+=1
148+
}
149+
150+
iftime.Since(start).Seconds()>1 {
151+
cCommits<-nCommits
152+
cAborts<-nAborts
153+
nCommits=0
154+
nAborts=0
155+
start=time.Now()
156+
}
128157
}
129-
fmt.Println("Test completed with ",nConflicts," conflicts")
158+
cCommits<-nCommits
159+
cAborts<-nAborts
130160
wg.Done()
131161
}
132162

133163
funcinspect(wg*sync.WaitGroup) {
134164
varsum1,sum2,sumint64
135-
varprevSumint64=0
165+
varprevSumint64=0
136166
varxidint32
137167

138168
{
@@ -142,28 +172,33 @@ func inspect(wg *sync.WaitGroup) {
142172
conn2,err:=pgx.Connect(cfg2)
143173
checkErr(err)
144174

145-
forrunning {
175+
forrunning {
176+
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
177+
exec(conn2,"select dtm_join_transaction($1)",xid)
178+
179+
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
180+
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
181+
182+
sum1=execQuery64(conn1,"select sum(v) from t")
183+
sum2=execQuery64(conn2,"select sum(v) from t")
184+
185+
sum=sum1+sum2
186+
if (sum!=prevSum) {
187+
xmin1:=execQuery(conn1,"select dtm_get_current_snapshot_xmin()")
188+
xmax1:=execQuery(conn1,"select dtm_get_current_snapshot_xmax()")
189+
xmin2:=execQuery(conn2,"select dtm_get_current_snapshot_xmin()")
190+
xmax2:=execQuery(conn2,"select dtm_get_current_snapshot_xmax()")
191+
fmt.Printf(
192+
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
193+
sum,xid,xmin1,xmax1,xmin2,xmax2,
194+
)
195+
prevSum=sum
196+
}
146197

147-
148-
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
149-
exec(conn2,"select dtm_join_transaction($1)",xid)
150-
151-
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
152-
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
153-
154-
sum1=execQuery64(conn1,"select sum(v) from t")
155-
sum2=execQuery64(conn2,"select sum(v) from t")
156-
157-
sum=sum1+sum2
158-
if (sum!=prevSum) {
159-
fmt.Println("Total = ",sum,"xid=",xid,"snap1={",execQuery(conn1,"select dtm_get_current_snapshot_xmin()"),execQuery(conn1,"select dtm_get_current_snapshot_xmax()"),"}, snap2={",execQuery(conn2,"select dtm_get_current_snapshot_xmin()"),execQuery(conn2,"select dtm_get_current_snapshot_xmax()"),"}")
160-
prevSum=sum
161-
}
162-
163-
commit(conn1,conn2)
164-
}
165-
conn1.Close()
166-
conn2.Close()
198+
commit(conn1,conn2)
199+
}
200+
conn1.Close()
201+
conn2.Close()
167202
}
168203
wg.Done()
169204
}
@@ -174,9 +209,13 @@ func main() {
174209

175210
prepare_db()
176211

212+
cCommits:=make(chanint)
213+
cAborts:=make(chanint)
214+
goprogress(TRANSFER_CONNECTIONS*N_ITERATIONS,cCommits,cAborts)
215+
177216
transferWg.Add(TRANSFER_CONNECTIONS)
178217
fori:=0;i<TRANSFER_CONNECTIONS;i++ {
179-
gotransfer(i,&transferWg)
218+
gotransfer(i,cCommits,cAborts,&transferWg)
180219
}
181220
running=true
182221
inspectWg.Add(1)
@@ -185,6 +224,8 @@ func main() {
185224
transferWg.Wait()
186225
running=false
187226
inspectWg.Wait()
227+
228+
fmt.Printf("done\n")
188229
}
189230

190231
funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) {
@@ -216,10 +257,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216257
checkErr(err)
217258
returnresult
218259
}
260+
219261
funccheckErr(errerror) {
220262
iferr!=nil {
221263
panic(err)
222264
}
223265
}
224266

225-
267+
// vim: expandtab ts=4 sts=4 sw=4

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp