Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbf4a71e

Browse files
committed
readers backend. (partial support)
1 parentdf4606c commitbf4a71e

File tree

4 files changed

+119
-24
lines changed

4 files changed

+119
-24
lines changed

‎contrib/pg_dtm/tests/perf/perf.go‎

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type ConnStrings []string
1414
varbackendinterface{
1515
prepare(connstrs []string)
1616
writer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup)
17-
reader(wg*sync.WaitGroup,inconsistency*bool)
17+
reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool)
1818
}
1919

2020
varcfgstruct {
@@ -27,10 +27,10 @@ var cfg struct {
2727
Isolationstring
2828
AccountsNumint
2929
ReadersNumint
30+
IterNumint
3031

3132
Writersstruct {
3233
Numint
33-
Updatesint
3434
StartIdint
3535
}
3636
}
@@ -72,7 +72,7 @@ func dump_cfg() {
7272

7373
fmt.Printf(
7474
"Writers: %d × %d updates\n",
75-
cfg.Writers.Num,cfg.Writers.Updates,
75+
cfg.Writers.Num,cfg.IterNum,
7676
)
7777
}
7878

@@ -89,8 +89,8 @@ func init() {
8989
"The number of bank accounts")
9090
flag.IntVar(&cfg.Writers.StartId,"s",0,
9191
"StartID. Script will update rows starting from this value")
92-
flag.IntVar(&cfg.Writers.Updates,"u",10000,
93-
"The number updates each writer performs")
92+
flag.IntVar(&cfg.IterNum,"n",10000,
93+
"The number updates each writer(reader in case of Reades backend)performs")
9494
flag.IntVar(&cfg.ReadersNum,"r",1,
9595
"The number of readers")
9696
flag.IntVar(&cfg.Writers.Num,"w",8,
@@ -125,12 +125,18 @@ func init() {
125125
}
126126

127127
funcmain() {
128+
iflen(cfg.ConnStrs)<2 {
129+
fmt.Println("ERROR: This test needs at leas two connections")
130+
os.Exit(1)
131+
}
128132

129133
switchcfg.Backend {
130134
case"transfers":
131135
backend=new(Transfers)
132136
case"fdw":
133137
backend=new(TransfersFDW)
138+
case"readers":
139+
backend=new(Readers)
134140
default:
135141
fmt.Println("No backend named: '%s'\n",cfg.Backend)
136142
return
@@ -148,9 +154,10 @@ func main() {
148154
varreaderWg sync.WaitGroup
149155

150156
cCommits:=make(chanint)
157+
cFetches:=make(chanint)
151158
cAborts:=make(chanint)
152159

153-
goprogress(cfg.Writers.Num*cfg.Writers.Updates,cCommits,cAborts)
160+
goprogress(cfg.Writers.Num*cfg.IterNum,cCommits,cAborts)
154161

155162
start=time.Now()
156163
writerWg.Add(cfg.Writers.Num)
@@ -162,17 +169,17 @@ func main() {
162169
inconsistency:=false
163170
readerWg.Add(cfg.ReadersNum)
164171
fori:=0;i<cfg.ReadersNum;i++ {
165-
gobackend.reader(&readerWg,&inconsistency)
172+
gobackend.reader(&readerWg,cFetches,&inconsistency)
166173
}
167174

168175
writerWg.Wait()
176+
running=false
177+
readerWg.Wait()
178+
169179
fmt.Printf("writers finished in %0.2f seconds\n",
170180
time.Since(start).Seconds())
171181
fmt.Printf("TPS = %0.2f\n",
172-
float64(cfg.Writers.Num*cfg.Writers.Updates)/time.Since(start).Seconds())
173-
174-
running=false
175-
readerWg.Wait()
182+
float64(cfg.Writers.Num*cfg.IterNum)/time.Since(start).Seconds())
176183

177184
ifinconsistency {
178185
fmt.Printf("INCONSISTENCY DETECTED\n")
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"sync"
5+
"math/rand"
6+
"github.com/jackc/pgx"
7+
)
8+
9+
typeReadersstruct {}
10+
11+
func (tReaders)prepare(connstrs []string) {
12+
varwg sync.WaitGroup
13+
wg.Add(len(connstrs))
14+
for_,connstr:=rangeconnstrs {
15+
got.prepare_one(connstr,&wg)
16+
}
17+
wg.Wait()
18+
}
19+
20+
func (tReaders)prepare_one(connstrstring,wg*sync.WaitGroup) {
21+
dbconf,err:=pgx.ParseDSN(connstr)
22+
checkErr(err)
23+
conn,err:=pgx.Connect(dbconf)
24+
checkErr(err)
25+
deferconn.Close()
26+
27+
ifcfg.UseDtm {
28+
exec(conn,"drop extension if exists pg_dtm")
29+
exec(conn,"create extension pg_dtm")
30+
}
31+
exec(conn,"drop table if exists t cascade")
32+
exec(conn,"create table t(u int primary key, v int)")
33+
exec(conn,"insert into t (select generate_series(0,$1-1), $2)",cfg.AccountsNum,0)
34+
wg.Done()
35+
}
36+
37+
func (tReaders)writer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
38+
varupdates=0
39+
varconns []*pgx.Conn
40+
41+
for_,connstr:=rangecfg.ConnStrs {
42+
dbconf,err:=pgx.ParseDSN(connstr)
43+
checkErr(err)
44+
conn,err:=pgx.Connect(dbconf)
45+
checkErr(err)
46+
deferconn.Close()
47+
conns=append(conns,conn)
48+
}
49+
forupdates<cfg.IterNum {
50+
acc:=rand.Intn(cfg.AccountsNum)
51+
52+
ifcfg.UseDtm {
53+
xid:=execQuery(conns[0],"select dtm_begin_transaction()")
54+
fori:=1;i<len(conns);i++ {
55+
exec(conns[i],"select dtm_join_transaction($1)",xid)
56+
}
57+
}
58+
for_,conn:=rangeconns {
59+
exec(conn,"begin transaction isolation level "+cfg.Isolation)
60+
exec(conn,"update t set v = v + 1 where u=$1",acc)
61+
}
62+
commit(conns...)
63+
updates++
64+
}
65+
// cCommits <- updates
66+
wg.Done()
67+
}
68+
69+
func (tReaders)reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool) {
70+
varfetches=0
71+
varconns []*pgx.Conn
72+
varsumint32=0
73+
74+
for_,connstr:=rangecfg.ConnStrs {
75+
dbconf,err:=pgx.ParseDSN(connstr)
76+
checkErr(err)
77+
conn,err:=pgx.Connect(dbconf)
78+
checkErr(err)
79+
deferconn.Close()
80+
conns=append(conns,conn)
81+
}
82+
forrunning {
83+
acc:=rand.Intn(cfg.AccountsNum)
84+
con:=rand.Intn(len(conns))
85+
sum+=execQuery(conns[con],"select v from t where u=$1",acc)
86+
fetches++
87+
}
88+
// cFetches <- fetches
89+
wg.Done()
90+
}
91+
92+
// vim: expandtab ts=4 sts=4 sw=4

‎contrib/pg_dtm/tests/perf/transfers-fdw.go‎

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,6 @@ func (t TransfersFDW) prepare(connstrs []string) {
1414
varwg sync.WaitGroup
1515
wg.Add(len(connstrs))
1616

17-
iflen(connstrs)<2 {
18-
fmt.Println("ERROR: FDW test needs at leas two connections")
19-
os.Exit(1)
20-
}
21-
iflen(connstrs)<2 {
22-
fmt.Println("ERROR: FDW test need explicit usernames in connection strings")
23-
os.Exit(2)
24-
}
25-
2617
fori,connstr:=rangeconnstrs {
2718
got.prepare_slave(i,connstr,&wg)
2819
}
@@ -37,6 +28,11 @@ func (t TransfersFDW) prepare_slave(id int, connstr string, wg *sync.WaitGroup)
3728
checkErr(err)
3829
deferconn.Close()
3930

31+
iflen(dbconf.User)==0 {
32+
fmt.Println("ERROR: FDW test need explicit usernames in connection strings")
33+
os.Exit(2)
34+
}
35+
4036
ifcfg.UseDtm {
4137
exec(conn,"drop extension if exists pg_dtm")
4238
exec(conn,"create extension pg_dtm")
@@ -94,7 +90,7 @@ func (t TransfersFDW) writer(id int, cCommits chan int, cAborts chan int, wg *sy
9490
deferconn.Close()
9591

9692
start:=time.Now()
97-
formyCommits<cfg.Writers.Updates {
93+
formyCommits<cfg.IterNum {
9894
amount:=1
9995
from_acc:=cfg.Writers.StartId+2*id+1
10096
to_acc:=cfg.Writers.StartId+2*id+2
@@ -128,7 +124,7 @@ func (t TransfersFDW) writer(id int, cCommits chan int, cAborts chan int, wg *sy
128124
wg.Done()
129125
}
130126

131-
func (tTransfersFDW)reader(wg*sync.WaitGroup,inconsistency*bool) {
127+
func (tTransfersFDW)reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool) {
132128
varsumint64
133129
varprevSumint64=0
134130

‎contrib/pg_dtm/tests/perf/transfers.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (t Transfers) writer(id int, cCommits chan int, cAborts chan int, wg *sync.
6363
}
6464

6565
start:=time.Now()
66-
formyCommits<cfg.Writers.Updates {
66+
formyCommits<cfg.IterNum {
6767
amount:=1
6868

6969
from_acc:=cfg.Writers.StartId+2*id+1
@@ -119,7 +119,7 @@ func (t Transfers) writer(id int, cCommits chan int, cAborts chan int, wg *sync.
119119
wg.Done()
120120
}
121121

122-
func (tTransfers)reader(wg*sync.WaitGroup,inconsistency*bool) {
122+
func (tTransfers)reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool) {
123123
varprevSumint64=0
124124

125125
varconns []*pgx.Conn

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp