Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1e3909c

Browse files
committed
new pg_shard driller
1 parentae9398a commit1e3909c

File tree

1 file changed

+78
-48
lines changed

1 file changed

+78
-48
lines changed

‎contrib/pg_xtm/tests/pg_shard_transfers.go‎

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,72 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "github.com/jackc/pgx"
7-
"pgx"
6+
_"github.com/jgallagher/go-libpq"
7+
"database/sql"
8+
"strconv"
89
)
910

1011
const (
11-
TRANSFER_CONNECTIONS=1
12+
TRANSFER_CONNECTIONS=8
1213
INIT_AMOUNT=10000
13-
N_ITERATIONS=5000
14-
N_ACCOUNTS=TRANSFER_CONNECTIONS
14+
N_ITERATIONS=10000
15+
N_ACCOUNTS=100//2*TRANSFER_CONNECTIONS
1516
)
1617

17-
varcfg= pgx.ConnConfig{
18-
Host:"127.0.0.1",
19-
Port:5433,
20-
// Database: "postgres",
21-
}
18+
varcfg="host=127.0.0.1 port=5432 sslmode=disable"
19+
varcfg1="host=127.0.0.1 port=5433 sslmode=disable"
20+
varcfg2="host=127.0.0.1 port=5434 sslmode=disable"
2221

2322
varrunning=false
2423

25-
functransfer(idint,wg*sync.WaitGroup) {
26-
varerrerror
27-
varconn*pgx.Conn
24+
funcprepare_db() {
25+
conn1,err:=sql.Open("libpq",cfg1)
26+
checkErr(err)
27+
exec(conn1,"drop table if exists t_10000")
28+
conn1.Close()
29+
30+
conn2,err:=sql.Open("libpq",cfg2)
31+
checkErr(err)
32+
exec(conn2,"drop table if exists t_10001")
33+
conn2.Close()
34+
2835

29-
conn,err=pgx.Connect(cfg)
36+
conn,err:=sql.Open("libpq",cfg)
37+
checkErr(err)
38+
39+
exec(conn,"drop extension if exists pg_shard CASCADE")
40+
exec(conn,"create extension pg_shard")
41+
exec(conn,"drop table if exists t")
42+
exec(conn,"create table t(u int, v int)")
43+
exec(conn,"select master_create_distributed_table(table_name := 't', partition_column := 'u')")
44+
exec(conn,"select master_create_worker_shards(table_name := 't', shard_count := 2, replication_factor := 1)")
45+
46+
fori:=1;i<=N_ACCOUNTS;i++ {
47+
exec(conn,"insert into t values("+strconv.Itoa(i)+",10000)")
48+
}
49+
50+
conn.Close()
51+
}
52+
53+
functransfer(idint,wg*sync.WaitGroup) {
54+
conn,err:=sql.Open("libpq",cfg)
3055
checkErr(err)
3156
deferconn.Close()
3257

58+
uids1:= []int{1,3,4,5,7,8,10,14}
59+
uids2:= []int{2,6,9,11,12,13,18,21}
60+
3361
fori:=0;i<N_ITERATIONS;i++ {
3462
exec(conn,"begin")
35-
exec(conn,"update t_10000 set v = v + 1 where u=3")
36-
exec(conn,"update t_10000 set v = v - 1 where u=4")
63+
exec(conn,"update t set v = v + 1 where u="+strconv.Itoa(uids1[id]))
64+
exec(conn,"update t set v = v - 1 where u="+strconv.Itoa(uids2[id]))
65+
// exec(conn, "update t set v = v + 1 where u=1")
66+
// exec(conn, "update t set v = v - 1 where u=2")
3767
exec(conn,"commit")
68+
69+
ifi%1000==0 {
70+
fmt.Printf("%u tx processed.\n",i)
71+
}
3872
}
3973

4074
wg.Done()
@@ -44,14 +78,14 @@ func inspect(wg *sync.WaitGroup) {
4478
varsumint64
4579
varprevSumint64=0
4680

47-
conn,err:=pgx.Connect(cfg)
81+
conn,err:=sql.Open("libpq",cfg)
4882
checkErr(err)
4983

5084
forrunning {
51-
sum=execQuery(conn,"select sum(v) fromt_10000")
85+
sum=execQuery(conn,"select sum(v) fromt")
5286
ifsum!=prevSum {
53-
fmt.Println("Total = ",sum);
54-
prevSum=sum
87+
fmt.Println("Total = ",sum);
88+
prevSum=sum
5589
}
5690
}
5791

@@ -60,52 +94,47 @@ func inspect(wg *sync.WaitGroup) {
6094
}
6195

6296
funcmain() {
63-
// var transferWg sync.WaitGroup
64-
// var inspectWg sync.WaitGroup
65-
varerrerror
66-
varconn*pgx.Conn
67-
varsint64
97+
vartransferWg sync.WaitGroup
98+
varinspectWg sync.WaitGroup
6899

69-
conn,err=pgx.Connect(cfg)
70-
checkErr(err)
71-
deferconn.Close()
72-
73-
// err = conn.QueryRow("select sum(v) from t_10000").Scan(&s)
74-
// checkErr(err)
100+
prepare_db()
75101

76-
s=execQuery(conn,"select sum(v) from t_10000")
77-
fmt.Println(s)
102+
transferWg.Add(TRANSFER_CONNECTIONS)
103+
fori:=0;i<TRANSFER_CONNECTIONS;i++ {
104+
gotransfer(i,&transferWg)
105+
}
78106

107+
running=true
108+
inspectWg.Add(1)
109+
goinspect(&inspectWg)
79110

80-
//transferWg.Add(TRANSFER_CONNECTIONS)
81-
// for i:=0; i<TRANSFER_CONNECTIONS; i++ {
82-
// go transfer(i, &transferWg)
83-
// }
111+
transferWg.Wait()
112+
running=false
113+
114+
inspectWg.Wait()
84115

85-
// running = true
86-
// inspectWg.Add(1)
87-
// go inspect(&inspectWg)
116+
// conn, err := sql.Open("libpq", cfg)
117+
// checkErr(err)
88118

89-
// transferWg.Wait()
119+
// exec(conn, "begin")
120+
// sum := execQuery(conn, "select sum(v) from t")
121+
// exec(conn, "commit")
90122

91-
// running = false
92-
// inspectWg.Wait()
123+
// fmt.Println(sum)
93124

94125
fmt.Printf("done\n")
95126
}
96127

97-
funcexec(conn*pgx.Conn,stmtstring) {
128+
funcexec(conn*sql.DB,stmtstring) {
98129
varerrerror
99130
_,err=conn.Exec(stmt)
100131
checkErr(err)
101132
}
102133

103-
funcexecQuery(conn*pgx.Conn,stmtstring)int64 {
134+
funcexecQuery(conn*sql.DB,stmtstring)int64 {
104135
varerrerror
105136
varresultint64
106-
// result, err = conn.SimpleQuery(stmt)
107-
// err = conn.QueryRow(stmt).Scan(&result)
108-
err=conn.SimpleQuery(stmt).Scan(&result)
137+
err=conn.QueryRow(stmt).Scan(&result)
109138
checkErr(err)
110139
returnresult
111140
}
@@ -116,3 +145,4 @@ func checkErr(err error) {
116145
}
117146
}
118147

148+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp