Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit27bba25

Browse files
committed
Add dtmbench
1 parent50c8a80 commit27bba25

File tree

5 files changed

+309
-1
lines changed

5 files changed

+309
-1
lines changed

‎contrib/pg_dtm/dtmd/src/ddd.c‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ bool detectDeadLock(Graph* graph, xid_t root)
151151
return++v->deadlock_duration >=graph->min_deadlock_duration;
152152
}
153153
v->deadlock_duration=0;
154+
break;
154155
}
155156
}
156157
return false;

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ bool DtmGlobalDetectDeadLock(int port, TransactionId xid, void* data, int size)
487487
intdata_size=sizeof(ShubMessageHdr)+msg_size;
488488
char*buf= (char*)malloc(data_size);
489489
ShubMessageHdr*msg= (ShubMessageHdr*)buf;
490-
xid_t*body= (xid_t*)(msg+2);
490+
xid_t*body= (xid_t*)(msg+1);
491491
intsent;
492492
intreslen;
493493
xid_tresults[RESULTS_SIZE];

‎contrib/pg_dtm/tests/dtmbench.cpp‎

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
#include<time.h>
2+
#include<stdio.h>
3+
#include<stdarg.h>
4+
#include<stdlib.h>
5+
#include<inttypes.h>
6+
#include<sys/time.h>
7+
#include<pthread.h>
8+
9+
#include<string>
10+
#include<vector>
11+
12+
#include<pqxx/connection>
13+
#include<pqxx/transaction>
14+
#include<pqxx/nontransaction>
15+
#include<pqxx/pipeline>
16+
17+
usingnamespacestd;
18+
usingnamespacepqxx;
19+
20+
template<classT>
21+
classunique_ptr
22+
{
23+
T* ptr;
24+
25+
public:
26+
unique_ptr(T* p =NULL) : ptr(p) {}
27+
~unique_ptr() {delete ptr; }
28+
T&operator*() {return *ptr; }
29+
T*operator->() {return ptr; }
30+
voidoperator=(T* p) { ptr = p; }
31+
voidoperator=(unique_ptr& other) {
32+
ptr = other.ptr;
33+
other.ptr =NULL;
34+
}
35+
};
36+
37+
typedefvoid* (*thread_proc_t)(void*);
38+
typedefuint32_txid_t;
39+
40+
structthread
41+
{
42+
pthread_t t;
43+
size_t proceeded;
44+
size_t aborts;
45+
int id;
46+
47+
voidstart(int tid,thread_proc_t proc) {
48+
id = tid;
49+
proceeded =0;
50+
aborts =0;
51+
pthread_create(&t,NULL, proc,this);
52+
}
53+
54+
voidwait() {
55+
pthread_join(t,NULL);
56+
}
57+
};
58+
59+
structconfig
60+
{
61+
int nReaders;
62+
int nWriters;
63+
int nIterations;
64+
int nAccounts;
65+
charconst* isolationLevel;
66+
vector<string> connections;
67+
68+
config() {
69+
nReaders =1;
70+
nWriters =10;
71+
nIterations =1000;
72+
nAccounts =1000;
73+
isolationLevel ="read committed";
74+
}
75+
};
76+
77+
config cfg;
78+
bool running;
79+
80+
#defineUSEC1000000
81+
82+
statictime_tgetCurrentTime()
83+
{
84+
structtimeval tv;
85+
gettimeofday(&tv,NULL);
86+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
87+
}
88+
89+
90+
voidexec(transaction_base& txn,charconst* sql, ...)
91+
{
92+
va_list args;
93+
va_start(args, sql);
94+
char buf[1024];
95+
vsprintf(buf, sql, args);
96+
va_end(args);
97+
txn.exec(buf);
98+
}
99+
100+
xid_texecQuery( transaction_base& txn,charconst* sql, ...)
101+
{
102+
va_list args;
103+
va_start(args, sql);
104+
char buf[1024];
105+
vsprintf(buf, sql, args);
106+
va_end(args);
107+
result r = txn.exec(buf);
108+
return r[0][0].as(xid_t());
109+
}
110+
111+
void*reader(void* arg)
112+
{
113+
thread& t = *(thread*)arg;
114+
vector< unique_ptr<connection> >conns(cfg.connections.size());
115+
for (size_t i =0; i < conns.size(); i++) {
116+
conns[i] =newconnection(cfg.connections[i]);
117+
}
118+
int64_t prevSum =0;
119+
120+
while (running) {
121+
xid_t xid =0;
122+
for (size_t i =0; i < conns.size(); i++) {
123+
worktxn(*conns[i]);
124+
if (i ==0) {
125+
xid =execQuery(txn,"select dtm_begin_transaction()");
126+
}else {
127+
exec(txn,"select dtm_join_transaction(%u)", xid);
128+
}
129+
txn.commit();
130+
}
131+
vector< unique_ptr<nontransaction> >txns(conns.size());
132+
vector< unique_ptr<pipeline> >pipes(conns.size());
133+
vector<pipeline::query_id>results(conns.size());
134+
for (size_t i =0; i < conns.size(); i++) {
135+
txns[i] =newnontransaction(*conns[i]);
136+
pipes[i] =newpipeline(*txns[i]);
137+
results[i] = pipes[i]->insert("select sum(v) from t");
138+
}
139+
int64_t sum =0;
140+
for (size_t i =0; i < conns.size(); i++) {
141+
pipes[i]->complete();
142+
result r = pipes[i]->retrieve(results[i]);
143+
sum += r[0][0].as(int64_t());
144+
}
145+
if (sum != prevSum) {
146+
printf("Total=%ld xid=%u\n", sum, xid);
147+
prevSum = sum;
148+
}
149+
t.proceeded +=1;
150+
}
151+
returnNULL;
152+
}
153+
154+
void*writer(void* arg)
155+
{
156+
thread& t = *(thread*)arg;
157+
vector< unique_ptr<connection> >conns(cfg.connections.size());
158+
for (size_t i =0; i < conns.size(); i++) {
159+
conns[i] =newconnection(cfg.connections[i]);
160+
}
161+
for (int i =0; i < cfg.nIterations; i++)
162+
{
163+
int srcCon, dstCon;
164+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
166+
167+
do {
168+
srcCon =random() % cfg.connections.size();
169+
dstCon =random() % cfg.connections.size();
170+
}while (srcCon == dstCon);
171+
172+
nontransactionsrcTx(*conns[srcCon]);
173+
nontransactiondstTx(*conns[dstCon]);
174+
175+
xid_t xid =execQuery(srcTx,"select dtm_begin_transaction()");
176+
exec(dstTx,"select dtm_join_transaction(%u)", xid);
177+
178+
exec(srcTx,"begin transaction isolation level %s", cfg.isolationLevel);
179+
exec(dstTx,"begin transaction isolation level %s", cfg.isolationLevel);
180+
181+
try {
182+
exec(srcTx,"update t set v = v - 1 where u=%d", srcAcc);
183+
exec(dstTx,"update t set v = v + 1 where u=%d", dstAcc);
184+
}catch (pqxx_exceptionconst& x) {
185+
exec(srcTx,"rollback");
186+
exec(srcTx,"rollback");
187+
t.aborts +=1;
188+
i -=1;
189+
continue;
190+
}
191+
pipelinesrcPipe(srcTx);
192+
pipelinedstPipe(dstTx);
193+
srcPipe.insert("commit transaction");
194+
dstPipe.insert("commit transaction");
195+
srcPipe.complete();
196+
dstPipe.complete();
197+
198+
t.proceeded +=1;
199+
}
200+
returnNULL;
201+
}
202+
203+
voidinitializeDatabase()
204+
{
205+
for (size_t i =0; i < cfg.connections.size(); i++) {
206+
connectionconn(cfg.connections[i]);
207+
worktxn(conn);
208+
exec(txn,"drop extension if exists pg_dtm");
209+
exec(txn,"create extension pg_dtm");
210+
exec(txn,"drop table if exists t");
211+
exec(txn,"create table t(u int primary key, v int)");
212+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
213+
txn.commit();
214+
}
215+
}
216+
217+
intmain (int argc,char* argv[])
218+
{
219+
bool initialize =false;
220+
for (int i =1; i < argc; i++) {
221+
if (argv[i][0] =='-') {
222+
switch (argv[i][1]) {
223+
case'r':
224+
cfg.nReaders =atoi(argv[++i]);
225+
continue;
226+
case'w':
227+
cfg.nWriters =atoi(argv[++i]);
228+
continue;
229+
case'a':
230+
cfg.nAccounts =atoi(argv[++i]);
231+
continue;
232+
case'n':
233+
cfg.nIterations =atoi(argv[++i]);
234+
continue;
235+
case'c':
236+
cfg.connections.push_back(string(argv[++i]));
237+
continue;
238+
case'l':
239+
cfg.isolationLevel = argv[++i];
240+
continue;
241+
case'i':
242+
initialize =true;
243+
continue;
244+
}
245+
}
246+
printf("Options:\n"
247+
"\t-r N\tnumber of readers (1)\n"
248+
"\t-w N\tnumber of writers (10)\n"
249+
"\t-a N\tnumber of accounts (1000)\n"
250+
"\t-n N\tnumber of iterations (1000)\n"
251+
"\t-l STR\tisolation level (read committed)\n"
252+
"\t-c STR\tdatabase connection string\n"
253+
"\t-i\tinitialize datanase\n");
254+
return1;
255+
}
256+
if (initialize) {
257+
initializeDatabase();
258+
}
259+
260+
time_t start =getCurrentTime();
261+
running =true;
262+
263+
vector<thread>readers(cfg.nReaders);
264+
vector<thread>writers(cfg.nWriters);
265+
size_t nReads =0;
266+
size_t nWrites =0;
267+
size_t nAborts =0;
268+
269+
for (int i =0; i < cfg.nReaders; i++) {
270+
readers[i].start(i, reader);
271+
}
272+
for (int i =0; i < cfg.nWriters; i++) {
273+
writers[i].start(i, writer);
274+
}
275+
276+
for (int i =0; i < cfg.nWriters; i++) {
277+
writers[i].wait();
278+
nWrites += writers[i].proceeded;
279+
nAborts += writers[i].aborts;
280+
}
281+
282+
running =false;
283+
284+
for (int i =0; i < cfg.nReaders; i++) {
285+
readers[i].wait();
286+
nReads += readers[i].proceeded;
287+
}
288+
289+
time_t elapsed =getCurrentTime() - start;
290+
printf("TPS(updates)=%f, TPS(selects)=%f, aborts=%ld\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed, nAborts);
291+
return0;
292+
}

‎contrib/pg_dtm/tests/makefile‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O2 -pthread
3+
4+
all: dtmbench
5+
6+
dtmbench: dtmbench.cpp
7+
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtmbench

‎contrib/pg_dtm/tests/run.sh‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
./dtmbench \
2+
-c"dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3+
-c"dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4+
-c"dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
5+
-n 1000 -a 1000 -w 10 -r 1$*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp