1111
1212#include < pqxx/connection>
1313#include < pqxx/transaction>
14+ #include < pqxx/subtransaction.hxx>
1415#include < pqxx/nontransaction>
1516#include < pqxx/pipeline>
1617
@@ -70,7 +71,8 @@ struct config
7071 vector<string> connections;
7172bool scatter;
7273bool avoidDeadlocks;
73-
74+ bool subtransactions;
75+
7476config () {
7577 nReaders =1 ;
7678 nWriters =10 ;
@@ -79,6 +81,7 @@ struct config
7981 updatePercent =100 ;
8082scatter =false ;
8183avoidDeadlocks =false ;
84+ subtransactions =false ;
8285 }
8386};
8487
@@ -159,6 +162,33 @@ void* writer(void* arg)
159162if (cfg.scatter ) {
160163srcAcc = srcAcc/cfg.nWriters *cfg.nWriters + t.id ;
161164dstAcc = dstAcc/cfg.nWriters *cfg.nWriters + t.id ;
165+ }else if (cfg.subtransactions ) {
166+ if (dstAcc < srcAcc) {
167+ int tmp = srcAcc;
168+ srcAcc = dstAcc;
169+ dstAcc = tmp;
170+ }
171+ while (true ) {
172+ try {
173+ subtransactionsubtxn (txn," withdraw" );
174+ exec (subtxn," update t set v = v - 1 where u=%d" , srcAcc);
175+ break ;
176+ }catch (pqxx_exceptionconst & x) {
177+ t.aborts +=1 ;
178+ }
179+ }
180+ while (true ) {
181+ try {
182+ subtransactionsubtxn (txn," deposit" );
183+ exec (subtxn," update t set v = v + 1 where u=%d" , dstAcc);
184+ break ;
185+ }catch (pqxx_exceptionconst & x) {
186+ t.aborts +=1 ;
187+ }
188+ }
189+ txn.commit ();
190+ t.transactions +=1 ;
191+ continue ;
162192}else if (cfg.avoidDeadlocks ) {
163193if (dstAcc < srcAcc) {
164194int tmp = srcAcc;
@@ -198,8 +228,8 @@ void initializeDatabase()
198228printf (" Creating database schema...\n " );
199229{
200230nontransactiontxn (conn);
201- exec (txn," drop extension if exists multimaster" );
202- exec (txn," create extension multimaster" );
231+ // exec(txn, "drop extension if exists multimaster");
232+ // exec(txn, "create extension multimaster");
203233exec (txn," drop table if exists t" );
204234exec (txn," create table t(u int primary key, v int)" );
205235}
@@ -251,6 +281,9 @@ int main (int argc, char* argv[])
251281case ' d' :
252282 cfg.avoidDeadlocks =true ;
253283continue ;
284+ case ' x' :
285+ cfg.subtransactions =true ;
286+ continue ;
254287 }
255288 }
256289printf (" Options:\n "
@@ -260,7 +293,8 @@ int main (int argc, char* argv[])
260293" \t -n N\t number of iterations (1000)\n "
261294" \t -p N\t update percent (100)\n "
262295" \t -c STR\t database connection string\n "
263- " \t -s\t scattern avoid deadlocks\n "
296+ " \t -s\t scatter ids to avoid conflicts\n "
297+ " \t -x\t use subtransactions\n "
264298" \t -d\t avoid deadlocks\n "
265299" \t -i\t initialize database\n " );
266300return 1 ;