Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4db6be6

Browse files
committed
add latencies to aggregations, count isolation errors
1 parentde66a2b commit4db6be6

File tree

1 file changed

+85
-52
lines changed

1 file changed

+85
-52
lines changed

‎tests2/client2.py

Lines changed: 85 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,50 @@
55
importrandom
66
importpsycopg2
77
importtime
8+
importdatetime
9+
importcopy
810
importaioprocessing
911
importmultiprocessing
1012

1113
classMtmTxAggregate(object):
1214

1315
def__init__(self,name):
1416
self.name=name
17+
self.isolation=0
1518
self.clear_values()
1619

1720
defclear_values(self):
1821
self.max_latency=0.0
19-
self.running_latency=0.0
2022
self.finish= {}
2123

22-
defadd_finish(self,name):
24+
defstart_tx(self):
25+
self.start_time=datetime.datetime.now()
26+
27+
deffinish_tx(self,name):
28+
latency= (datetime.datetime.now()-self.start_time).total_seconds()
29+
30+
iflatency>self.max_latency:
31+
self.max_latency=latency
32+
2333
ifnamenotinself.finish:
2434
self.finish[name]=1
2535
else:
2636
self.finish[name]+=1
37+
38+
defas_dict(self):
39+
return {
40+
'running_latency': (datetime.datetime.now()-self.start_time).total_seconds(),
41+
'max_latency':self.max_latency,
42+
'isolation':self.isolation,
43+
'finish':copy.deepcopy(self.finish)
44+
}
2745

2846
classMtmClient(object):
2947

3048
def__init__(self,dsns,n_accounts=100000):
3149
self.n_accounts=n_accounts
3250
self.dsns=dsns
33-
34-
self.aggregates= [MtmTxAggregate('transfer'),MtmTxAggregate('transfer'),MtmTxAggregate('transfer')]
51+
self.aggregates= {}
3552
self.initdb()
3653

3754
definitdb(self):
@@ -53,90 +70,106 @@ async def status(self):
5370
whileTrue:
5471
msg=awaitself.child_pipe.coro_recv()
5572
ifmsg=='status':
56-
self.child_pipe.send(self.aggregates)
57-
foraggregateinself.aggregates:
73+
serialized_aggs= {}
74+
forname,aggregateinself.aggregates.items():
75+
serialized_aggs[name]=aggregate.as_dict()
5876
aggregate.clear_values()
77+
self.child_pipe.send(serialized_aggs)
5978

60-
asyncdeftransfer(self,i):
61-
pool=awaitaiopg.create_pool(self.dsns[i])
62-
asyncwithpool.acquire()asconn:
63-
asyncwithconn.cursor()ascur:
64-
whileTrue:
65-
amount=1
66-
from_uid=random.randint(1,self.n_accounts-1)
67-
to_uid=random.randint(1,self.n_accounts-1)
68-
69-
try:
70-
awaitcur.execute('begin')
71-
awaitcur.execute('''update bank_test
72-
set amount = amount - %s
73-
where uid = %s''',
74-
(amount,from_uid))
75-
awaitcur.execute('''update bank_test
76-
set amount = amount + %s
77-
where uid = %s''',
78-
(amount,to_uid))
79-
awaitcur.execute('commit')
80-
81-
self.aggregates[i].add_finish('commit')
82-
exceptpsycopg2.Errorase:
83-
awaitcur.execute('rollback')
84-
self.aggregates[i].add_finish(e.pgerror)
79+
asyncdefexec_tx(self,tx_block,aggname_prefix,conn_i):
80+
aggname="%s_%i"% (aggname_prefix,conn_i)
81+
agg=self.aggregates[aggname]=MtmTxAggregate(aggname)
8582

