Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitde66a2b

Browse files
committed
async python client for testing
1 parent64a5f47 commitde66a2b

File tree

1 file changed

+142
-0
lines changed

1 file changed

+142
-0
lines changed

‎tests2/client2.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/env python3
2+
importasyncio
3+
importuvloop
4+
importaiopg
5+
importrandom
6+
importpsycopg2
7+
importtime
8+
importaioprocessing
9+
importmultiprocessing
10+
11+
classMtmTxAggregate(object):
12+
13+
def__init__(self,name):
14+
self.name=name
15+
self.clear_values()
16+
17+
defclear_values(self):
18+
self.max_latency=0.0
19+
self.running_latency=0.0
20+
self.finish= {}
21+
22+
defadd_finish(self,name):
23+
ifnamenotinself.finish:
24+
self.finish[name]=1
25+
else:
26+
self.finish[name]+=1
27+
28+
classMtmClient(object):
29+
30+
def__init__(self,dsns,n_accounts=100000):
31+
self.n_accounts=n_accounts
32+
self.dsns=dsns
33+
34+
self.aggregates= [MtmTxAggregate('transfer'),MtmTxAggregate('transfer'),MtmTxAggregate('transfer')]
35+
self.initdb()
36+
37+
definitdb(self):
38+
conn=psycopg2.connect(self.dsns[0])
39+
cur=conn.cursor()
40+
cur.execute('create extension if not exists multimaster')
41+
conn.commit()
42+
cur.execute('drop table if exists bank_test')
43+
cur.execute('create table bank_test(uid int primary key, amount int)')
44+
cur.execute('''
45+
insert into bank_test
46+
select *, 0 from generate_series(0, %s)''',
47+
(self.n_accounts,))
48+
conn.commit()
49+
cur.close()
50+
conn.close()
51+
52+
asyncdefstatus(self):
53+
whileTrue:
54+
msg=awaitself.child_pipe.coro_recv()
55+
ifmsg=='status':
56+
self.child_pipe.send(self.aggregates)
57+
foraggregateinself.aggregates:
58+
aggregate.clear_values()
59+
60+
asyncdeftransfer(self,i):
61+
pool=awaitaiopg.create_pool(self.dsns[i])
62+
asyncwithpool.acquire()asconn:
63+
asyncwithconn.cursor()ascur:
64+
whileTrue:
65+
amount=1
66+
from_uid=random.randint(1,self.n_accounts-1)
67+
to_uid=random.randint(1,self.n_accounts-1)
68+
69+
try:
70+
awaitcur.execute('begin')
71+
awaitcur.execute('''update bank_test
72+
set amount = amount - %s
73+
where uid = %s''',
74+
(amount,from_uid))
75+
awaitcur.execute('''update bank_test
76+
set amount = amount + %s
77+
where uid = %s''',
78+
(amount,to_uid))
79+
awaitcur.execute('commit')
80+
81+
self.aggregates[i].add_finish('commit')
82+
exceptpsycopg2.Errorase:
83+
awaitcur.execute('rollback')
84+
self.aggregates[i].add_finish(e.pgerror)
85+
86+
asyncdeftotal(self,i):
87+
pool=awaitaiopg.create_pool(self.dsns[i])
88+
asyncwithpool.acquire()asconn:
89+
asyncwithconn.cursor()ascur:
90+
whileTrue:
91+
92+
try:
93+
awaitcur.execute('select sum(amount) from bank_test')
94+
95+
if'commit'notinself.tps_vector[i]:
96+
self.tps_vector[i]['commit']=1
97+
else:
98+
self.tps_vector[i]['commit']+=1
99+
exceptpsycopg2.Errorase:
100+
awaitcur.execute('rollback')
101+
ife.pgerrornotinself.tps_vector[i]:
102+
self.tps_vector[i][e.pgerror]=1
103+
else:
104+
self.tps_vector[i][e.pgerror]+=1
105+
106+
defrun(self):
107+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
108+
self.loop=asyncio.get_event_loop()
109+
110+
fori,_inenumerate(self.dsns):
111+
asyncio.ensure_future(self.transfer(i))
112+
# asyncio.ensure_future(self.total(i))
113+
114+
asyncio.ensure_future(self.status())
115+
116+
self.loop.run_forever()
117+
118+
defbgrun(self):
119+
print('Starting evloop in different process');
120+
121+
self.parent_pipe,self.child_pipe=aioprocessing.AioPipe()
122+
123+
self.evloop_process=multiprocessing.Process(target=self.run,args=())
124+
self.evloop_process.start()
125+
126+
defget_status(self):
127+
c.parent_pipe.send('status')
128+
returnc.parent_pipe.recv()
129+
130+
131+
c=MtmClient(['dbname=postgres user=stas host=127.0.0.1',
132+
'dbname=postgres user=stas host=127.0.0.1 port=5433',
133+
'dbname=postgres user=stas host=127.0.0.1 port=5434'],n_accounts=1000)
134+
# c = MtmClient(['dbname=postgres user=stas host=127.0.0.1'])
135+
c.bgrun()
136+
137+
whileTrue:
138+
time.sleep(1)
139+
aggs=c.get_status()
140+
foragginaggs:
141+
print(agg.finish)
142+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp