Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit21e563f

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents7a19770 +88a60f5 commit21e563f

File tree

3 files changed

+282
-0
lines changed

3 files changed

+282
-0
lines changed

‎contrib/pg_dtm/docs/DTM.odp‎

962 KB
Binary file not shown.

‎contrib/pg_dtm/tests/benchmark.go‎

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"flag"
6+
"os"
7+
"sync"
8+
"math/rand"
9+
"time"
10+
"github.com/jackc/pgx"
11+
)
12+
13+
typeConnStrings []string
14+
15+
// The first method of flag.Value interface
16+
func (c*ConnStrings)String()string {
17+
iflen(*c)>0 {
18+
return (*c)[0]
19+
}else {
20+
return""
21+
}
22+
}
23+
24+
// The second method of flag.Value interface
25+
func (c*ConnStrings)Set(valuestring)error {
26+
*c=append(*c,value)
27+
returnnil
28+
}
29+
30+
varcfgstruct {
31+
ConnStrsConnStrings
32+
33+
Verbosebool
34+
UseDtmbool
35+
InitOnlybool
36+
SkipInitbool
37+
38+
Isolationstring// "repeatable read" or "read committed"
39+
40+
Timeint
41+
42+
Accountsstruct {
43+
Numint
44+
Balanceint
45+
}
46+
47+
Readersstruct {
48+
Numint
49+
}
50+
51+
Writersstruct {
52+
Numint
53+
}
54+
}
55+
funcappend_with_comma(s*string,xstring) {
56+
iflen(*s)>0 {
57+
*s=*s+", "+x
58+
}else {
59+
*s=x
60+
}
61+
}
62+
63+
funcdump_cfg() {
64+
fmt.Printf("Connections: %d\n",len(cfg.ConnStrs))
65+
for_,cs:=rangecfg.ConnStrs {
66+
fmt.Printf(" %s\n",cs)
67+
}
68+
fmt.Printf("Isolation: %s\n",cfg.Isolation)
69+
fmt.Printf(
70+
"Accounts: %d × $%d\n",
71+
cfg.Accounts.Num,cfg.Accounts.Balance,
72+
)
73+
fmt.Printf("Readers: %d\n",cfg.Readers.Num)
74+
fmt.Printf("Writers: %d\n",cfg.Writers.Num)
75+
}
76+
77+
funcinit() {
78+
flag.Var(&cfg.ConnStrs,"d","Connection string (repeat for multiple connections)")
79+
repread:=flag.Bool("i",false,"Use 'repeatable read' isolation level instead of 'read committed'")
80+
flag.IntVar(&cfg.Accounts.Num,"a",100000,"The number of bank accounts")
81+
flag.IntVar(&cfg.Accounts.Balance,"b",0,"The initial balance of each bank account")
82+
flag.IntVar(&cfg.Readers.Num,"r",4,"The number of readers")
83+
flag.IntVar(&cfg.Writers.Num,"w",4,"The number of writers")
84+
flag.IntVar(&cfg.Time,"t",10,"Time in seconds of running test")
85+
flag.BoolVar(&cfg.UseDtm,"m",false,"Use DTM to keep global consistency")
86+
flag.BoolVar(&cfg.InitOnly,"f",false,"Only feed databses with data")
87+
flag.BoolVar(&cfg.SkipInit,"s",false,"Skip init phase")
88+
flag.Parse()
89+
90+
iflen(cfg.ConnStrs)==0 {
91+
flag.PrintDefaults()
92+
os.Exit(1)
93+
}
94+
95+
if*repread {
96+
cfg.Isolation="repeatable read"
97+
}else {
98+
cfg.Isolation="read committed"
99+
}
100+
101+
dump_cfg()
102+
}
103+
104+
105+
funcmain() {
106+
start:=time.Now()
107+
108+
if (!cfg.SkipInit){
109+
prepare(cfg.ConnStrs)
110+
fmt.Printf("database prepared in %0.2f seconds\n",time.Since(start).Seconds())
111+
}
112+
113+
if (cfg.InitOnly) {
114+
return
115+
}
116+
117+
varwg sync.WaitGroup
118+
119+
cUpdates:=make(chanint)
120+
cFetches:=make(chanint)
121+
122+
wg.Add(cfg.Writers.Num+cfg.Readers.Num)
123+
running=true
124+
125+
fori:=0;i<cfg.Writers.Num;i++ {
126+
gowriter(&wg,cUpdates)
127+
}
128+
129+
fori:=0;i<cfg.Readers.Num;i++ {
130+
goreader(&wg,cFetches)
131+
}
132+
133+
time.Sleep(time.Duration(cfg.Time)*time.Second)
134+
running=false
135+
136+
totalUpdates:=0
137+
fori:=0;i<cfg.Writers.Num;i++ {
138+
totalUpdates+=<-cUpdates
139+
}
140+
141+
totalFetches:=0
142+
fori:=0;i<cfg.Readers.Num;i++ {
143+
totalFetches+=<-cFetches
144+
}
145+
146+
wg.Wait()
147+
fmt.Printf("Perform %d updates and %d fetches\n",totalUpdates,totalFetches)
148+
}
149+
150+
varrunning=false
151+
152+
funcasyncCommit(conn*pgx.Conn,wg*sync.WaitGroup) {
153+
exec(conn,"commit")
154+
wg.Done()
155+
}
156+
157+
funccommit(conns...*pgx.Conn) {
158+
varwg sync.WaitGroup
159+
wg.Add(len(conns))
160+
for_,conn:=rangeconns {
161+
goasyncCommit(conn,&wg)
162+
}
163+
wg.Wait()
164+
}
165+
166+
funcprepare_one(connstrstring,wg*sync.WaitGroup) {
167+
dbconf,err:=pgx.ParseDSN(connstr)
168+
checkErr(err)
169+
170+
conn,err:=pgx.Connect(dbconf)
171+
checkErr(err)
172+
173+
deferconn.Close()
174+
175+
ifcfg.UseDtm {
176+
exec(conn,"drop extension if exists pg_dtm")
177+
exec(conn,"create extension pg_dtm")
178+
}
179+
exec(conn,"drop table if exists t")
180+
exec(conn,"create table t(u int primary key, v int)")
181+
exec(conn,"insert into t (select generate_series(0,$1-1), $2)",cfg.Accounts.Num,cfg.Accounts.Balance)
182+
exec(conn,"commit")
183+
wg.Done()
184+
}
185+
186+
funcprepare(connstrs []string) {
187+
varwg sync.WaitGroup
188+
wg.Add(len(connstrs))
189+
for_,connstr:=rangeconnstrs {
190+
goprepare_one(connstr,&wg)
191+
}
192+
wg.Wait()
193+
}
194+
195+
funcwriter(wg*sync.WaitGroup,cUpdateschanint) {
196+
varupdates=0
197+
varconns []*pgx.Conn
198+
199+
for_,connstr:=rangecfg.ConnStrs {
200+
dbconf,err:=pgx.ParseDSN(connstr)
201+
checkErr(err)
202+
203+
conn,err:=pgx.Connect(dbconf)
204+
checkErr(err)
205+
206+
deferconn.Close()
207+
conns=append(conns,conn)
208+
}
209+
forrunning {
210+
acc:=rand.Intn(cfg.Accounts.Num)
211+
xid:=execQuery(conns[0],"select dtm_begin_transaction()")
212+
fori:=1;i<len(conns);i++ {
213+
exec(conns[i],"select dtm_join_transaction($1)",xid)
214+
}
215+
for_,conn:=rangeconns {
216+
exec(conn,"begin transaction isolation level "+cfg.Isolation)
217+
exec(conn,"update t set v = v + 1 where u=$1",acc)
218+
}
219+
commit(conns...)
220+
updates++
221+
}
222+
cUpdates<-updates
223+
wg.Done()
224+
}
225+
226+
227+
funcreader(wg*sync.WaitGroup,cFetcheschanint) {
228+
varfetches=0
229+
varconns []*pgx.Conn
230+
varsumint32=0
231+
232+
for_,connstr:=rangecfg.ConnStrs {
233+
dbconf,err:=pgx.ParseDSN(connstr)
234+
checkErr(err)
235+
236+
conn,err:=pgx.Connect(dbconf)
237+
checkErr(err)
238+
239+
deferconn.Close()
240+
conns=append(conns,conn)
241+
}
242+
forrunning {
243+
acc:=rand.Intn(cfg.Accounts.Num)
244+
con:=rand.Intn(len(conns))
245+
sum+=execQuery(conns[con],"select v from t where u=$1",acc)
246+
fetches++
247+
}
248+
cFetches<-fetches
249+
wg.Done()
250+
}
251+
252+
funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) {
253+
varerrerror
254+
// fmt.Println(stmt)
255+
_,err=conn.Exec(stmt,arguments... )
256+
checkErr(err)
257+
}
258+
259+
funcexecQuery(conn*pgx.Conn,stmtstring,arguments...interface{})int32 {
260+
varerrerror
261+
varresultint32
262+
err=conn.QueryRow(stmt,arguments...).Scan(&result)
263+
checkErr(err)
264+
returnresult
265+
}
266+
267+
funccheckErr(errerror) {
268+
iferr!=nil {
269+
panic(err)
270+
}
271+
}
272+
273+
// vim: expandtab ts=4 sts=4 sw=4

‎contrib/pg_dtm/tests/benchmark.sh‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/bin/sh
2+
3+
go run benchmark.go \
4+
-d'dbname=postgres port=5432' \
5+
-d'dbname=postgres port=5433' \
6+
-m \
7+
-w 4 \
8+
-r 4 \
9+
-t 10

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp