Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit14bf526

Browse files
committed
transfers-pgshard as a perf backend
1 parentc459061 commit14bf526

File tree

2 files changed

+126
-0
lines changed

2 files changed

+126
-0
lines changed

‎contrib/pg_dtm/tests/perf/perf.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ func main() {
137137
backend=new(TransfersFDW)
138138
case"readers":
139139
backend=new(Readers)
140+
case"pgshard":
141+
backend=new(TransfersPgShard)
140142
default:
141143
fmt.Println("No backend named: '%s'\n",cfg.Backend)
142144
return
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
_"github.com/lib/pq"
7+
"database/sql"
8+
"strconv"
9+
"math/rand"
10+
)
11+
12+
typeTransfersPgShardstruct {}
13+
14+
func (tTransfersPgShard)prepare(connstrs []string) {
15+
varwg sync.WaitGroup
16+
wg.Add(len(connstrs)-1)
17+
fori,connstr:=rangeconnstrs[1:] {
18+
got.prepare_slave(i,connstr,&wg)
19+
}
20+
wg.Wait()
21+
t.prepare_master()
22+
}
23+
24+
func (tTransfersPgShard)prepare_slave(idint,connstrstring,wg*sync.WaitGroup) {
25+
conn,err:=sql.Open("postgres",connstr)
26+
checkErr(err)
27+
deferconn.Close()
28+
29+
ifcfg.UseDtm {
30+
_exec(conn,"drop extension if exists pg_dtm --")
31+
_exec(conn,"create extension pg_dtm")
32+
}
33+
34+
drop_sql:=fmt.Sprintf("drop table if exists t_1000%d",id)
35+
_exec(conn,drop_sql)
36+
37+
conn.Close()
38+
wg.Done()
39+
}
40+
41+
func (tTransfersPgShard)prepare_master() {
42+
conn,err:=sql.Open("postgres",cfg.ConnStrs[0])
43+
checkErr(err)
44+
45+
_exec(conn,"drop extension if exists pg_shard CASCADE")
46+
_exec(conn,"create extension pg_shard")
47+
_exec(conn,"drop table if exists t")
48+
_exec(conn,"create table t(u int primary key, v int)")
49+
_exec(conn,"select master_create_distributed_table(table_name := 't', partition_column := 'u')")
50+
51+
master_sql:=fmt.Sprintf(
52+
"select master_create_worker_shards(table_name := 't', shard_count := %d, replication_factor := 1)",
53+
len(cfg.ConnStrs)-1)
54+
_exec(conn,master_sql)
55+
56+
fmt.Println("Database feed started")
57+
fori:=0;i<=cfg.AccountsNum;i++ {
58+
_exec(conn,"insert into t values("+strconv.Itoa(i)+", 0)")
59+
}
60+
fmt.Println("Database feed finished")
61+
62+
conn.Close()
63+
}
64+
65+
func (tTransfersPgShard)writer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
66+
conn,err:=sql.Open("postgres",cfg.ConnStrs[0])
67+
checkErr(err)
68+
69+
i:=0
70+
fori=0;i<cfg.IterNum;i++ {
71+
amount:=1
72+
account1:=rand.Intn(cfg.AccountsNum-1)+1
73+
account2:=rand.Intn(cfg.AccountsNum-1)+1
74+
75+
_exec(conn,"begin")
76+
_exec(conn,fmt.Sprintf("update t set v = v - %d where u=%d",amount,account1))
77+
_exec(conn,fmt.Sprintf("update t set v = v + %d where u=%d",amount,account2))
78+
_exec(conn,"commit")
79+
80+
ifi%1000==0 {
81+
fmt.Printf("%d tx processed.\n",i)
82+
}
83+
}
84+
85+
cCommits<-i
86+
cAborts<-0
87+
88+
conn.Close()
89+
wg.Done()
90+
}
91+
92+
func (tTransfersPgShard)reader(wg*sync.WaitGroup,cFetcheschanint,inconsistency*bool) {
93+
varsumint64
94+
varprevSumint64=0
95+
96+
conn,err:=sql.Open("postgres",cfg.ConnStrs[0])
97+
checkErr(err)
98+
99+
forrunning {
100+
sum=_execQuery(conn,"select sum(v) from t")
101+
ifsum!=prevSum {
102+
fmt.Println("Total = ",sum)
103+
*inconsistency=true
104+
prevSum=sum
105+
}
106+
}
107+
108+
conn.Close()
109+
wg.Done()
110+
}
111+
112+
func_exec(conn*sql.DB,stmtstring) {
113+
varerrerror
114+
_,err=conn.Exec(stmt)
115+
checkErr(err)
116+
}
117+
118+
func_execQuery(conn*sql.DB,stmtstring)int64 {
119+
varerrerror
120+
varresultint64
121+
err=conn.QueryRow(stmt).Scan(&result)
122+
checkErr(err)
123+
returnresult
124+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp