@@ -34,6 +34,7 @@ var cfg struct {
34
34
UseDtm bool
35
35
InitOnly bool
36
36
SkipInit bool
37
+ Parallel bool
37
38
38
39
Isolation string // "repeatable read" or "read committed"
39
40
@@ -49,6 +50,7 @@ var cfg struct {
49
50
Writers struct {
50
51
Num int
51
52
Updates int
53
+ StartId int
52
54
AllowGlobal bool
53
55
AllowLocal bool
54
56
PrivateRows bool
@@ -105,14 +107,16 @@ func init() {
105
107
flag .IntVar (& cfg .Readers .Num ,"r" ,1 ,"The number of readers" )
106
108
flag .IntVar (& cfg .Writers .Num ,"w" ,8 ,"The number of writers" )
107
109
flag .IntVar (& cfg .Writers .Updates ,"u" ,10000 ,"The number updates each writer performs" )
110
+ flag .IntVar (& cfg .Writers .StartId ,"k" ,0 ,"Script will update rows starting from this value" )
108
111
flag .BoolVar (& cfg .Verbose ,"v" ,false ,"Show progress and other stuff for mortals" )
109
112
flag .BoolVar (& cfg .UseDtm ,"m" ,false ,"Use DTM to keep global consistency" )
110
113
flag .BoolVar (& cfg .Writers .AllowGlobal ,"g" ,false ,"Allow global updates" )
111
114
flag .BoolVar (& cfg .Writers .AllowLocal ,"l" ,false ,"Allow local updates" )
112
115
flag .BoolVar (& cfg .Writers .PrivateRows ,"p" ,false ,"Private rows (avoid waits/aborts caused by concurrent updates of the same rows)" )
113
116
flag .BoolVar (& cfg .Writers .UseCursors ,"c" ,false ,"Use cursors for updates" )
114
- flag .BoolVar (& cfg .InitOnly ,"f" ,false ,"Only feeddatabses with data" )
117
+ flag .BoolVar (& cfg .InitOnly ,"f" ,false ,"Only feeddatabases with data" )
115
118
flag .BoolVar (& cfg .SkipInit ,"s" ,false ,"Skip init phase" )
119
+ flag .BoolVar (& cfg .Parallel ,"o" ,false ,"Use parallel execs" )
116
120
flag .Parse ()
117
121
118
122
if len (cfg .ConnStrs )== 0 {
@@ -205,6 +209,31 @@ func commit(conns ...*pgx.Conn) {
205
209
wg .Wait ()
206
210
}
207
211
212
+ func parallel_exec (conns []* pgx.Conn ,requests []string )bool {
213
+ var wg sync.WaitGroup
214
+ state := true
215
+ wg .Add (len (conns ))
216
+ for i := range conns {
217
+ if cfg .Parallel {
218
+ go func (j int ) {
219
+ _ ,err := conns [j ].Exec (requests [j ])
220
+ if err != nil {
221
+ state = false
222
+ }
223
+ wg .Done ()
224
+ }(i )
225
+ }else {
226
+ _ ,err := conns [i ].Exec (requests [i ])
227
+ if err != nil {
228
+ state = false
229
+ }
230
+ wg .Done ()
231
+ }
232
+ }
233
+ wg .Wait ()
234
+ return state
235
+ }
236
+
208
237
func prepare_one (connstr string ,wg * sync.WaitGroup ) {
209
238
dbconf ,err := pgx .ParseDSN (connstr )
210
239
checkErr (err )
@@ -221,23 +250,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
221
250
exec (conn ,"drop table if exists t" )
222
251
exec (conn ,"create table t(u int primary key, v int)" )
223
252
exec (conn ,"insert into t (select generate_series(0,$1-1), $2)" ,cfg .Accounts .Num ,cfg .Accounts .Balance )
224
- /*
225
- exec(conn, "begin transaction isolation level " + cfg.Isolation)
226
253
227
- start := time.Now()
228
- for i := 0; i < cfg.Accounts.Num; i++ {
229
- exec(conn, "insert into t values ($1, $2)", i, cfg.Accounts.Balance)
230
- if time.Since(start).Seconds() > 1 {
231
- if cfg.Verbose {
232
- fmt.Printf(
233
- "inserted %0.2f%%: %d of %d records\n",
234
- float32(i + 1) * 100.0 / float32(cfg.Accounts.Num), i + 1, cfg.Accounts.Num,
235
- )
236
- }
237
- start = time.Now()
238
- }
239
- }
240
- */
241
254
exec (conn ,"commit" )
242
255
wg .Done ()
243
256
}
@@ -278,6 +291,10 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
278
291
279
292
var conns []* pgx.Conn
280
293
294
+ if len (cfg .ConnStrs )== 1 {
295
+ cfg .ConnStrs .Set (cfg .ConnStrs [0 ])
296
+ }
297
+
281
298
for _ ,connstr := range cfg .ConnStrs {
282
299
dbconf ,err := pgx .ParseDSN (connstr )
283
300
checkErr (err )
@@ -293,153 +310,62 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
293
310
for myCommits < cfg .Writers .Updates {
294
311
amount := 1
295
312
296
- from_acc := rand .Intn (cfg .Accounts .Num )
297
- to_acc := rand .Intn (cfg .Accounts .Num )
298
-
299
- if cfg .Writers .PrivateRows {
300
- from_acc += id - (from_acc % cfg .Writers .Num )
301
- to_acc += id - (to_acc % cfg .Writers .Num )
302
- if (from_acc == to_acc ) {
303
- to_acc = (from_acc + cfg .Writers .Num )% cfg .Accounts .Num
304
- }
305
- }else {
306
- if (from_acc == to_acc ) {
307
- to_acc = (from_acc + 1 )% cfg .Accounts .Num
308
- }
309
- }
310
-
311
- if (from_acc > to_acc ) {
312
- from_acc ,to_acc = to_acc ,from_acc
313
- }
313
+ from_acc := cfg .Writers .StartId + 2 * id + 1
314
+ to_acc := cfg .Writers .StartId + 2 * id + 2
314
315
315
316
src := conns [rand .Intn (len (conns ))]
316
317
dst := conns [rand .Intn (len (conns ))]
317
-
318
318
if src == dst {
319
- if cfg .Writers .AllowLocal {
320
- // local update
321
- exec (src ,"begin transaction isolation level " + cfg .Isolation )
322
- ok1 := execUpdate (src ,"update t set v = v - $1 where u=$2" ,amount ,from_acc )
323
- ok2 := execUpdate (src ,"update t set v = v + $1 where u=$2" ,amount ,to_acc )
324
- if ! ok1 || ! ok2 {
325
- exec (src ,"rollback" )
326
- nAborts += 1
327
- }else {
328
- exec (src ,"commit" )
329
- nCommits += 1
330
- myCommits += 1
331
- }
332
- }else {
333
- if len (conns )> 1 {
334
- continue
335
- }
336
-
337
- // global single-node update
338
- if cfg .UseDtm {
339
- execQuery (src ,"select dtm_begin_transaction()" )
340
- }
319
+ continue
320
+ }
341
321
342
- // start transaction
343
- exec (src ,"begin transaction isolation level " + cfg .Isolation )
344
-
345
- ok := true
346
- if (cfg .Writers .UseCursors ) {
347
- exec (
348
- src ,
349
- "declare cur0 cursor for select * from t where u=$1 for update" ,
350
- from_acc ,
351
- )
352
-
353
- ok = execUpdate (src ,"fetch from cur0" )&& ok
354
-
355
- ok = execUpdate (
356
- src ,"update t set v = v - $1 where current of cur0" ,
357
- amount ,
358
- )&& ok
359
- ok = execUpdate (
360
- src ,"update t set v = v + $1 where current of cur0" ,
361
- amount ,
362
- )&& ok
363
- }else {
364
- ok = execUpdate (
365
- src ,"update t set v = v - $1 where u=$2" ,
366
- amount ,from_acc ,
367
- )&& ok
368
- ok = execUpdate (
369
- src ,"update t set v = v + $1 where u=$2" ,
370
- amount ,to_acc ,
371
- )&& ok
372
- }
322
+ if cfg .UseDtm {
323
+ xid := execQuery (src ,"select dtm_begin_transaction()" )
324
+ exec (dst ,"select dtm_join_transaction($1)" ,xid )
325
+ }
373
326
374
- if ok {
375
- commit (src )
376
- nCommits += 1
377
- myCommits += 1
378
- }else {
379
- exec (src ,"rollback" )
380
- nAborts += 1
381
- }
382
- }
327
+ parallel_exec ([]* pgx.Conn {src ,dst }, []string {"begin transaction isolation level " + cfg .Isolation ,"begin transaction isolation level " + cfg .Isolation })
328
+
329
+ ok := true
330
+ if (cfg .Writers .UseCursors ) {
331
+ exec (
332
+ src ,
333
+ "declare cur0 cursor for select * from t where u=$1 for update" ,
334
+ from_acc ,
335
+ )
336
+ exec (
337
+ dst ,
338
+ "declare cur0 cursor for select * from t where u=$1 for update" ,
339
+ to_acc ,
340
+ )
341
+
342
+ ok = execUpdate (src ,"fetch from cur0" )&& ok
343
+ ok = execUpdate (dst ,"fetch from cur0" )&& ok
344
+
345
+ ok = execUpdate (
346
+ src ,"update t set v = v - $1 where current of cur0" ,
347
+ amount ,
348
+ )&& ok
349
+ ok = execUpdate (
350
+ dst ,"update t set v = v + $1 where current of cur0" ,
351
+ amount ,
352
+ )&& ok
383
353
}else {
384
- // global update
385
- if ! cfg .Writers .AllowGlobal {
386
- // which we do not want
387
- continue
388
- }
389
-
390
- if cfg .UseDtm {
391
- xid := execQuery (src ,"select dtm_begin_transaction()" )
392
- exec (dst ,"select dtm_join_transaction($1)" ,xid )
393
- }
394
354
395
- // start transaction
396
- exec (src ,"begin transaction isolation level " + cfg .Isolation )
397
- exec (dst ,"begin transaction isolation level " + cfg .Isolation )
398
-
399
- ok := true
400
- if (cfg .Writers .UseCursors ) {
401
- exec (
402
- src ,
403
- "declare cur0 cursor for select * from t where u=$1 for update" ,
404
- from_acc ,
405
- )
406
- exec (
407
- dst ,
408
- "declare cur0 cursor for select * from t where u=$1 for update" ,
409
- to_acc ,
410
- )
355
+ sql1 := fmt .Sprintf ("update t set v = v - %d where u=%d" ,amount ,from_acc )
356
+ sql2 := fmt .Sprintf ("update t set v = v + %d where u=%d" ,amount ,to_acc )
411
357
412
- ok = execUpdate (src ,"fetch from cur0" )&& ok
413
- ok = execUpdate (dst ,"fetch from cur0" )&& ok
414
-
415
- ok = execUpdate (
416
- src ,"update t set v = v - $1 where current of cur0" ,
417
- amount ,
418
- )&& ok
419
- ok = execUpdate (
420
- dst ,"update t set v = v + $1 where current of cur0" ,
421
- amount ,
422
- )&& ok
423
- }else {
424
- ok = execUpdate (
425
- src ,"update t set v = v - $1 where u=$2" ,
426
- amount ,from_acc ,
427
- )&& ok
428
- ok = execUpdate (
429
- dst ,"update t set v = v + $1 where u=$2" ,
430
- amount ,to_acc ,
431
- )&& ok
432
- }
358
+ ok = parallel_exec ([]* pgx.Conn {src ,dst }, []string {sql1 ,sql2 })
359
+ }
433
360
434
- if ok {
435
- commit (src ,dst )
436
- nCommits += 1
437
- myCommits += 1
438
- }else {
439
- exec (src ,"rollback" )
440
- exec (dst ,"rollback" )
441
- nAborts += 1
442
- }
361
+ if ok {
362
+ commit (src ,dst )
363
+ nCommits += 1
364
+ myCommits += 1
365
+ }else {
366
+ exec (src ,"rollback" )
367
+ exec (dst ,"rollback" )
368
+ nAborts += 1
443
369
}
444
370
445
371
if time .Since (start ).Seconds ()> 1 {
@@ -471,6 +397,8 @@ func reader(wg *sync.WaitGroup, inconsistency *bool) {
471
397
conns = append (conns ,conn )
472
398
}
473
399
400
+
401
+
474
402
for running {
475
403
var sum int64 = 0
476
404
var xid int32