86-
asyncdeftotal(self,i):
87-
pool=awaitaiopg.create_pool(self.dsns[i])
83+
pool=awaitaiopg.create_pool(self.dsns[conn_i])
8884
asyncwithpool.acquire()asconn:
8985
asyncwithconn.cursor()ascur:
9086
whileTrue:
91-
87+
agg.start_tx()
9288
try:
93-
awaitcur.execute('select sum(amount) from bank_test')
94-
95-
if'commit'notinself.tps_vector[i]:
96-
self.tps_vector[i]['commit']=1
97-
else:
98-
self.tps_vector[i]['commit']+=1
89+
awaittx_block(conn,cur)
90+
agg.finish_tx('commit')
9991
exceptpsycopg2.Errorase:
10092
awaitcur.execute('rollback')
101-
ife.pgerrornotinself.tps_vector[i]:
102-
self.tps_vector[i][e.pgerror]=1
103-
else:
104-
self.tps_vector[i][e.pgerror]+=1
93+
agg.finish_tx(e.pgerror)
94+
95+
asyncdeftransfer_tx(self,conn,cur):
96+
amount=1
97+
# to avoid deadlocks:
98+
from_uid=random.randint(1,self.n_accounts-2)
99+
to_uid=from_uid+1
100+
awaitcur.execute('begin')
101+
awaitcur.execute('''update bank_test
102+
set amount = amount - %s
103+
where uid = %s''',
104+
(amount,from_uid))
105+
awaitcur.execute('''update bank_test
106+
set amount = amount + %s
107+
where uid = %s''',
108+
(amount,to_uid))
109+
awaitcur.execute('commit')
110+
111+
asyncdeftotal_tx(self,conn,cur):
112+
awaitcur.execute('select sum(amount) from bank_test')
113+
total=awaitcur.fetchone()
114+
iftotal[0]!=0:
115+
self.isolation_errors+=1
105116

106117
defrun(self):
107118
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
108119
self.loop=asyncio.get_event_loop()
109120

110121
fori,_inenumerate(self.dsns):
111-
asyncio.ensure_future(self.transfer(i))
112-
#asyncio.ensure_future(self.total(i))
122+
asyncio.ensure_future(self.exec_tx(self.transfer_tx,'transfer',i))
123+
asyncio.ensure_future(self.exec_tx(self.total_tx,'sumtotal',i))
113124

114125
asyncio.ensure_future(self.status())
115126

116127
self.loop.run_forever()
117128

118129
defbgrun(self):
119130
print('Starting evloop in different process');
120-
121131
self.parent_pipe,self.child_pipe=aioprocessing.AioPipe()
122-
123132
self.evloop_process=multiprocessing.Process(target=self.run,args=())
124133
self.evloop_process.start()
125134

126135
defget_status(self):
127136
c.parent_pipe.send('status')
128137
returnc.parent_pipe.recv()
129138

139+
defprint_aggregates(serialized_agg):
140+
columns= ['running_latency','max_latency','isolation','finish']
141+
142+
# print table header
143+
print("\t\t",end="")
144+
forcolincolumns:
145+
print(col,end="\t")
146+
print("\n",end="")
147+
148+
serialized_agg
149+
150+
foraggnameinsorted(serialized_agg.keys()):
151+
agg=serialized_agg[aggname]
152+
print("%s\t"%aggname,end="")
153+
forcolincolumns:
154+
ifcolinagg:
155+
ifisinstance(agg[col],float):
156+
print("%.2f\t"% (agg[col],),end="\t")
157+
else:
158+
print(agg[col],end="\t")
159+
else:
160+
print("-\t",end="")
161+
print("")
162+
print("")
130163

131164
c=MtmClient(['dbname=postgres user=stas host=127.0.0.1',
132165
'dbname=postgres user=stas host=127.0.0.1 port=5433',
133-
'dbname=postgres user=stas host=127.0.0.1 port=5434'],n_accounts=1000)
134-
# c = MtmClient(['dbname=postgres user=stas host=127.0.0.1'])
166+
'dbname=postgres user=stas host=127.0.0.1 port=5434'],n_accounts=10000)
135167
c.bgrun()
136168

137169
whileTrue:
138170
time.sleep(1)
139171
aggs=c.get_status()
140-
foragginaggs:
141-
print(agg.finish)
172+
print_aggregates(aggs)
173+
# for k, v in aggs.items():
174+
# print(k, v.finish)
142175

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp