Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2d626c2

Browse files
committed
2 parents9226662 +eeeb868 commit2d626c2

File tree

2 files changed

+93
-96
lines changed

2 files changed

+93
-96
lines changed

‎contrib/pg_xtm/tests/pg_shard_transfers.go‎

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,72 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "github.com/jackc/pgx"
7-
"pgx"
6+
_"github.com/jgallagher/go-libpq"
7+
"database/sql"
8+
"strconv"
89
)
910

1011
const (
11-
TRANSFER_CONNECTIONS=1
12+
TRANSFER_CONNECTIONS=8
1213
INIT_AMOUNT=10000
13-
N_ITERATIONS=5000
14-
N_ACCOUNTS=TRANSFER_CONNECTIONS
14+
N_ITERATIONS=10000
15+
N_ACCOUNTS=100//2*TRANSFER_CONNECTIONS
1516
)
1617

17-
varcfg= pgx.ConnConfig{
18-
Host:"127.0.0.1",
19-
Port:5433,
20-
// Database: "postgres",
21-
}
18+
varcfg="host=127.0.0.1 port=5432 sslmode=disable"
19+
varcfg1="host=127.0.0.1 port=5433 sslmode=disable"
20+
varcfg2="host=127.0.0.1 port=5434 sslmode=disable"
2221

2322
varrunning=false
2423

