|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | +"fmt" |
| 5 | +"sync" |
| 6 | + _"github.com/lib/pq" |
| 7 | +"database/sql" |
| 8 | +"strconv" |
| 9 | +"math/rand" |
| 10 | +) |
| 11 | + |
| 12 | +typeTransfersPgShardstruct {} |
| 13 | + |
| 14 | +func (tTransfersPgShard)prepare(connstrs []string) { |
| 15 | +varwg sync.WaitGroup |
| 16 | +wg.Add(len(connstrs)-1) |
| 17 | +fori,connstr:=rangeconnstrs[1:] { |
| 18 | +got.prepare_slave(i,connstr,&wg) |
| 19 | + } |
| 20 | +wg.Wait() |
| 21 | +t.prepare_master() |
| 22 | +} |
| 23 | + |
| 24 | +func (tTransfersPgShard)prepare_slave(idint,connstrstring,wg*sync.WaitGroup) { |
| 25 | +conn,err:=sql.Open("postgres",connstr) |
| 26 | +checkErr(err) |
| 27 | +deferconn.Close() |
| 28 | + |
| 29 | +ifcfg.UseDtm { |
| 30 | +_exec(conn,"drop extension if exists pg_dtm --") |
| 31 | +_exec(conn,"create extension pg_dtm") |
| 32 | + } |
| 33 | + |
| 34 | +drop_sql:=fmt.Sprintf("drop table if exists t_1000%d",id) |
| 35 | +_exec(conn,drop_sql) |
| 36 | + |
| 37 | +conn.Close() |
| 38 | +wg.Done() |
| 39 | +} |
| 40 | + |
| 41 | +func (tTransfersPgShard)prepare_master() { |
| 42 | +conn,err:=sql.Open("postgres",cfg.ConnStrs[0]) |
| 43 | +checkErr(err) |
| 44 | + |
| 45 | +_exec(conn,"drop extension if exists pg_shard CASCADE") |
| 46 | +_exec(conn,"create extension pg_shard") |
| 47 | +_exec(conn,"drop table if exists t") |
| 48 | +_exec(conn,"create table t(u int primary key, v int)") |
| 49 | +_exec(conn,"select master_create_distributed_table(table_name := 't', partition_column := 'u')") |
| 50 | + |
| 51 | +master_sql:=fmt.Sprintf( |
| 52 | +"select master_create_worker_shards(table_name := 't', shard_count := %d, replication_factor := 1)", |
| 53 | +len(cfg.ConnStrs)-1) |
| 54 | +_exec(conn,master_sql) |
| 55 | + |
| 56 | +fmt.Println("Database feed started") |
| 57 | +fori:=0;i<=cfg.AccountsNum;i++ { |
| 58 | +_exec(conn,"insert into t values("+strconv.Itoa(i)+", 0)") |
| 59 | + } |
| 60 | +fmt.Println("Database feed finished") |
| 61 | + |
| 62 | +conn.Close() |
| 63 | +} |
| 64 | + |
| 65 | +func (tTransfersPgShard)writer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) { |
| 66 | +conn,err:=sql.Open("postgres",cfg.ConnStrs[0]) |
| 67 | +checkErr(err) |
| 68 | + |
| 69 | +i:=0 |
| 70 | +fori=0;i<cfg.IterNum;i++ { |
| 71 | +amount:=1 |
| 72 | +account1:=rand.Intn(cfg.AccountsNum-1)+1 |
| 73 | +account2:=rand.Intn(cfg.AccountsNum-1)+1 |
| 74 | + |
| 75 | +_exec(conn,"begin") |
| 76 | +_exec(conn,fmt.Sprintf("update t set v = v - %d where u=%d",amount,account1)) |
| 77 | +_exec(conn,fmt.Sprintf("update t set v = v + %d where u=%d",amount,account2)) |
| 78 | +_exec(conn,"commit") |
| 79 | + |
| 80 | +ifi%1000==0 { |
| 81 | +fmt.Printf("%d tx processed.\n",i) |
| 82 | + } |
| 83 | + } |
| 84 | + |
| 85 | +cCommits<-i |
| 86 | +cAborts<-0 |
| 87 | + |
| 88 | +conn.Close() |
| 89 | +wg.Done() |
| 90 | +} |
| 91 | + |
| 92 | +func (tTransfersPgShard)reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool) { |
| 93 | +varsumint64 |
| 94 | +varprevSumint64=0 |
| 95 | + |
| 96 | +conn,err:=sql.Open("postgres",cfg.ConnStrs[0]) |
| 97 | +checkErr(err) |
| 98 | + |
| 99 | +forrunning { |
| 100 | +sum=_execQuery(conn,"select sum(v) from t") |
| 101 | +ifsum!=prevSum { |
| 102 | +fmt.Println("Total = ",sum) |
| 103 | +*inconsistency=true |
| 104 | +prevSum=sum |
| 105 | + } |
| 106 | + } |
| 107 | + |
| 108 | +conn.Close() |
| 109 | +wg.Done() |
| 110 | +} |
| 111 | + |
| 112 | +func_exec(conn*sql.DB,stmtstring) { |
| 113 | +varerrerror |
| 114 | +_,err=conn.Exec(stmt) |
| 115 | +checkErr(err) |
| 116 | +} |
| 117 | + |
| 118 | +func_execQuery(conn*sql.DB,stmtstring)int64 { |
| 119 | +varerrerror |
| 120 | +varresultint64 |
| 121 | +err=conn.QueryRow(stmt).Scan(&result) |
| 122 | +checkErr(err) |
| 123 | +returnresult |
| 124 | +} |