|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | +"fmt" |
| 5 | +"flag" |
| 6 | +"os" |
| 7 | +"sync" |
| 8 | +"math/rand" |
| 9 | +"time" |
| 10 | +"github.com/jackc/pgx" |
| 11 | +) |
| 12 | + |
| 13 | +typeConnStrings []string |
| 14 | + |
| 15 | +// The first method of flag.Value interface |
| 16 | +func (c*ConnStrings)String()string { |
| 17 | +iflen(*c)>0 { |
| 18 | +return (*c)[0] |
| 19 | + }else { |
| 20 | +return"" |
| 21 | + } |
| 22 | +} |
| 23 | + |
| 24 | +// The second method of flag.Value interface |
| 25 | +func (c*ConnStrings)Set(valuestring)error { |
| 26 | +*c=append(*c,value) |
| 27 | +returnnil |
| 28 | +} |
| 29 | + |
| 30 | +varcfgstruct { |
| 31 | +ConnStrsConnStrings |
| 32 | + |
| 33 | +Verbosebool |
| 34 | +UseDtmbool |
| 35 | +InitOnlybool |
| 36 | +SkipInitbool |
| 37 | + |
| 38 | +Isolationstring// "repeatable read" or "read committed" |
| 39 | + |
| 40 | +Timeint |
| 41 | + |
| 42 | +Accountsstruct { |
| 43 | +Numint |
| 44 | +Balanceint |
| 45 | + } |
| 46 | + |
| 47 | +Readersstruct { |
| 48 | +Numint |
| 49 | + } |
| 50 | + |
| 51 | +Writersstruct { |
| 52 | +Numint |
| 53 | + } |
| 54 | +} |
| 55 | +funcappend_with_comma(s*string,xstring) { |
| 56 | +iflen(*s)>0 { |
| 57 | +*s=*s+", "+x |
| 58 | + }else { |
| 59 | +*s=x |
| 60 | + } |
| 61 | +} |
| 62 | + |
| 63 | +funcdump_cfg() { |
| 64 | +fmt.Printf("Connections: %d\n",len(cfg.ConnStrs)) |
| 65 | +for_,cs:=rangecfg.ConnStrs { |
| 66 | +fmt.Printf(" %s\n",cs) |
| 67 | + } |
| 68 | +fmt.Printf("Isolation: %s\n",cfg.Isolation) |
| 69 | +fmt.Printf( |
| 70 | +"Accounts: %d × $%d\n", |
| 71 | +cfg.Accounts.Num,cfg.Accounts.Balance, |
| 72 | + ) |
| 73 | +fmt.Printf("Readers: %d\n",cfg.Readers.Num) |
| 74 | +fmt.Printf("Writers: %d\n",cfg.Writers.Num) |
| 75 | +} |
| 76 | + |
| 77 | +funcinit() { |
| 78 | +flag.Var(&cfg.ConnStrs,"d","Connection string (repeat for multiple connections)") |
| 79 | +repread:=flag.Bool("i",false,"Use 'repeatable read' isolation level instead of 'read committed'") |
| 80 | +flag.IntVar(&cfg.Accounts.Num,"a",100000,"The number of bank accounts") |
| 81 | +flag.IntVar(&cfg.Accounts.Balance,"b",0,"The initial balance of each bank account") |
| 82 | +flag.IntVar(&cfg.Readers.Num,"r",4,"The number of readers") |
| 83 | +flag.IntVar(&cfg.Writers.Num,"w",4,"The number of writers") |
| 84 | +flag.IntVar(&cfg.Time,"t",10,"Time in seconds of running test") |
| 85 | +flag.BoolVar(&cfg.UseDtm,"m",false,"Use DTM to keep global consistency") |
| 86 | +flag.BoolVar(&cfg.InitOnly,"f",false,"Only feed databses with data") |
| 87 | +flag.BoolVar(&cfg.SkipInit,"s",false,"Skip init phase") |
| 88 | +flag.Parse() |
| 89 | + |
| 90 | +iflen(cfg.ConnStrs)==0 { |
| 91 | +flag.PrintDefaults() |
| 92 | +os.Exit(1) |
| 93 | + } |
| 94 | + |
| 95 | +if*repread { |
| 96 | +cfg.Isolation="repeatable read" |
| 97 | + }else { |
| 98 | +cfg.Isolation="read committed" |
| 99 | + } |
| 100 | + |
| 101 | +dump_cfg() |
| 102 | +} |
| 103 | + |
| 104 | + |
| 105 | +funcmain() { |
| 106 | +start:=time.Now() |
| 107 | + |
| 108 | +if (!cfg.SkipInit){ |
| 109 | +prepare(cfg.ConnStrs) |
| 110 | +fmt.Printf("database prepared in %0.2f seconds\n",time.Since(start).Seconds()) |
| 111 | + } |
| 112 | + |
| 113 | +if (cfg.InitOnly) { |
| 114 | +return |
| 115 | + } |
| 116 | + |
| 117 | +varwg sync.WaitGroup |
| 118 | + |
| 119 | +cUpdates:=make(chanint) |
| 120 | +cFetches:=make(chanint) |
| 121 | + |
| 122 | +wg.Add(cfg.Writers.Num+cfg.Readers.Num) |
| 123 | +running=true |
| 124 | + |
| 125 | +fori:=0;i<cfg.Writers.Num;i++ { |
| 126 | +gowriter(&wg,cUpdates) |
| 127 | + } |
| 128 | + |
| 129 | +fori:=0;i<cfg.Readers.Num;i++ { |
| 130 | +goreader(&wg,cFetches) |
| 131 | + } |
| 132 | + |
| 133 | +time.Sleep(time.Duration(cfg.Time)*time.Second) |
| 134 | +running=false |
| 135 | + |
| 136 | +totalUpdates:=0 |
| 137 | +fori:=0;i<cfg.Writers.Num;i++ { |
| 138 | +totalUpdates+=<-cUpdates |
| 139 | + } |
| 140 | + |
| 141 | +totalFetches:=0 |
| 142 | +fori:=0;i<cfg.Readers.Num;i++ { |
| 143 | +totalFetches+=<-cFetches |
| 144 | + } |
| 145 | + |
| 146 | +wg.Wait() |
| 147 | +fmt.Printf("Perform %d updates and %d fetches\n",totalUpdates,totalFetches) |
| 148 | +} |
| 149 | + |
| 150 | +varrunning=false |
| 151 | + |
| 152 | +funcasyncCommit(conn*pgx.Conn,wg*sync.WaitGroup) { |
| 153 | +exec(conn,"commit") |
| 154 | +wg.Done() |
| 155 | +} |
| 156 | + |
| 157 | +funccommit(conns...*pgx.Conn) { |
| 158 | +varwg sync.WaitGroup |
| 159 | +wg.Add(len(conns)) |
| 160 | +for_,conn:=rangeconns { |
| 161 | +goasyncCommit(conn,&wg) |
| 162 | + } |
| 163 | +wg.Wait() |
| 164 | +} |
| 165 | + |
| 166 | +funcprepare_one(connstrstring,wg*sync.WaitGroup) { |
| 167 | +dbconf,err:=pgx.ParseDSN(connstr) |
| 168 | +checkErr(err) |
| 169 | + |
| 170 | +conn,err:=pgx.Connect(dbconf) |
| 171 | +checkErr(err) |
| 172 | + |
| 173 | +deferconn.Close() |
| 174 | + |
| 175 | +ifcfg.UseDtm { |
| 176 | +exec(conn,"drop extension if exists pg_dtm") |
| 177 | +exec(conn,"create extension pg_dtm") |
| 178 | + } |
| 179 | +exec(conn,"drop table if exists t") |
| 180 | +exec(conn,"create table t(u int primary key, v int)") |
| 181 | +exec(conn,"insert into t (select generate_series(0,$1-1), $2)",cfg.Accounts.Num,cfg.Accounts.Balance) |
| 182 | +exec(conn,"commit") |
| 183 | +wg.Done() |
| 184 | +} |
| 185 | + |
| 186 | +funcprepare(connstrs []string) { |
| 187 | +varwg sync.WaitGroup |
| 188 | +wg.Add(len(connstrs)) |
| 189 | +for_,connstr:=rangeconnstrs { |
| 190 | +goprepare_one(connstr,&wg) |
| 191 | + } |
| 192 | +wg.Wait() |
| 193 | +} |
| 194 | + |
| 195 | +funcwriter(wg*sync.WaitGroup,cUpdateschanint) { |
| 196 | +varupdates=0 |
| 197 | +varconns []*pgx.Conn |
| 198 | + |
| 199 | +for_,connstr:=rangecfg.ConnStrs { |
| 200 | +dbconf,err:=pgx.ParseDSN(connstr) |
| 201 | +checkErr(err) |
| 202 | + |
| 203 | +conn,err:=pgx.Connect(dbconf) |
| 204 | +checkErr(err) |
| 205 | + |
| 206 | +deferconn.Close() |
| 207 | +conns=append(conns,conn) |
| 208 | + } |
| 209 | +forrunning { |
| 210 | +acc:=rand.Intn(cfg.Accounts.Num) |
| 211 | +xid:=execQuery(conns[0],"select dtm_begin_transaction()") |
| 212 | +fori:=1;i<len(conns);i++ { |
| 213 | +exec(conns[i],"select dtm_join_transaction($1)",xid) |
| 214 | + } |
| 215 | +for_,conn:=rangeconns { |
| 216 | +exec(conn,"begin transaction isolation level "+cfg.Isolation) |
| 217 | +exec(conn,"update t set v = v + 1 where u=$1",acc) |
| 218 | + } |
| 219 | +commit(conns...) |
| 220 | +updates++ |
| 221 | + } |
| 222 | +cUpdates<-updates |
| 223 | +wg.Done() |
| 224 | +} |
| 225 | + |
| 226 | + |
| 227 | +funcreader(wg*sync.WaitGroup,cFetcheschanint) { |
| 228 | +varfetches=0 |
| 229 | +varconns []*pgx.Conn |
| 230 | +varsumint32=0 |
| 231 | + |
| 232 | +for_,connstr:=rangecfg.ConnStrs { |
| 233 | +dbconf,err:=pgx.ParseDSN(connstr) |
| 234 | +checkErr(err) |
| 235 | + |
| 236 | +conn,err:=pgx.Connect(dbconf) |
| 237 | +checkErr(err) |
| 238 | + |
| 239 | +deferconn.Close() |
| 240 | +conns=append(conns,conn) |
| 241 | + } |
| 242 | +forrunning { |
| 243 | +acc:=rand.Intn(cfg.Accounts.Num) |
| 244 | +con:=rand.Intn(len(conns)) |
| 245 | +sum+=execQuery(conns[con],"select v from t where u=$1",acc) |
| 246 | +fetches++ |
| 247 | + } |
| 248 | +cFetches<-fetches |
| 249 | +wg.Done() |
| 250 | +} |
| 251 | + |
| 252 | +funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) { |
| 253 | +varerrerror |
| 254 | +// fmt.Println(stmt) |
| 255 | +_,err=conn.Exec(stmt,arguments... ) |
| 256 | +checkErr(err) |
| 257 | +} |
| 258 | + |
| 259 | +funcexecQuery(conn*pgx.Conn,stmtstring,arguments...interface{})int32 { |
| 260 | +varerrerror |
| 261 | +varresultint32 |
| 262 | +err=conn.QueryRow(stmt,arguments...).Scan(&result) |
| 263 | +checkErr(err) |
| 264 | +returnresult |
| 265 | +} |
| 266 | + |
| 267 | +funccheckErr(errerror) { |
| 268 | +iferr!=nil { |
| 269 | +panic(err) |
| 270 | + } |
| 271 | +} |
| 272 | + |
| 273 | +// vim: expandtab ts=4 sts=4 sw=4 |