25-
functransfer(idint,wg*sync.WaitGroup) {
26-
varerrerror
27-
varconn*pgx.Conn
24+
funcprepare_db() {
25+
conn1,err:=sql.Open("libpq",cfg1)
26+
checkErr(err)
27+
exec(conn1,"drop table if exists t_10000")
28+
conn1.Close()
29+
30+
conn2,err:=sql.Open("libpq",cfg2)
31+
checkErr(err)
32+
exec(conn2,"drop table if exists t_10001")
33+
conn2.Close()
34+
2835

29-
conn,err=pgx.Connect(cfg)
36+
conn,err:=sql.Open("libpq",cfg)
37+
checkErr(err)
38+
39+
exec(conn,"drop extension if exists pg_shard CASCADE")
40+
exec(conn,"create extension pg_shard")
41+
exec(conn,"drop table if exists t")
42+
exec(conn,"create table t(u int, v int)")
43+
exec(conn,"select master_create_distributed_table(table_name := 't', partition_column := 'u')")
44+
exec(conn,"select master_create_worker_shards(table_name := 't', shard_count := 2, replication_factor := 1)")
45+
46+
fori:=1;i<=N_ACCOUNTS;i++ {
47+
exec(conn,"insert into t values("+strconv.Itoa(i)+",10000)")
48+
}
49+
50+
conn.Close()
51+
}
52+
53+
functransfer(idint,wg*sync.WaitGroup) {
54+
conn,err:=sql.Open("libpq",cfg)
3055
checkErr(err)
3156
deferconn.Close()
3257

58+
uids1:= []int{1,3,4,5,7,8,10,14}
59+
uids2:= []int{2,6,9,11,12,13,18,21}
60+
3361
fori:=0;i<N_ITERATIONS;i++ {
3462
exec(conn,"begin")
35-
exec(conn,"update t_10000 set v = v + 1 where u=3")
36-
exec(conn,"update t_10000 set v = v - 1 where u=4")
63+
exec(conn,"update t set v = v + 1 where u="+strconv.Itoa(uids1[id]))
64+
exec(conn,"update t set v = v - 1 where u="+strconv.Itoa(uids2[id]))
65+
// exec(conn, "update t set v = v + 1 where u=1")
66+
// exec(conn, "update t set v = v - 1 where u=2")
3767
exec(conn,"commit")
68+
69+
ifi%1000==0 {
70+
fmt.Printf("%u tx processed.\n",i)
71+
}
3872
}
3973

4074
wg.Done()
@@ -44,14 +78,14 @@ func inspect(wg *sync.WaitGroup) {
4478
varsumint64
4579
varprevSumint64=0
4680

47-
conn,err:=pgx.Connect(cfg)
81+
conn,err:=sql.Open("libpq",cfg)
4882
checkErr(err)
4983

5084
forrunning {
51-
sum=execQuery(conn,"select sum(v) fromt_10000")
85+
sum=execQuery(conn,"select sum(v) fromt")
5286
ifsum!=prevSum {
53-
fmt.Println("Total = ",sum);
54-
prevSum=sum
87+
fmt.Println("Total = ",sum);
88+
prevSum=sum
5589
}
5690
}
5791

@@ -60,52 +94,47 @@ func inspect(wg *sync.WaitGroup) {
6094
}
6195

6296
funcmain() {
63-
// var transferWg sync.WaitGroup
64-
// var inspectWg sync.WaitGroup
65-
varerrerror
66-
varconn*pgx.Conn
67-
varsint64
97+
vartransferWg sync.WaitGroup
98+
varinspectWg sync.WaitGroup
6899

69-
conn,err=pgx.Connect(cfg)
70-
checkErr(err)
71-
deferconn.Close()
72-
73-
// err = conn.QueryRow("select sum(v) from t_10000").Scan(&s)
74-
// checkErr(err)
100+
prepare_db()
75101

76-
s=execQuery(conn,"select sum(v) from t_10000")
77-
fmt.Println(s)
102+
transferWg.Add(TRANSFER_CONNECTIONS)
103+
fori:=0;i<TRANSFER_CONNECTIONS;i++ {
104+
gotransfer(i,&transferWg)
105+
}
78106

107+
running=true
108+
inspectWg.Add(1)
109+
goinspect(&inspectWg)
79110

80-
//transferWg.Add(TRANSFER_CONNECTIONS)
81-
// for i:=0; i<TRANSFER_CONNECTIONS; i++ {
82-
// go transfer(i, &transferWg)
83-
// }
111+
transferWg.Wait()
112+
running=false
113+
114+
inspectWg.Wait()
84115

85-
// running = true
86-
// inspectWg.Add(1)
87-
// go inspect(&inspectWg)
116+
// conn, err := sql.Open("libpq", cfg)
117+
// checkErr(err)
88118

89-
// transferWg.Wait()
119+
// exec(conn, "begin")
120+
// sum := execQuery(conn, "select sum(v) from t")
121+
// exec(conn, "commit")
90122

91-
// running = false
92-
// inspectWg.Wait()
123+
// fmt.Println(sum)
93124

94125
fmt.Printf("done\n")
95126
}
96127

97-
funcexec(conn*pgx.Conn,stmtstring) {
128+
funcexec(conn*sql.DB,stmtstring) {
98129
varerrerror
99130
_,err=conn.Exec(stmt)
100131
checkErr(err)
101132
}
102133

103-
funcexecQuery(conn*pgx.Conn,stmtstring)int64 {
134+
funcexecQuery(conn*sql.DB,stmtstring)int64 {
104135
varerrerror
105136
varresultint64
106-
// result, err = conn.SimpleQuery(stmt)
107-
// err = conn.QueryRow(stmt).Scan(&result)
108-
err=conn.SimpleQuery(stmt).Scan(&result)
137+
err=conn.QueryRow(stmt).Scan(&result)
109138
checkErr(err)
110139
returnresult
111140
}
@@ -116,3 +145,4 @@ func checkErr(err error) {
116145
}
117146
}
118147

148+

‎install_pg_shard_xtm.sh‎

Lines changed: 15 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
PG_SHARD_DIR=~/code/pg_shard
3+
PG_SHARD_DIR=~/code/pg_shard_master
44
PG_DIR=~/code/postgresql
55
PG_XTM_DIR=$PG_DIR/contrib/pg_xtm
66

@@ -42,24 +42,23 @@ cd $PG_DIR
4242
./install/bin/initdb -D ./install/data2
4343
./install/bin/initdb -D ./install/data3
4444

45-
46-
sed -i'''s/#port =.*/port = 5433/' ./install/data2/postgresql.conf
47-
sed -i'''s/#port =.*/port = 5434/' ./install/data3/postgresql.conf
48-
4945
sed -i''"s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data1/postgresql.conf
50-
sed -i''"s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data2/postgresql.conf
51-
sed -i''"s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data3/postgresql.conf
46+
sed -i''"s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm'/" ./install/data2/postgresql.conf
47+
sed -i''"s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm'/" ./install/data3/postgresql.conf
5248

53-
sed -i'''s/#fsync =.*/fsync = off/' ./install/data1/postgresql.conf
54-
sed -i'''s/#fsync =.*/fsync = off/' ./install/data2/postgresql.conf
55-
sed -i'''s/#fsync =.*/fsync = off/' ./install/data3/postgresql.conf
49+
echo"port = 5433">> ./install/data2/postgresql.conf
50+
echo"port = 5434">> ./install/data3/postgresql.conf
5651

52+
echo'fsync = off'>> ./install/data1/postgresql.conf
53+
echo'fsync = off'>> ./install/data2/postgresql.conf
54+
echo'fsync = off'>> ./install/data3/postgresql.conf
5755

5856
echo'pg_shard.use_dtm_transactions=1'>> ./install/data1/postgresql.conf
59-
echo'pg_shard.use_dtm_transactions=1'>> ./install/data2/postgresql.conf
60-
echo'pg_shard.use_dtm_transactions=1'>> ./install/data3/postgresql.conf
61-
57+
echo'pg_shard.all_modifications_commutative=1'>> ./install/data1/postgresql.conf
6258

59+
echo"log_statement = 'all'">> ./install/data1/postgresql.conf
60+
echo"log_statement = 'all'">> ./install/data2/postgresql.conf
61+
echo"log_statement = 'all'">> ./install/data3/postgresql.conf
6362

6463
./install/bin/pg_ctl -D ./install/data1 -l ./install/data1/log start
6564
./install/bin/pg_ctl -D ./install/data2 -l ./install/data2/log start
@@ -85,46 +84,14 @@ echo "127.0.0.1 5434" >> ./install/data3/pg_worker_list.conf
8584
./install/bin/createdb`whoami` -p5434
8685

8786

88-
./install/bin/psql -p5433<<SQL
87+
./install/bin/psql -p5432<<SQL
8988
CREATE EXTENSION pg_dtm;
9089
SQL
9190

92-
./install/bin/psql -p5434<<SQL
91+
./install/bin/psql -p5433<<SQL
9392
CREATE EXTENSION pg_dtm;
9493
SQL
9594

96-
./install/bin/psql<<SQL
97-
95+
./install/bin/psql -p 5434<<SQL
9896
CREATE EXTENSION pg_dtm;
99-
CREATE EXTENSION pg_shard;
100-
CREATE TABLE t(u int, v int);
101-
SELECT master_create_distributed_table(table_name := 't', partition_column := 'u');
102-
SELECT master_create_worker_shards(table_name := 't', shard_count := 8, replication_factor := 1);
103-
insert into t values(1,10000);
104-
insert into t values(2,10000);
105-
insert into t values(3,10000);
106-
insert into t values(4,10000);
107-
insert into t values(5,10000);
108-
insert into t values(6,10000);
109-
insert into t values(7,10000);
110-
insert into t values(8,10000);
111-
11297
SQL
113-
114-
115-
116-
117-
# insert into t (select generate_series(0,10), random()::integer);
118-
119-
# cd contrib/pg_xtm/dtmd
120-
# make clean
121-
# make
122-
# rm -rf /tmp/clog/*
123-
# ./bin/dtmd
124-
125-
126-
127-
128-
129-
130-

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp