Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit19d012c

Browse files
committed
Add dtmbench
1 parent0dbd7be commit19d012c

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

‎tests/dtmbench.cpp‎

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
#include<time.h>
2+
#include<stdio.h>
3+
#include<stdarg.h>
4+
#include<stdlib.h>
5+
#include<inttypes.h>
6+
#include<sys/time.h>
7+
#include<pthread.h>
8+
9+
#include<string>
10+
#include<vector>
11+
12+
#include<pqxx/connection>
13+
#include<pqxx/transaction>
14+
#include<pqxx/nontransaction>
15+
#include<pqxx/pipeline>
16+
17+
usingnamespacestd;
18+
usingnamespacepqxx;
19+
20+
template<classT>
21+
classunique_ptr
22+
{
23+
T* ptr;
24+
25+
public:
26+
unique_ptr(T* p =NULL) : ptr(p) {}
27+
~unique_ptr() {delete ptr; }
28+
T&operator*() {return *ptr; }
29+
T*operator->() {return ptr; }
30+
voidoperator=(T* p) { ptr = p; }
31+
voidoperator=(unique_ptr& other) {
32+
ptr = other.ptr;
33+
other.ptr =NULL;
34+
}
35+
};
36+
37+
typedefvoid* (*thread_proc_t)(void*);
38+
typedefuint32_txid_t;
39+
40+
structthread
41+
{
42+
pthread_t t;
43+
size_t proceeded;
44+
size_t aborts;
45+
int id;
46+
47+
voidstart(int tid,thread_proc_t proc) {
48+
id = tid;
49+
proceeded =0;
50+
aborts =0;
51+
pthread_create(&t,NULL, proc,this);
52+
}
53+
54+
voidwait() {
55+
pthread_join(t,NULL);
56+
}
57+
};
58+
59+
structconfig
60+
{
61+
int nReaders;
62+
int nWriters;
63+
int nIterations;
64+
int nAccounts;
65+
charconst* isolationLevel;
66+
vector<string> connections;
67+
68+
config() {
69+
nReaders =1;
70+
nWriters =10;
71+
nIterations =1000;
72+
nAccounts =1000;
73+
isolationLevel ="read committed";
74+
}
75+
};
76+
77+
config cfg;
78+
bool running;
79+
80+
#defineUSEC1000000
81+
82+
statictime_tgetCurrentTime()
83+
{
84+
structtimeval tv;
85+
gettimeofday(&tv,NULL);
86+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
87+
}
88+
89+
90+
voidexec(transaction_base& txn,charconst* sql, ...)
91+
{
92+
va_list args;
93+
va_start(args, sql);
94+
char buf[1024];
95+
vsprintf(buf, sql, args);
96+
va_end(args);
97+
txn.exec(buf);
98+
}
99+
100+
xid_texecQuery( transaction_base& txn,charconst* sql, ...)
101+
{
102+
va_list args;
103+
va_start(args, sql);
104+
char buf[1024];
105+
vsprintf(buf, sql, args);
106+
va_end(args);
107+
result r = txn.exec(buf);
108+
return r[0][0].as(xid_t());
109+
}
110+
111+
void*reader(void* arg)
112+
{
113+
thread& t = *(thread*)arg;
114+
vector< unique_ptr<connection> >conns(cfg.connections.size());
115+
for (size_t i =0; i < conns.size(); i++) {
116+
conns[i] =newconnection(cfg.connections[i]);
117+
}
118+
int64_t prevSum =0;
119+
120+
while (running) {
121+
xid_t xid =0;
122+
for (size_t i =0; i < conns.size(); i++) {
123+
worktxn(*conns[i]);
124+
if (i ==0) {
125+
xid =execQuery(txn,"select dtm_begin_transaction()");
126+
}else {
127+
exec(txn,"select dtm_join_transaction(%u)", xid);
128+
}
129+
txn.commit();
130+
}
131+
vector< unique_ptr<nontransaction> >txns(conns.size());
132+
vector< unique_ptr<pipeline> >pipes(conns.size());
133+
vector<pipeline::query_id>results(conns.size());
134+
for (size_t i =0; i < conns.size(); i++) {
135+
txns[i] =newnontransaction(*conns[i]);
136+
pipes[i] =newpipeline(*txns[i]);
137+
results[i] = pipes[i]->insert("select sum(v) from t");
138+
}
139+
int64_t sum =0;
140+
for (size_t i =0; i < conns.size(); i++) {
141+
pipes[i]->complete();
142+
result r = pipes[i]->retrieve(results[i]);
143+
sum += r[0][0].as(int64_t());
144+
}
145+
if (sum != prevSum) {
146+
printf("Total=%ld xid=%u\n", sum, xid);
147+
prevSum = sum;
148+
}
149+
t.proceeded +=1;
150+
}
151+
returnNULL;
152+
}
153+
154+
void*writer(void* arg)
155+
{
156+
thread& t = *(thread*)arg;
157+
vector< unique_ptr<connection> >conns(cfg.connections.size());
158+
for (size_t i =0; i < conns.size(); i++) {
159+
conns[i] =newconnection(cfg.connections[i]);
160+
}
161+
for (int i =0; i < cfg.nIterations; i++)
162+
{
163+
int srcCon, dstCon;
164+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
166+
167+
do {
168+
srcCon =random() % cfg.connections.size();
169+
dstCon =random() % cfg.connections.size();
170+
}while (srcCon == dstCon);
171+
172+
nontransactionsrcTx(*conns[srcCon]);
173+
nontransactiondstTx(*conns[dstCon]);
174+
175+
xid_t xid =execQuery(srcTx,"select dtm_begin_transaction()");
176+
exec(dstTx,"select dtm_join_transaction(%u)", xid);
177+
178+
exec(srcTx,"begin transaction isolation level %s", cfg.isolationLevel);
179+
exec(dstTx,"begin transaction isolation level %s", cfg.isolationLevel);
180+
181+
try {
182+
exec(srcTx,"update t set v = v - 1 where u=%d", srcAcc);
183+
exec(dstTx,"update t set v = v + 1 where u=%d", dstAcc);
184+
}catch (pqxx_exceptionconst& x) {
185+
exec(srcTx,"rollback");
186+
exec(srcTx,"rollback");
187+
t.aborts +=1;
188+
i -=1;
189+
continue;
190+
}
191+
pipelinesrcPipe(srcTx);
192+
pipelinedstPipe(dstTx);
193+
srcPipe.insert("commit transaction");
194+
dstPipe.insert("commit transaction");
195+
srcPipe.complete();
196+
dstPipe.complete();
197+
198+
t.proceeded +=1;
199+
}
200+
returnNULL;
201+
}
202+
203+
voidinitializeDatabase()
204+
{
205+
for (size_t i =0; i < cfg.connections.size(); i++) {
206+
connectionconn(cfg.connections[i]);
207+
worktxn(conn);
208+
exec(txn,"drop extension if exists pg_dtm");
209+
exec(txn,"create extension pg_dtm");
210+
exec(txn,"drop table if exists t");
211+
exec(txn,"create table t(u int primary key, v int)");
212+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
213+
txn.commit();
214+
}
215+
}
216+
217+
intmain (int argc,char* argv[])
218+
{
219+
bool initialize =false;
220+
for (int i =1; i < argc; i++) {
221+
if (argv[i][0] =='-') {
222+
switch (argv[i][1]) {
223+
case'r':
224+
cfg.nReaders =atoi(argv[++i]);
225+
continue;
226+
case'w':
227+
cfg.nWriters =atoi(argv[++i]);
228+
continue;
229+
case'a':
230+
cfg.nAccounts =atoi(argv[++i]);
231+
continue;
232+
case'n':
233+
cfg.nIterations =atoi(argv[++i]);
234+
continue;
235+
case'c':
236+
cfg.connections.push_back(string(argv[++i]));
237+
continue;
238+
case'l':
239+
cfg.isolationLevel = argv[++i];
240+
continue;
241+
case'i':
242+
initialize =true;
243+
continue;
244+
}
245+
}
246+
printf("Options:\n"
247+
"\t-r N\tnumber of readers (1)\n"
248+
"\t-w N\tnumber of writers (10)\n"
249+
"\t-a N\tnumber of accounts (1000)\n"
250+
"\t-n N\tnumber of iterations (1000)\n"
251+
"\t-l STR\tisolation level (read committed)\n"
252+
"\t-c STR\tdatabase connection string\n"
253+
"\t-i\tinitialize datanase\n");
254+
return1;
255+
}
256+
if (initialize) {
257+
initializeDatabase();
258+
}
259+
260+
time_t start =getCurrentTime();
261+
running =true;
262+
263+
vector<thread>readers(cfg.nReaders);
264+
vector<thread>writers(cfg.nWriters);
265+
size_t nReads =0;
266+
size_t nWrites =0;
267+
size_t nAborts =0;
268+
269+
for (int i =0; i < cfg.nReaders; i++) {
270+
readers[i].start(i, reader);
271+
}
272+
for (int i =0; i < cfg.nWriters; i++) {
273+
writers[i].start(i, writer);
274+
}
275+
276+
for (int i =0; i < cfg.nWriters; i++) {
277+
writers[i].wait();
278+
nWrites += writers[i].proceeded;
279+
nAborts += writers[i].aborts;
280+
}
281+
282+
running =false;
283+
284+
for (int i =0; i < cfg.nReaders; i++) {
285+
readers[i].wait();
286+
nReads += readers[i].proceeded;
287+
}
288+
289+
time_t elapsed =getCurrentTime() - start;
290+
printf("TPS(updates)=%f, TPS(selects)=%f, aborts=%ld\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed, nAborts);
291+
return0;
292+
}

‎tests/makefile‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O2 -pthread
3+
4+
all: dtmbench
5+
6+
dtmbench: dtmbench.cpp
7+
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtmbench

‎tests/run.sh‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
./dtmbench \
2+
-c"dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3+
-c"dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4+
-c"dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
5+
-n 1000 -a 1000 -w 10 -r 1$*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp