4
4
"fmt"
5
5
"sync"
6
6
"math/rand"
7
+ "time"
7
8
"github.com/jackc/pgx"
8
9
)
9
10
@@ -34,10 +35,10 @@ var running = false
34
35
var nodes []int32 = []int32 {0 ,1 }
35
36
36
37
func asyncCommit (conn * pgx.Conn ,wg * sync.WaitGroup ) {
37
- exec (conn ,"commit" )
38
+ exec (conn ,"commit" )
38
39
wg .Done ()
39
40
}
40
-
41
+
41
42
func commit (conn1 ,conn2 * pgx.Conn ) {
42
43
var wg sync.WaitGroup
43
44
wg .Add (2 )
@@ -66,33 +67,53 @@ func prepare_db() {
66
67
exec (conn2 ,"create extension pg_dtm" )
67
68
exec (conn2 ,"drop table if exists t" )
68
69
exec (conn2 ,"create table t(u int primary key, v int)" )
69
-
70
+
70
71
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
71
72
// exec(conn2, "select dtm_join_transaction($1)", xid)
72
73
73
74
// strt transaction
74
75
exec (conn1 ,"begin transaction isolation level " + ISOLATION_LEVEL )
75
76
exec (conn2 ,"begin transaction isolation level " + ISOLATION_LEVEL )
76
-
77
+
77
78
for i := 0 ;i < N_ACCOUNTS ;i ++ {
78
79
exec (conn1 ,"insert into t values($1, $2)" ,i ,INIT_AMOUNT )
79
80
exec (conn2 ,"insert into t values($1, $2)" ,i ,INIT_AMOUNT )
80
81
}
81
-
82
+
82
83
commit (conn1 ,conn2 )
83
84
}
84
85
85
86
func max (a ,b int64 )int64 {
86
87
if a >= b {
87
88
return a
88
- }
89
+ }
89
90
return b
90
91
}
91
92
92
- func transfer (id int ,wg * sync.WaitGroup ) {
93
+ func progress (total int ,cCommits chan int ,cAborts chan int ) {
94
+ commits := 0
95
+ aborts := 0
96
+ start := time .Now ()
97
+ for newcommits := range cCommits {
98
+ newaborts := <- cAborts
99
+ commits += newcommits
100
+ aborts += newaborts
101
+ if time .Since (start ).Seconds ()> 1 {
102
+ fmt .Printf (
103
+ "progress %0.2f%%: %d commits, %d aborts\n " ,
104
+ float32 (commits )* 100.0 / float32 (total ),commits ,aborts ,
105
+ )
106
+ start = time .Now ()
107
+ }
108
+ }
109
+ }
110
+
111
+ func transfer (id int ,cCommits chan int ,cAborts chan int ,wg * sync.WaitGroup ) {
93
112
var err error
94
113
var xid int32
95
- var nConflicts = 0
114
+ var nAborts = 0
115
+ var nCommits = 0
116
+ var myCommits = 0
96
117
97
118
conn1 ,err := pgx .Connect (cfg1 )
98
119
checkErr (err )
@@ -102,10 +123,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102
123
checkErr (err )
103
124
defer conn2 .Close ()
104
125
105
- for i := 0 ;i < N_ITERATIONS ;i ++ {
106
- //amount := 2*rand.Intn(2) - 1
107
- amount := 1
108
- account1 := rand .Intn (N_ACCOUNTS )
126
+ start := time .Now ()
127
+ for myCommits < N_ITERATIONS {
128
+ amount := 2 * rand .Intn (2000 )- 1
129
+ //amount := 1
130
+ account1 := rand .Intn (N_ACCOUNTS )
109
131
account2 := rand .Intn (N_ACCOUNTS )
110
132
111
133
xid = execQuery (conn1 ,"select dtm_begin_transaction(2)" )
@@ -114,25 +136,35 @@ func transfer(id int, wg *sync.WaitGroup) {
114
136
// start transaction
115
137
exec (conn1 ,"begin transaction isolation level " + ISOLATION_LEVEL )
116
138
exec (conn2 ,"begin transaction isolation level " + ISOLATION_LEVEL )
117
-
118
- ok1 := execUpdate (conn1 ,"update t set v = v + $1 where u=$2" ,amount ,account1 )
119
- ok2 := execUpdate (conn2 ,"update t set v = v - $1 where u=$2" ,amount ,account2 )
120
- if ! ok1 || ! ok2 {
139
+
140
+ ok1 := execUpdate (conn1 ,"update t set v = v + $1 where u=$2" ,amount ,account1 )
141
+ ok2 := execUpdate (conn2 ,"update t set v = v - $1 where u=$2" ,amount ,account2 )
142
+ if ! ok1 || ! ok2 {
121
143
exec (conn1 ,"rollback" )
122
144
exec (conn2 ,"rollback" )
123
- nConflicts += 1
124
- i -= 1
125
- }else {
145
+ nAborts += 1
146
+ }else {
126
147
commit (conn1 ,conn2 )
127
- }
148
+ nCommits += 1
149
+ myCommits += 1
150
+ }
151
+
152
+ if time .Since (start ).Seconds ()> 1 {
153
+ cCommits <- nCommits
154
+ cAborts <- nAborts
155
+ nCommits = 0
156
+ nAborts = 0
157
+ start = time .Now ()
158
+ }
128
159
}
129
- fmt .Println ("Test completed with " ,nConflicts ," conflicts" )
160
+ cCommits <- nCommits
161
+ cAborts <- nAborts
130
162
wg .Done ()
131
163
}
132
164
133
165
func inspect (wg * sync.WaitGroup ) {
134
166
var sum1 ,sum2 ,sum int64
135
- var prevSum int64 = 0
167
+ var prevSum int64 = 0
136
168
var xid int32
137
169
138
170
{
@@ -142,28 +174,33 @@ func inspect(wg *sync.WaitGroup) {
142
174
conn2 ,err := pgx .Connect (cfg2 )
143
175
checkErr (err )
144
176
145
- for running {
177
+ for running {
178
+ xid = execQuery (conn1 ,"select dtm_begin_transaction(2)" )
179
+ exec (conn2 ,"select dtm_join_transaction($1)" ,xid )
180
+
181
+ exec (conn1 ,"begin transaction isolation level " + ISOLATION_LEVEL )
182
+ exec (conn2 ,"begin transaction isolation level " + ISOLATION_LEVEL )
183
+
184
+ sum1 = execQuery64 (conn1 ,"select sum(v) from t" )
185
+ sum2 = execQuery64 (conn2 ,"select sum(v) from t" )
186
+
187
+ sum = sum1 + sum2
188
+ if (sum != prevSum ) {
189
+ xmin1 := execQuery (conn1 ,"select dtm_get_current_snapshot_xmin()" )
190
+ xmax1 := execQuery (conn1 ,"select dtm_get_current_snapshot_xmax()" )
191
+ xmin2 := execQuery (conn2 ,"select dtm_get_current_snapshot_xmin()" )
192
+ xmax2 := execQuery (conn2 ,"select dtm_get_current_snapshot_xmax()" )
193
+ fmt .Printf (
194
+ "Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n " ,
195
+ sum ,xid ,xmin1 ,xmax1 ,xmin2 ,xmax2 ,
196
+ )
197
+ prevSum = sum
198
+ }
146
199
147
-
148
- xid = execQuery (conn1 ,"select dtm_begin_transaction(2)" )
149
- exec (conn2 ,"select dtm_join_transaction($1)" ,xid )
150
-
151
- exec (conn1 ,"begin transaction isolation level " + ISOLATION_LEVEL )
152
- exec (conn2 ,"begin transaction isolation level " + ISOLATION_LEVEL )
153
-
154
- sum1 = execQuery64 (conn1 ,"select sum(v) from t" )
155
- sum2 = execQuery64 (conn2 ,"select sum(v) from t" )
156
-
157
- sum = sum1 + sum2
158
- if (sum != prevSum ) {
159
- fmt .Println ("Total = " ,sum ,"xid=" ,xid ,"snap1={" ,execQuery (conn1 ,"select dtm_get_current_snapshot_xmin()" ),execQuery (conn1 ,"select dtm_get_current_snapshot_xmax()" ),"}, snap2={" ,execQuery (conn2 ,"select dtm_get_current_snapshot_xmin()" ),execQuery (conn2 ,"select dtm_get_current_snapshot_xmax()" ),"}" )
160
- prevSum = sum
161
- }
162
-
163
- commit (conn1 ,conn2 )
164
- }
165
- conn1 .Close ()
166
- conn2 .Close ()
200
+ commit (conn1 ,conn2 )
201
+ }
202
+ conn1 .Close ()
203
+ conn2 .Close ()
167
204
}
168
205
wg .Done ()
169
206
}
@@ -174,9 +211,13 @@ func main() {
174
211
175
212
prepare_db ()
176
213
214
+ cCommits := make (chan int )
215
+ cAborts := make (chan int )
216
+ go progress (TRANSFER_CONNECTIONS * N_ITERATIONS ,cCommits ,cAborts )
217
+
177
218
transferWg .Add (TRANSFER_CONNECTIONS )
178
219
for i := 0 ;i < TRANSFER_CONNECTIONS ;i ++ {
179
- go transfer (i ,& transferWg )
220
+ go transfer (i ,cCommits , cAborts , & transferWg )
180
221
}
181
222
running = true
182
223
inspectWg .Add (1 )
@@ -185,6 +226,8 @@ func main() {
185
226
transferWg .Wait ()
186
227
running = false
187
228
inspectWg .Wait ()
229
+
230
+ fmt .Printf ("done\n " )
188
231
}
189
232
190
233
func exec (conn * pgx.Conn ,stmt string ,arguments ... interface {}) {
@@ -216,10 +259,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216
259
checkErr (err )
217
260
return result
218
261
}
262
+
219
263
func checkErr (err error ) {
220
264
if err != nil {
221
265
panic (err )
222
266
}
223
267
}
224
268
225
-
269
+ // vim: expandtab ts=4 sts=4 sw=4