44#include < stdlib.h>
55#include < inttypes.h>
66#include < sys/time.h>
7+ #include < unistd.h>
78#include < pthread.h>
89
910#include < string>
@@ -22,7 +23,7 @@ template<class T>
2223class my_unique_ptr
2324{
2425 T* ptr;
25-
26+
2627public:
2728my_unique_ptr (T* p =NULL ) : ptr(p) {}
2829~my_unique_ptr () {delete ptr; }
@@ -32,7 +33,7 @@ class my_unique_ptr
3233void operator =(my_unique_ptr& other) {
3334 ptr = other.ptr ;
3435 other.ptr =NULL ;
35- }
36+ }
3637};
3738
3839typedef void * (*thread_proc_t )(void *);
@@ -47,7 +48,7 @@ struct thread
4748size_t aborts;
4849int id;
4950
50- void start (int tid,thread_proc_t proc) {
51+ void start (int tid,thread_proc_t proc) {
5152 id = tid;
5253 updates =0 ;
5354 selects =0 ;
@@ -56,7 +57,7 @@ struct thread
5657pthread_create (&t,NULL , proc,this );
5758 }
5859
59- void wait () {
60+ void wait () {
6061pthread_join (t,NULL );
6162 }
6263};
@@ -72,7 +73,7 @@ struct config
7273bool scatter;
7374bool avoidDeadlocks;
7475bool subtransactions;
75-
76+
7677config () {
7778 nReaders =1 ;
7879 nWriters =10 ;
@@ -118,7 +119,7 @@ T execQuery( transaction_base& txn, char const* sql, ...)
118119va_end (args);
119120 result r = txn.exec (buf);
120121return r[0 ][0 ].as (T ());
121- }
122+ }
122123
123124void *reader (void * arg)
124125{
@@ -134,7 +135,7 @@ void* reader(void* arg)
134135 result r = txn.exec (" select sum(v) from t" );
135136int64_t sum = r[0 ][0 ].as (int64_t ());
136137if (sum != prevSum) {
137- r = txn.exec (" select mtm.get_snapshot()" );
138+ r = txn.exec (" select mtm.get_snapshot()" );
138139printf (" Total=%ld, snapshot=%ld\n " , sum, r[0 ][0 ].as (int64_t ()));
139140 prevSum = sum;
140141 }
@@ -144,7 +145,7 @@ void* reader(void* arg)
144145 }
145146return NULL ;
146147}
147-
148+
148149void *writer (void * arg)
149150{
150151 thread& t = *(thread*)arg;
@@ -153,17 +154,17 @@ void* writer(void* arg)
153154 conns[i] =new connection (cfg.connections [i]);
154155 }
155156for (int i =0 ; i < cfg.nIterations ; i++)
156- {
157- // work
157+ {
158+ // work
158159// transaction<repeatable_read> txn(*conns[random() % conns.size()]);
159160 transaction<read_committed>txn (*conns[random () % conns.size ()]);
160161int srcAcc =random () % cfg.nAccounts ;
161162int dstAcc =random () % cfg.nAccounts ;
162- if (cfg.scatter ) {
163+ if (cfg.scatter ) {
163164srcAcc = srcAcc/cfg.nWriters *cfg.nWriters + t.id ;
164165dstAcc = dstAcc/cfg.nWriters *cfg.nWriters + t.id ;
165- }else if (cfg.subtransactions ) {
166- if (dstAcc < srcAcc) {
166+ }else if (cfg.subtransactions ) {
167+ if (dstAcc < srcAcc) {
167168int tmp = srcAcc;
168169srcAcc = dstAcc;
169170dstAcc = tmp;
@@ -173,7 +174,7 @@ void* writer(void* arg)
173174subtransactionsubtxn (txn," withdraw" );
174175exec (subtxn," update t set v = v - 1 where u=%d" , srcAcc);
175176break ;
176- }catch (pqxx_exceptionconst & x) {
177+ }catch (pqxx_exceptionconst & x) {
177178t.aborts +=1 ;
178179}
179180}
@@ -182,36 +183,36 @@ void* writer(void* arg)
182183subtransactionsubtxn (txn," deposit" );
183184exec (subtxn," update t set v = v + 1 where u=%d" , dstAcc);
184185break ;
185- }catch (pqxx_exceptionconst & x) {
186+ }catch (pqxx_exceptionconst & x) {
186187t.aborts +=1 ;
187188}
188189}
189- txn.commit ();
190+ txn.commit ();
190191 t.transactions +=1 ;
191192continue ;
192- }else if (cfg.avoidDeadlocks ) {
193- if (dstAcc < srcAcc) {
193+ }else if (cfg.avoidDeadlocks ) {
194+ if (dstAcc < srcAcc) {
194195int tmp = srcAcc;
195196srcAcc = dstAcc;
196197dstAcc = tmp;
197198}
198199}
199- try {
200- if (random () %100 < cfg.updatePercent ) {
200+ try {
201+ if (random () %100 < cfg.updatePercent ) {
201202exec (txn," update t set v = v - 1 where u=%d" , srcAcc);
202203exec (txn," update t set v = v + 1 where u=%d" , dstAcc);
203204 t.updates +=2 ;
204- }else {
205+ }else {
205206int64_t sum = execQuery<int64_t >(txn," select v from t where u=%d" , srcAcc)
206207 + execQuery<int64_t >(txn," select v from t where u=%d" , dstAcc);
207- if (sum > cfg.nIterations *cfg.nWriters || sum < -cfg.nIterations *cfg.nWriters ) {
208+ if (sum > cfg.nIterations *cfg.nWriters || sum < -cfg.nIterations *cfg.nWriters ) {
208209printf (" Wrong sum=%ld\n " , sum);
209210 }
210211 t.selects +=2 ;
211212 }
212- txn.commit ();
213+ txn.commit ();
213214 t.transactions +=1 ;
214- }catch (pqxx_exceptionconst & x) {
215+ }catch (pqxx_exceptionconst & x) {
215216 txn.abort ();
216217 t.aborts +=1 ;
217218 i -=1 ;
@@ -220,7 +221,24 @@ void* writer(void* arg)
220221 }
221222return NULL ;
222223}
223-
224+
225+ void *monitor (void * arg)
226+ {
227+ vector<thread>& writers = *(vector<thread>*)arg;
228+ time_t start =time (NULL );
229+ size_t elapsed =0 ;
230+ while (running) {
231+ sleep (1 );
232+ long total =0 ;
233+ for (int i =0 ; i < cfg.nWriters ; i++) {
234+ total += writers[i].transactions ;
235+ }
236+ printf (" %d: %5ld TPS\n " ,int (time (NULL ) - start),long (total - elapsed));
237+ elapsed = total;
238+ }
239+ return NULL ;
240+ }
241+
224242void initializeDatabase ()
225243{
226244connectionconn (cfg.connections [0 ]);
@@ -251,15 +269,15 @@ int main (int argc, char* argv[])
251269return 1 ;
252270 }
253271
254- for (int i =1 ; i < argc; i++) {
255- if (argv[i][0 ] ==' -' ) {
256- switch (argv[i][1 ]) {
272+ for (int i =1 ; i < argc; i++) {
273+ if (argv[i][0 ] ==' -' ) {
274+ switch (argv[i][1 ]) {
257275case ' r' :
258276 cfg.nReaders =atoi (argv[++i]);
259277continue ;
260278case ' w' :
261279 cfg.nWriters =atoi (argv[++i]);
262- continue ;
280+ continue ;
263281case ' a' :
264282 cfg.nAccounts =atoi (argv[++i]);
265283continue ;
@@ -300,7 +318,7 @@ int main (int argc, char* argv[])
300318return 1 ;
301319 }
302320
303- if (initialize) {
321+ if (initialize) {
304322initializeDatabase ();
305323printf (" %d accounts inserted\n " , cfg.nAccounts );
306324return 0 ;
@@ -311,34 +329,38 @@ int main (int argc, char* argv[])
311329
312330 vector<thread>readers (cfg.nReaders );
313331 vector<thread>writers (cfg.nWriters );
332+ pthread_t logger;
333+
314334size_t nAborts =0 ;
315335size_t nUpdates =0 ;
316336size_t nSelects =0 ;
317337size_t nTransactions =0 ;
318338
319- for (int i =0 ; i < cfg.nReaders ; i++) {
339+ for (int i =0 ; i < cfg.nReaders ; i++) {
320340 readers[i].start (i, reader);
321341 }
322- for (int i =0 ; i < cfg.nWriters ; i++) {
342+ for (int i =0 ; i < cfg.nWriters ; i++) {
323343 writers[i].start (i, writer);
324344 }
325-
326- for (int i =0 ; i < cfg.nWriters ; i++) {
345+ pthread_create (&logger,NULL , monitor, &writers);
346+
347+ for (int i =0 ; i < cfg.nWriters ; i++) {
327348 writers[i].wait ();
328349 nUpdates += writers[i].updates ;
329350 nSelects += writers[i].selects ;
330351 nAborts += writers[i].aborts ;
331352 nTransactions += writers[i].transactions ;
332353 }
333-
354+
334355 running =false ;
335356
336- for (int i =0 ; i < cfg.nReaders ; i++) {
357+ for (int i =0 ; i < cfg.nReaders ; i++) {
337358 readers[i].wait ();
338359 nSelects += readers[i].selects ;
339360 nTransactions += writers[i].transactions ;
340361 }
341-
362+ pthread_join (logger,NULL );
363+
342364time_t elapsed =getCurrentTime () - start;
343365
344366printf (
@@ -347,10 +369,10 @@ int main (int argc, char* argv[])
347369" \" readers\" :%d,\" writers\" :%d,\" update_percent\" :%d,\" accounts\" :%d,\" iterations\" :%d,\" hosts\" :%ld}\n " ,
348370 (double )(nTransactions*USEC)/elapsed,
349371 nTransactions,
350- nSelects,
372+ nSelects,
351373 nUpdates,
352374 nAborts,
353- (int )(nAborts*100 /nTransactions),
375+ (int )(nAborts*100 /nTransactions),
354376 cfg.nReaders ,
355377 cfg.nWriters ,
356378 cfg.updatePercent ,