Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7a19770

Browse files
committed
rework transfers.go logic for one node
1 parent9ebaab8 commit7a19770

File tree

1 file changed

+82
-154
lines changed

1 file changed

+82
-154
lines changed

‎contrib/pg_dtm/tests/transfers.go‎

Lines changed: 82 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ var cfg struct {
3434
UseDtmbool
3535
InitOnlybool
3636
SkipInitbool
37+
Parallelbool
3738

3839
Isolationstring// "repeatable read" or "read committed"
3940

@@ -49,6 +50,7 @@ var cfg struct {
4950
Writersstruct {
5051
Numint
5152
Updatesint
53+
StartIdint
5254
AllowGlobalbool
5355
AllowLocalbool
5456
PrivateRowsbool
@@ -105,14 +107,16 @@ func init() {
105107
flag.IntVar(&cfg.Readers.Num,"r",1,"The number of readers")
106108
flag.IntVar(&cfg.Writers.Num,"w",8,"The number of writers")
107109
flag.IntVar(&cfg.Writers.Updates,"u",10000,"The number updates each writer performs")
110+
flag.IntVar(&cfg.Writers.StartId,"k",0,"Script will update rows starting from this value")
108111
flag.BoolVar(&cfg.Verbose,"v",false,"Show progress and other stuff for mortals")
109112
flag.BoolVar(&cfg.UseDtm,"m",false,"Use DTM to keep global consistency")
110113
flag.BoolVar(&cfg.Writers.AllowGlobal,"g",false,"Allow global updates")
111114
flag.BoolVar(&cfg.Writers.AllowLocal,"l",false,"Allow local updates")
112115
flag.BoolVar(&cfg.Writers.PrivateRows,"p",false,"Private rows (avoid waits/aborts caused by concurrent updates of the same rows)")
113116
flag.BoolVar(&cfg.Writers.UseCursors,"c",false,"Use cursors for updates")
114-
flag.BoolVar(&cfg.InitOnly,"f",false,"Only feeddatabses with data")
117+
flag.BoolVar(&cfg.InitOnly,"f",false,"Only feeddatabases with data")
115118
flag.BoolVar(&cfg.SkipInit,"s",false,"Skip init phase")
119+
flag.BoolVar(&cfg.Parallel,"o",false,"Use parallel execs")
116120
flag.Parse()
117121

118122
iflen(cfg.ConnStrs)==0 {
@@ -205,6 +209,31 @@ func commit(conns ...*pgx.Conn) {
205209
wg.Wait()
206210
}
207211

212+
funcparallel_exec(conns []*pgx.Conn,requests []string)bool {
213+
varwg sync.WaitGroup
214+
state:=true
215+
wg.Add(len(conns))
216+
fori:=rangeconns {
217+
ifcfg.Parallel {
218+
gofunc(jint) {
219+
_,err:=conns[j].Exec(requests[j])
220+
iferr!=nil {
221+
state=false
222+
}
223+
wg.Done()
224+
}(i)
225+
}else {
226+
_,err:=conns[i].Exec(requests[i])
227+
iferr!=nil {
228+
state=false
229+
}
230+
wg.Done()
231+
}
232+
}
233+
wg.Wait()
234+
returnstate
235+
}
236+
208237
funcprepare_one(connstrstring,wg*sync.WaitGroup) {
209238
dbconf,err:=pgx.ParseDSN(connstr)
210239
checkErr(err)
@@ -221,23 +250,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
221250
exec(conn,"drop table if exists t")
222251
exec(conn,"create table t(u int primary key, v int)")
223252
exec(conn,"insert into t (select generate_series(0,$1-1), $2)",cfg.Accounts.Num,cfg.Accounts.Balance)
224-
/*
225-
exec(conn, "begin transaction isolation level " + cfg.Isolation)
226253

227-
start := time.Now()
228-
for i := 0; i < cfg.Accounts.Num; i++ {
229-
exec(conn, "insert into t values ($1, $2)", i, cfg.Accounts.Balance)
230-
if time.Since(start).Seconds() > 1 {
231-
if cfg.Verbose {
232-
fmt.Printf(
233-
"inserted %0.2f%%: %d of %d records\n",
234-
float32(i + 1) * 100.0 / float32(cfg.Accounts.Num), i + 1, cfg.Accounts.Num,
235-
)
236-
}
237-
start = time.Now()
238-
}
239-
}
240-
*/
241254
exec(conn,"commit")
242255
wg.Done()
243256
}
@@ -278,6 +291,10 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
278291

279292
varconns []*pgx.Conn
280293

294+
iflen(cfg.ConnStrs)==1 {
295+
cfg.ConnStrs.Set(cfg.ConnStrs[0])
296+
}
297+
281298
for_,connstr:=rangecfg.ConnStrs {
282299
dbconf,err:=pgx.ParseDSN(connstr)
283300
checkErr(err)
@@ -293,153 +310,62 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
293310
formyCommits<cfg.Writers.Updates {
294311
amount:=1
295312

296-
from_acc:=rand.Intn(cfg.Accounts.Num)
297-
to_acc:=rand.Intn(cfg.Accounts.Num)
298-
299-
ifcfg.Writers.PrivateRows {
300-
from_acc+=id- (from_acc%cfg.Writers.Num)
301-
to_acc+=id- (to_acc%cfg.Writers.Num)
302-
if (from_acc==to_acc) {
303-
to_acc= (from_acc+cfg.Writers.Num)%cfg.Accounts.Num
304-
}
305-
}else {
306-
if (from_acc==to_acc) {
307-
to_acc= (from_acc+1)%cfg.Accounts.Num
308-
}
309-
}
310-
311-
if (from_acc>to_acc) {
312-
from_acc,to_acc=to_acc,from_acc
313-
}
313+
from_acc:=cfg.Writers.StartId+2*id+1
314+
to_acc:=cfg.Writers.StartId+2*id+2
314315

315316
src:=conns[rand.Intn(len(conns))]
316317
dst:=conns[rand.Intn(len(conns))]
317-
318318
ifsrc==dst {
319-
ifcfg.Writers.AllowLocal {
320-
// local update
321-
exec(src,"begin transaction isolation level "+cfg.Isolation)
322-
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,from_acc)
323-
ok2:=execUpdate(src,"update t set v = v + $1 where u=$2",amount,to_acc)
324-
if!ok1||!ok2 {
325-
exec(src,"rollback")
326-
nAborts+=1
327-
}else {
328-
exec(src,"commit")
329-
nCommits+=1
330-
myCommits+=1
331-
}
332-
}else {
333-
iflen(conns)>1 {
334-
continue
335-
}
336-
337-
// global single-node update
338-
ifcfg.UseDtm {
339-
execQuery(src,"select dtm_begin_transaction()")
340-
}
319+
continue
320+
}
341321

342-
// start transaction
343-
exec(src,"begin transaction isolation level "+cfg.Isolation)
344-
345-
ok:=true
346-
if (cfg.Writers.UseCursors) {
347-
exec(
348-
src,
349-
"declare cur0 cursor for select * from t where u=$1 for update",
350-
from_acc,
351-
)
352-
353-
ok=execUpdate(src,"fetch from cur0")&&ok
354-
355-
ok=execUpdate(
356-
src,"update t set v = v - $1 where current of cur0",
357-
amount,
358-
)&&ok
359-
ok=execUpdate(
360-
src,"update t set v = v + $1 where current of cur0",
361-
amount,
362-
)&&ok
363-
}else {
364-
ok=execUpdate(
365-
src,"update t set v = v - $1 where u=$2",
366-
amount,from_acc,
367-
)&&ok
368-
ok=execUpdate(
369-
src,"update t set v = v + $1 where u=$2",
370-
amount,to_acc,
371-
)&&ok
372-
}
322+
ifcfg.UseDtm {
323+
xid:=execQuery(src,"select dtm_begin_transaction()")
324+
exec(dst,"select dtm_join_transaction($1)",xid)
325+
}
373326

374-
ifok {
375-
commit(src)
376-
nCommits+=1
377-
myCommits+=1
378-
}else {
379-
exec(src,"rollback")
380-
nAborts+=1
381-
}
382-
}
327+
parallel_exec([]*pgx.Conn{src,dst}, []string{"begin transaction isolation level "+cfg.Isolation,"begin transaction isolation level "+cfg.Isolation})
328+
329+
ok:=true
330+
if (cfg.Writers.UseCursors) {
331+
exec(
332+
src,
333+
"declare cur0 cursor for select * from t where u=$1 for update",
334+
from_acc,
335+
)
336+
exec(
337+
dst,
338+
"declare cur0 cursor for select * from t where u=$1 for update",
339+
to_acc,
340+
)
341+
342+
ok=execUpdate(src,"fetch from cur0")&&ok
343+
ok=execUpdate(dst,"fetch from cur0")&&ok
344+
345+
ok=execUpdate(
346+
src,"update t set v = v - $1 where current of cur0",
347+
amount,
348+
)&&ok
349+
ok=execUpdate(
350+
dst,"update t set v = v + $1 where current of cur0",
351+
amount,
352+
)&&ok
383353
}else {
384-
// global update
385-
if!cfg.Writers.AllowGlobal {
386-
// which we do not want
387-
continue
388-
}
389-
390-
ifcfg.UseDtm {
391-
xid:=execQuery(src,"select dtm_begin_transaction()")
392-
exec(dst,"select dtm_join_transaction($1)",xid)
393-
}
394354

395-
// start transaction
396-
exec(src,"begin transaction isolation level "+cfg.Isolation)
397-
exec(dst,"begin transaction isolation level "+cfg.Isolation)
398-
399-
ok:=true
400-
if (cfg.Writers.UseCursors) {
401-
exec(
402-
src,
403-
"declare cur0 cursor for select * from t where u=$1 for update",
404-
from_acc,
405-
)
406-
exec(
407-
dst,
408-
"declare cur0 cursor for select * from t where u=$1 for update",
409-
to_acc,
410-
)
355+
sql1:=fmt.Sprintf("update t set v = v - %d where u=%d",amount,from_acc)
356+
sql2:=fmt.Sprintf("update t set v = v + %d where u=%d",amount,to_acc)
411357

412-
ok=execUpdate(src,"fetch from cur0")&&ok
413-
ok=execUpdate(dst,"fetch from cur0")&&ok
414-
415-
ok=execUpdate(
416-
src,"update t set v = v - $1 where current of cur0",
417-
amount,
418-
)&&ok
419-
ok=execUpdate(
420-
dst,"update t set v = v + $1 where current of cur0",
421-
amount,
422-
)&&ok
423-
}else {
424-
ok=execUpdate(
425-
src,"update t set v = v - $1 where u=$2",
426-
amount,from_acc,
427-
)&&ok
428-
ok=execUpdate(
429-
dst,"update t set v = v + $1 where u=$2",
430-
amount,to_acc,
431-
)&&ok
432-
}
358+
ok=parallel_exec([]*pgx.Conn{src,dst}, []string{sql1,sql2})
359+
}
433360

434-
ifok {
435-
commit(src,dst)
436-
nCommits+=1
437-
myCommits+=1
438-
}else {
439-
exec(src,"rollback")
440-
exec(dst,"rollback")
441-
nAborts+=1
442-
}
361+
ifok {
362+
commit(src,dst)
363+
nCommits+=1
364+
myCommits+=1
365+
}else {
366+
exec(src,"rollback")
367+
exec(dst,"rollback")
368+
nAborts+=1
443369
}
444370

445371
iftime.Since(start).Seconds()>1 {
@@ -471,6 +397,8 @@ func reader(wg *sync.WaitGroup, inconsistency *bool) {
471397
conns=append(conns,conn)
472398
}
473399

400+
401+
474402
forrunning {
475403
varsumint64=0
476404
varxidint32

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp