Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitecaba19

Browse files
kvapkelvich
authored andcommitted
Actualize daemons.go and dtmbench for multimaster. Works partially :(
1 parentcf201db commitecaba19

File tree

4 files changed

+137
-23
lines changed

4 files changed

+137
-23
lines changed

‎contrib/arbiter/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
top_builddir = ../..
22
include$(top_builddir)/src/Makefile.global
33

4-
overrideCFLAGS += -fPIC
4+
overrideCFLAGS += -fPIC -O0
55
overrideCPPFLAGS += -I. -Iinclude -Iapi -D_LARGEFILE64_SOURCE
66

77
AR = ar

‎contrib/multimaster/tests/daemons.go‎

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package main
22

33
import (
44
"log"
5+
"fmt"
56
"os/exec"
67
"io"
78
"bufio"
89
"sync"
10+
"flag"
911
"os"
1012
"strconv"
1113
"strings"
14+
"time"
1215
)
1316

1417
funcread_to_channel(r io.Reader,cchanstring,wg*sync.WaitGroup) {
@@ -56,9 +59,24 @@ func cmd_to_channel(argv []string, name string, out chan string) {
5659
log.Printf("'%s' finished\n",name)
5760
}
5861

59-
funcdtmd(binstring,wg*sync.WaitGroup) {
60-
argv:= []string{bin}
61-
name:="dtmd"
62+
const (
63+
DtmHost="127.0.0.1"
64+
DtmPort=5431
65+
PgPort=5432
66+
)
67+
68+
funcarbiter(binstring,datadirstring,servers []string,idint,wg*sync.WaitGroup) {
69+
argv:= []string{
70+
bin,
71+
"-d",datadir,
72+
"-i",strconv.Itoa(id),
73+
}
74+
for_,server:=rangeservers {
75+
argv=append(argv,"-r",server)
76+
}
77+
log.Println(argv)
78+
79+
name:="arbiter "+datadir
6280
c:=make(chanstring)
6381

6482
gocmd_to_channel(argv,name,c)
@@ -70,6 +88,21 @@ func dtmd(bin string, wg *sync.WaitGroup) {
7088
wg.Done()
7189
}
7290

91+
funcappendfile(filenamestring,lines...string) {
92+
f,err:=os.OpenFile(filename,os.O_APPEND|os.O_WRONLY,0600)
93+
iferr!=nil {
94+
log.Fatal(err)
95+
}
96+
97+
deferf.Close()
98+
99+
for_,l:=rangelines {
100+
if_,err=f.WriteString(l+"\n");err!=nil {
101+
log.Fatal(err)
102+
}
103+
}
104+
}
105+
73106
funcinitdb(binstring,datadirstring) {
74107
iferr:=os.RemoveAll(datadir);err!=nil {
75108
log.Fatal(err)
@@ -84,20 +117,45 @@ func initdb(bin string, datadir string) {
84117
fors:=rangec {
85118
log.Printf("[%s] %s\n",name,s)
86119
}
120+
121+
appendfile(
122+
datadir+"/pg_hba.conf",
123+
"local replication all trust",
124+
"host replication all 127.0.0.1/32 trust",
125+
"host replication all ::1/128 trust",
126+
)
127+
}
128+
129+
funcinitarbiter(arbiterdirstring) {
130+
iferr:=os.RemoveAll(arbiterdir);err!=nil {
131+
log.Fatal(err)
132+
}
133+
iferr:=os.MkdirAll(arbiterdir,os.ModeDir|0777);err!=nil {
134+
log.Fatal(err)
135+
}
87136
}
88137

89-
funcpostgres(binstring,datadirstring,portint,nodeidint,wg*sync.WaitGroup) {
138+
funcpostgres(binstring,datadirstring,postgresi []string,arbiters []string,portint,nodeidint,wg*sync.WaitGroup) {
90139
argv:= []string{
91140
bin,
92141
"-D",datadir,
93142
"-p",strconv.Itoa(port),
94-
"-c","dtm.buffer_size=65536",
95-
"-c","dtm.host=127.0.0.1",
96-
"-c","dtm.port="+strconv.Itoa(5431),
143+
"-c","multimaster.buffer_size=65536",
144+
"-c","multimaster.conn_strings="+strings.Join(postgresi,","),
145+
"-c","multimaster.node_id="+strconv.Itoa(nodeid+1),
146+
"-c","multimaster.arbiters="+strings.Join(arbiters,","),
147+
"-c","multimaster.workers=8",
148+
"-c","multimaster.queue_size=1073741824",
149+
"-c","wal_level=logical",
150+
"-c","wal_sender_timeout=0",
151+
"-c","max_wal_senders=10",
152+
"-c","max_worker_processes=100",
153+
"-c","max_replication_slots=10",
97154
"-c","autovacuum=off",
98155
"-c","fsync=off",
99-
"-c","synchronous_commit=off",
100-
"-c","shared_preload_libraries=pg_dtm",
156+
"-c","synchronous_commit=on",
157+
"-c","max_connections=200",
158+
"-c","shared_preload_libraries=multimaster",
101159
}
102160
name:="postgres "+datadir
103161
c:=make(chanstring)
@@ -138,32 +196,60 @@ func get_prefix(srcroot string) string {
138196
return"."
139197
}
140198

199+
vardoInitDbbool=false
200+
funcinit() {
201+
flag.BoolVar(&doInitDb,"i",false,"perform initdb")
202+
flag.Parse()
203+
}
204+
141205
funcmain() {
142206
srcroot:="../../.."
143207
prefix:=get_prefix(srcroot)
144208

145209
bin:=map[string]string{
146-
"dtmd":srcroot+"/contrib/pg_dtm/dtmd/bin/dtmd",
210+
"arbiter":srcroot+"/contrib/arbiter/bin/arbiter",
147211
"initdb":prefix+"/bin/initdb",
148212
"postgres":prefix+"/bin/postgres",
149213
}
150214

151-
datadirs:= []string{"/tmp/data1","/tmp/data2","/tmp/data3"}
215+
datadirs:= []string{"/tmp/data0","/tmp/data1","/tmp/data2"}
216+
//arbiterdirs := []string{"/tmp/arbiter0", "/tmp/arbiter1", "/tmp/arbiter2"}
217+
arbiterdirs:= []string{"/tmp/arbiter0"}
152218

153219
check_bin(&bin);
154220

155-
for_,datadir:=rangedatadirs {
156-
initdb(bin["initdb"],datadir)
221+
ifdoInitDb {
222+
for_,datadir:=rangedatadirs {
223+
initdb(bin["initdb"],datadir)
224+
}
225+
for_,arbiterdir:=rangearbiterdirs {
226+
initarbiter(arbiterdir)
227+
}
157228
}
158229

159230
varwg sync.WaitGroup
160231

161-
wg.Add(1)
162-
godtmd(bin["dtmd"],&wg)
232+
vararbiters []string
233+
fori:=rangearbiterdirs {
234+
arbiters=append(arbiters,DtmHost+":"+strconv.Itoa(DtmPort-i))
235+
}
236+
fori,dir:=rangearbiterdirs {
237+
wg.Add(1)
238+
goarbiter(bin["arbiter"],dir,arbiters,i,&wg)
239+
}
240+
241+
time.Sleep(3*time.Second)
163242

164-
fori,datadir:=rangedatadirs {
243+
varpostgresi []string
244+
fori:=rangedatadirs {
245+
postgresi=append(
246+
postgresi,
247+
fmt.Sprintf("dbname=postgres host=127.0.0.1 port=%d sslmode=disable",PgPort+i),
248+
)
249+
}
250+
fori,dir:=rangedatadirs {
165251
wg.Add(1)
166-
gopostgres(bin["postgres"],datadir,5432+i,i,&wg)
252+
gopostgres(bin["postgres"],dir,postgresi,arbiters,PgPort+i,i,&wg)
167253
}
168254

169255
wg.Wait()

‎contrib/multimaster/tests/dtmbench.cpp‎

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,38 @@ void* writer(void* arg)
178178

179179
voidinitializeDatabase()
180180
{
181+
//for (size_t i = 0; i < cfg.connections.size(); i++) {
182+
//printf("creating extension %lu\n", i);
183+
//try {
184+
//connection conn(cfg.connections[i]);
185+
//work txn(conn);
186+
////exec(txn, "drop extension if exists multimaster");
187+
//exec(txn, "create extension multimaster");
188+
//txn.commit();
189+
//} catch (pqxx_exception const& x) {
190+
//i -= 1;
191+
//continue;
192+
//}
193+
//printf("extension %lu created\n", i);
194+
//}
195+
196+
printf("creating table t\n");
181197
connectionconn(cfg.connections[0]);
182-
worktxn(conn);
183-
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
184-
txn.commit();
198+
{
199+
worktxn(conn);
200+
exec(txn,"drop table if exists t");
201+
exec(txn,"create table t(u int primary key, v int)");
202+
txn.commit();
203+
}
204+
printf("table t created\n");
205+
206+
printf("inserting stuff into t\n");
207+
{
208+
worktxn(conn);
209+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
210+
txn.commit();
211+
}
212+
printf("stuff inserted\n");
185213
}
186214

187215
intmain (int argc,char* argv[])

‎contrib/multimaster/tests/makefile‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CXXFLAGS=-g -Wall -O2 -pthread
44
all: dtmbench
55

66
dtmbench: dtmbench.cpp
7-
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
7+
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx -lpq
88

99
clean:
10-
rm -f dtmbench
10+
rm -f dtmbench

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp