Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit73132fb

Browse files
committed
Add dtmbench.cpp
2 parents2c24d13 +14d6ed8 commit73132fb

File tree

16 files changed

+808
-1842
lines changed

16 files changed

+808
-1842
lines changed
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
---
22
-hosts:nodes[1]
3-
roles:
4-
-role:postgrespro
5-
deploy_dtm:true
3+
#roles:
4+
#- role: postgrespro
5+
# deploy_dtm: true
66

77
-hosts:nodes
88
roles:
99
-role:postgrespro
1010
pg_port:15432
1111
deploy_postgres:true
1212
pg_dtm_enable:true
13-
pg_dtm_enable:false
14-
# pg_config_role:
15-
# - line: "dtm.buffer_size = 65536"
16-
pg_dtm_host:"{{ groups['nodes'][0] }}"
13+
pg_config_role:
14+
-line:"dtm.vacuum_delay = 1"
15+
#pg_dtm_host: "{{ groups['nodes'][0] }}"
16+
1717

‎contrib/pg_dtm/tests/deploy_layouts/roles/postgrespro/tasks/postgres.yml‎

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77
-name:remove dtm.so
88
shell:rm -f {{pg_dst}}/lib/pg_dtm.so
99

10-
-name:build dtm extension
11-
shell:make clean && make && make install
12-
args:
13-
chdir:"{{pg_src}}/contrib/pg_dtm"
14-
creates:"{{pg_dst}}/lib/pg_dtm.so"
15-
16-
# - name: build ts-dtm extension
10+
# - name: build dtm extension
1711
# shell: make clean && make && make install
1812
# args:
19-
# chdir: "{{pg_src}}/contrib/pg_tsdtm"
13+
# chdir: "{{pg_src}}/contrib/pg_dtm"
2014
# creates: "{{pg_dst}}/lib/pg_dtm.so"
2115

16+
-name:build ts-dtm extension
17+
shell:make clean && make && make install
18+
args:
19+
chdir:"{{pg_src}}/contrib/pg_tsdtm"
20+
creates:"{{pg_dst}}/lib/pg_dtm.so"
21+
2222
-stat:path={{pg_datadir}}/postmaster.pid
2323
register:pg_pidfile
2424

‎contrib/pg_dtm/tests/deploy_layouts/single.yml‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
-hosts:nodes[-1]
33
roles:
44

5-
-role:postgrespro
6-
deploy_dtm:true
5+
#- role: postgrespro
6+
# deploy_dtm: true
77

88
-role:postgrespro
99
deploy_postgres:true

‎contrib/pg_dtm/tests/farms/sai‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
158.250.29.6 ansible_ssh_user=cluster offset=4001
88
158.250.29.8 ansible_ssh_user=cluster offset=2001
99
158.250.29.9 ansible_ssh_user=cluster offset=1001
10-
158.250.29.10 ansible_ssh_user=cluster offset=1
10+
#158.250.29.10 ansible_ssh_user=cluster offset=1
1111

1212
[master]
1313
158.250.29.10 ansible_ssh_user=cluster offset=1

‎contrib/pg_tsdtm/pg_dtm.c‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,10 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
393393
DtmTransStatus*ts,*prev=NULL;
394394
timestamp_tcutoff_time=dtm_get_current_time()-DtmVacuumDelay*USEC;
395395
SpinLockAcquire(&local->lock);
396+
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_FIND,NULL);
397+
if (ts!=NULL) {
398+
cutoff_time=ts->cid-DtmVacuumDelay*USEC;
399+
}
396400
for (ts=local->trans_list_head;ts!=NULL&&ts->cid<cutoff_time;prev=ts,ts=ts->next) {
397401
if (prev!=NULL) {
398402
hash_search(xid2status,&prev->xid,HASH_REMOVE,NULL);
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
#include<time.h>
2+
#include<stdio.h>
3+
#include<stdarg.h>
4+
#include<stdlib.h>
5+
#include<inttypes.h>
6+
#include<sys/time.h>
7+
#include<pthread.h>
8+
9+
#include<string>
10+
#include<vector>
11+
12+
#include<pqxx/connection>
13+
#include<pqxx/transaction>
14+
#include<pqxx/nontransaction>
15+
16+
usingnamespacestd;
17+
usingnamespacepqxx;
18+
19+
template<classT>
20+
classunique_ptr
21+
{
22+
T* ptr;
23+
24+
public:
25+
unique_ptr(T* p =NULL) : ptr(p) {}
26+
~unique_ptr() {delete ptr; }
27+
T&operator*() {return *ptr; }
28+
T*operator->() {return ptr; }
29+
voidoperator=(T* p) { ptr = p; }
30+
voidoperator=(unique_ptr& other) {
31+
ptr = other.ptr;
32+
other.ptr =NULL;
33+
}
34+
};
35+
36+
typedefvoid* (*thread_proc_t)(void*);
37+
typedefint64_tcsn_t;
38+
39+
structthread
40+
{
41+
pthread_t t;
42+
size_t proceeded;
43+
int id;
44+
45+
voidstart(int tid,thread_proc_t proc) {
46+
id = tid;
47+
proceeded =0;
48+
pthread_create(&t,NULL, proc,this);
49+
}
50+
51+
voidwait() {
52+
pthread_join(t,NULL);
53+
}
54+
};
55+
56+
structconfig
57+
{
58+
int nReaders;
59+
int nWriters;
60+
int nIterations;
61+
int nAccounts;
62+
vector<string> connections;
63+
64+
config() {
65+
nReaders =1;
66+
nWriters =10;
67+
nIterations =1000;
68+
nAccounts =1000;
69+
}
70+
};
71+
72+
config cfg;
73+
bool running;
74+
75+
#defineUSEC1000000
76+
77+
statictime_tgetCurrentTime()
78+
{
79+
structtimeval tv;
80+
gettimeofday(&tv,NULL);
81+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
82+
}
83+
84+
85+
voidexec(transaction_base& txn,charconst* sql, ...)
86+
{
87+
va_list args;
88+
va_start(args, sql);
89+
char buf[1024];
90+
vsprintf(buf, sql, args);
91+
va_end(args);
92+
txn.exec(buf);
93+
}
94+
95+
int64_texecQuery( transaction_base& txn,charconst* sql, ...)
96+
{
97+
va_list args;
98+
va_start(args, sql);
99+
char buf[1024];
100+
vsprintf(buf, sql, args);
101+
va_end(args);
102+
result r = txn.exec(buf);
103+
return r[0][0].as(int64_t());
104+
}
105+
106+
void*reader(void* arg)
107+
{
108+
thread& t = *(thread*)arg;
109+
vector< unique_ptr<connection> >conns(cfg.connections.size());
110+
for (size_t i =0; i < conns.size(); i++) {
111+
conns[i] =newconnection(cfg.connections[i]);
112+
}
113+
int64_t prevSum =0;
114+
115+
while (running) {
116+
csn_t snapshot;
117+
vector< unique_ptr<work> >txns(conns.size());
118+
for (size_t i =0; i < conns.size(); i++) {
119+
txns[i] =newwork(*conns[i]);
120+
}
121+
for (size_t i =0; i < txns.size(); i++) {
122+
if (i ==0) {
123+
snapshot =execQuery(*txns[i],"select dtm_extend()");
124+
}else {
125+
snapshot =execQuery(*txns[i],"select dtm_access(%ld)", snapshot);
126+
}
127+
}
128+
int64_t sum =0;
129+
for (size_t i =0; i < txns.size(); i++) {
130+
sum +=execQuery(*txns[i],"select sum(v) from t");
131+
}
132+
if (sum != prevSum) {
133+
printf("Total=%ld snapshot=%ld\n", sum, snapshot);
134+
prevSum = sum;
135+
}
136+
t.proceeded +=1;
137+
}
138+
returnNULL;
139+
}
140+
141+
void*writer(void* arg)
142+
{
143+
thread& t = *(thread*)arg;
144+
vector< unique_ptr<connection> >conns(cfg.connections.size());
145+
for (size_t i =0; i < conns.size(); i++) {
146+
conns[i] =newconnection(cfg.connections[i]);
147+
}
148+
for (int i =0; i < cfg.nIterations; i++)
149+
{
150+
char gtid[32];
151+
int srcCon, dstCon;
152+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
153+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
154+
155+
sprintf(gtid,"%d.%d", t.id, i);
156+
157+
do {
158+
srcCon =random() % cfg.connections.size();
159+
dstCon =random() % cfg.connections.size();
160+
}while (srcCon == dstCon);
161+
162+
nontransactionsrcTx(*conns[srcCon]);
163+
nontransactiondstTx(*conns[dstCon]);
164+
165+
exec(srcTx,"begin transaction");
166+
exec(dstTx,"begin transaction");
167+
168+
csn_t snapshot =execQuery(srcTx,"select dtm_extend('%s')", gtid);
169+
snapshot =execQuery(dstTx,"select dtm_access(%ld, '%s')", snapshot, gtid);
170+
171+
exec(srcTx,"update t set v = v - 1 where u=%d", srcAcc);
172+
exec(dstTx,"update t set v = v + 1 where u=%d", dstAcc);
173+
174+
exec(srcTx,"prepare transaction '%s'", gtid);
175+
exec(dstTx,"prepare transaction '%s'", gtid);
176+
exec(srcTx,"select dtm_begin_prepare('%s')", gtid);
177+
exec(dstTx,"select dtm_begin_prepare('%s')", gtid);
178+
csn_t csn =execQuery(srcTx,"select dtm_prepare('%s', 0)", gtid);
179+
csn =execQuery(dstTx,"select dtm_prepare('%s', %ld)", gtid, csn);
180+
exec(srcTx,"select dtm_end_prepare('%s', %ld)", gtid, csn);
181+
exec(dstTx,"select dtm_end_prepare('%s', %ld)", gtid, csn);
182+
exec(srcTx,"commit prepared '%s'", gtid);
183+
exec(dstTx,"commit prepared '%s'", gtid);
184+
185+
t.proceeded +=1;
186+
}
187+
returnNULL;
188+
}
189+
190+
voidinitializeDatabase()
191+
{
192+
for (size_t i =0; i < cfg.connections.size(); i++) {
193+
connectionconn(cfg.connections[i]);
194+
worktxn(conn);
195+
exec(txn,"drop extension if exists pg_dtm");
196+
exec(txn,"create extension pg_dtm");
197+
exec(txn,"drop table if exists t");
198+
exec(txn,"create table t(u int primary key, v int)");
199+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts,0);
200+
txn.commit();
201+
}
202+
}
203+
204+
intmain (int argc,char* argv[])
205+
{
206+
bool initialize =false;
207+
for (int i =1; i < argc; i++) {
208+
if (argv[i][0] =='-') {
209+
switch (argv[i][1]) {
210+
case'r':
211+
cfg.nReaders =atoi(argv[++i]);
212+
continue;
213+
case'w':
214+
cfg.nWriters =atoi(argv[++i]);
215+
continue;
216+
case'a':
217+
cfg.nAccounts =atoi(argv[++i]);
218+
continue;
219+
case'n':
220+
cfg.nIterations =atoi(argv[++i]);
221+
continue;
222+
case'c':
223+
cfg.connections.push_back(string(argv[++i]));
224+
continue;
225+
case'i':
226+
initialize =true;
227+
continue;
228+
}
229+
}
230+
printf("Options:\n"
231+
"\t-r N\tnumber of readers (1)\n"
232+
"\t-w N\tnumber of writers (10)\n"
233+
"\t-a N\tnumber of accounts (1000)\n"
234+
"\t-n N\tnumber of iterations (1000)\n"
235+
"\t-c STR\tdatabase connection string\n"
236+
"\t-i\tinitialize datanase\n");
237+
return1;
238+
}
239+
if (initialize) {
240+
initializeDatabase();
241+
}
242+
243+
time_t start =getCurrentTime();
244+
running =true;
245+
246+
vector<thread>readers(cfg.nReaders);
247+
vector<thread>writers(cfg.nWriters);
248+
size_t nReads =0;
249+
size_t nWrites =0;
250+
251+
for (int i =0; i < cfg.nReaders; i++) {
252+
readers[i].start(i, reader);
253+
}
254+
for (int i =0; i < cfg.nWriters; i++) {
255+
writers[i].start(i, writer);
256+
}
257+
258+
for (int i =0; i < cfg.nWriters; i++) {
259+
writers[i].wait();
260+
nWrites += writers[i].proceeded;
261+
}
262+
263+
running =false;
264+
265+
for (int i =0; i < cfg.nReaders; i++) {
266+
readers[i].wait();
267+
nReads += writers[i].proceeded;
268+
}
269+
270+
time_t elapsed =getCurrentTime() - start;
271+
printf("TPS(updates)=%f, TPS(selects)=%f\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed);
272+
return0;
273+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp