Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0e2369f

Browse files
committed
2 parentsc7211e7 +bec183f commit0e2369f

File tree

7 files changed

+338
-23
lines changed

7 files changed

+338
-23
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
#include<time.h>
2+
#include<stdio.h>
3+
#include<stdarg.h>
4+
#include<stdlib.h>
5+
#include<inttypes.h>
6+
#include<sys/time.h>
7+
#include<pthread.h>
8+
9+
#include<string>
10+
#include<vector>
11+
12+
#include<pqxx/connection>
13+
#include<pqxx/transaction>
14+
#include<pqxx/nontransaction>
15+
16+
usingnamespacestd;
17+
usingnamespacepqxx;
18+
19+
template<classT>
20+
classunique_ptr
21+
{
22+
T* ptr;
23+
24+
public:
25+
unique_ptr(T* p =NULL) : ptr(p) {}
26+
~unique_ptr() {delete ptr; }
27+
T&operator*() {return *ptr; }
28+
T*operator->() {return ptr; }
29+
voidoperator=(T* p) { ptr = p; }
30+
voidoperator=(unique_ptr& other) {
31+
ptr = other.ptr;
32+
other.ptr =NULL;
33+
}
34+
};
35+
36+
typedefvoid* (*thread_proc_t)(void*);
37+
typedefint64_tcsn_t;
38+
39+
structthread
40+
{
41+
pthread_t t;
42+
size_t proceeded;
43+
int id;
44+
45+
voidstart(int tid,thread_proc_t proc) {
46+
id = tid;
47+
proceeded =0;
48+
pthread_create(&t,NULL, proc,this);
49+
}
50+
51+
voidwait() {
52+
pthread_join(t,NULL);
53+
}
54+
};
55+
56+
structconfig
57+
{
58+
int nReaders;
59+
int nWriters;
60+
int nIterations;
61+
int nAccounts;
62+
vector<string> connections;
63+
64+
config() {
65+
nReaders =1;
66+
nWriters =10;
67+
nIterations =1000;
68+
nAccounts =1000;
69+
}
70+
};
71+
72+
config cfg;
73+
bool running;
74+
75+
#defineUSEC1000000
76+
77+
statictime_tgetCurrentTime()
78+
{
79+
structtimeval tv;
80+
gettimeofday(&tv,NULL);
81+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
82+
}
83+
84+
85+
voidexec(transaction_base& txn,charconst* sql, ...)
86+
{
87+
va_list args;
88+
va_start(args, sql);
89+
char buf[1024];
90+
vsprintf(buf, sql, args);
91+
va_end(args);
92+
txn.exec(buf);
93+
}
94+
95+
int64_texecQuery( transaction_base& txn,charconst* sql, ...)
96+
{
97+
va_list args;
98+
va_start(args, sql);
99+
char buf[1024];
100+
vsprintf(buf, sql, args);
101+
va_end(args);
102+
result r = txn.exec(buf);
103+
return r[0][0].as(int64_t());
104+
}
105+
106+
void*reader(void* arg)
107+
{
108+
thread& t = *(thread*)arg;
109+
vector< unique_ptr<connection> >conns(cfg.connections.size());
110+
for (size_t i =0; i < conns.size(); i++) {
111+
conns[i] =newconnection(cfg.connections[i]);
112+
}
113+
int64_t prevSum =0;
114+
115+
while (running) {
116+
csn_t snapshot;
117+
vector< unique_ptr<work> >txns(conns.size());
118+
for (size_t i =0; i < conns.size(); i++) {
119+
txns[i] =newwork(*conns[i]);
120+
}
121+
for (size_t i =0; i < txns.size(); i++) {
122+
if (i ==0) {
123+
snapshot =execQuery(*txns[i],"select dtm_extend()");
124+
}else {
125+
snapshot =execQuery(*txns[i],"select dtm_access(%ld)", snapshot);
126+
}
127+
}
128+
int64_t sum =0;
129+
for (size_t i =0; i < txns.size(); i++) {
130+
sum +=execQuery(*txns[i],"select sum(v) from t");
131+
}
132+
if (sum != prevSum) {
133+
printf("Total=%ld snapshot=%ld\n", sum, snapshot);
134+
prevSum = sum;
135+
}
136+
t.proceeded +=1;
137+
}
138+
returnNULL;
139+
}
140+
141+
void*writer(void* arg)
142+
{
143+
thread& t = *(thread*)arg;
144+
vector< unique_ptr<connection> >conns(cfg.connections.size());
145+
for (size_t i =0; i < conns.size(); i++) {
146+
conns[i] =newconnection(cfg.connections[i]);
147+
}
148+
for (int i =0; i < cfg.nIterations; i++)
149+
{
150+
char gtid[32];
151+
int srcCon, dstCon;
152+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
153+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
154+
155+
sprintf(gtid,"%d.%d", t.id, i);
156+
157+
do {
158+
srcCon =random() % cfg.connections.size();
159+
dstCon =random() % cfg.connections.size();
160+
}while (srcCon == dstCon);
161+
162+
nontransactionsrcTx(*conns[srcCon]);
163+
nontransactiondstTx(*conns[dstCon]);
164+
165+
exec(srcTx,"begin transaction");
166+
exec(dstTx,"begin transaction");
167+
168+
csn_t snapshot =execQuery(srcTx,"select dtm_extend('%s')", gtid);
169+
snapshot =execQuery(dstTx,"select dtm_access(%ld, '%s')", snapshot, gtid);
170+
171+
exec(srcTx,"update t set v = v - 1 where u=%d", srcAcc);
172+
exec(dstTx,"update t set v = v + 1 where u=%d", dstAcc);
173+
174+
exec(srcTx,"prepare transaction '%s'", gtid);
175+
exec(dstTx,"prepare transaction '%s'", gtid);
176+
exec(srcTx,"select dtm_begin_prepare('%s')", gtid);
177+
exec(dstTx,"select dtm_begin_prepare('%s')", gtid);
178+
csn_t csn =execQuery(srcTx,"select dtm_prepare('%s', 0)", gtid);
179+
csn =execQuery(dstTx,"select dtm_prepare('%s', %ld)", gtid, csn);
180+
exec(srcTx,"select dtm_end_prepare('%s', %ld)", gtid, csn);
181+
exec(dstTx,"select dtm_end_prepare('%s', %ld)", gtid, csn);
182+
exec(srcTx,"commit prepared '%s'", gtid);
183+
exec(dstTx,"commit prepared '%s'", gtid);
184+
185+
t.proceeded +=1;
186+
}
187+
returnNULL;
188+
}
189+
190+
voidinitializeDatabase()
191+
{
192+
for (size_t i =0; i < cfg.connections.size(); i++) {
193+
connectionconn(cfg.connections[i]);
194+
worktxn(conn);
195+
exec(txn,"drop extension if exists pg_dtm");
196+
exec(txn,"create extension pg_dtm");
197+
exec(txn,"drop table if exists t");
198+
exec(txn,"create table t(u int primary key, v int)");
199+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts,0);
200+
txn.commit();
201+
}
202+
}
203+
204+
intmain (int argc,char* argv[])
205+
{
206+
bool initialize =false;
207+
for (int i =1; i < argc; i++) {
208+
if (argv[i][0] =='-') {
209+
switch (argv[i][1]) {
210+
case'r':
211+
cfg.nReaders =atoi(argv[++i]);
212+
continue;
213+
case'w':
214+
cfg.nWriters =atoi(argv[++i]);
215+
continue;
216+
case'a':
217+
cfg.nAccounts =atoi(argv[++i]);
218+
continue;
219+
case'n':
220+
cfg.nIterations =atoi(argv[++i]);
221+
continue;
222+
case'c':
223+
cfg.connections.push_back(string(argv[++i]));
224+
continue;
225+
case'i':
226+
initialize =true;
227+
continue;
228+
}
229+
}
230+
printf("Options:\n"
231+
"\t-r N\tnumber of readers (1)\n"
232+
"\t-w N\tnumber of writers (10)\n"
233+
"\t-a N\tnumber of accounts (1000)\n"
234+
"\t-n N\tnumber of iterations (1000)\n"
235+
"\t-c STR\tdatabase connection string\n"
236+
"\t-i\tinitialize datanase\n");
237+
return1;
238+
}
239+
if (initialize) {
240+
initializeDatabase();
241+
}
242+
243+
time_t start =getCurrentTime();
244+
running =true;
245+
246+
vector<thread>readers(cfg.nReaders);
247+
vector<thread>writers(cfg.nWriters);
248+
size_t nReads =0;
249+
size_t nWrites =0;
250+
251+
for (int i =0; i < cfg.nReaders; i++) {
252+
readers[i].start(i, reader);
253+
}
254+
for (int i =0; i < cfg.nWriters; i++) {
255+
writers[i].start(i, writer);
256+
}
257+
258+
for (int i =0; i < cfg.nWriters; i++) {
259+
writers[i].wait();
260+
nWrites += writers[i].proceeded;
261+
}
262+
263+
running =false;
264+
265+
for (int i =0; i < cfg.nReaders; i++) {
266+
readers[i].wait();
267+
nReads += writers[i].proceeded;
268+
}
269+
270+
time_t elapsed =getCurrentTime() - start;
271+
printf("TPS(updates)=%f, TPS(selects)=%f\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed);
272+
return0;
273+
}

‎contrib/pg_tsdtm/tests/makefile‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O2 -pthread
3+
4+
all: dtmbench
5+
6+
dtmbench: dtmbench.cpp
7+
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtmbench

‎contrib/pg_tsdtm/tests/perf/perf.go‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"os"
77
"sync"
88
"time"
9+
"runtime"
910
"github.com/jackc/pgx"
11+
"runtime/pprof"
1012
)
1113

1214
typeConnStrings []string
@@ -28,6 +30,7 @@ var cfg struct {
2830
AccountsNumint
2931
ReadersNumint
3032
IterNumint
33+
Profilestring
3134

3235
Writersstruct {
3336
Numint
@@ -99,6 +102,7 @@ func init() {
99102
"Show progress and other stuff for mortals")
100103
flag.BoolVar(&cfg.Parallel,"p",false,
101104
"Use parallel execs")
105+
flag.StringVar(&cfg.Profile,"cpuprofile","","write cpu profile to file")
102106
repread:=flag.Bool("l",false,
103107
"Use 'repeatable read' isolation level instead of 'read committed'")
104108
flag.Parse()
@@ -129,6 +133,17 @@ func main() {
129133
fmt.Println("ERROR: This test needs at leas two connections")
130134
os.Exit(1)
131135
}
136+
runtime.GOMAXPROCS(100)
137+
138+
ifcfg.Profile!="" {
139+
f,err:=os.Create(cfg.Profile)
140+
iferr!=nil {
141+
fmt.Println("Failed to create profile file")
142+
os.Exit(1)
143+
}
144+
pprof.StartCPUProfile(f)
145+
deferpprof.StopCPUProfile()
146+
}
132147

133148
// switch cfg.Backend {
134149
// case "transfers":

‎contrib/pg_tsdtm/tests/perf/run.sh‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
go run*.go \
2+
-C"dbname=postgres user=knizhnik port=5432 sslmode=disable" \
3+
-C"dbname=postgres user=knizhnik port=5433 sslmode=disable" \
4+
-C"dbname=postgres user=knizhnik port=5434 sslmode=disable" \
5+
-g -n 1000 -a 1000 -w 10 -r 1$*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp