Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1b12b78

Browse files
committed
handle stop in tests
1 parentaa446f1 commit1b12b78

File tree

5 files changed

+95
-79
lines changed

5 files changed

+95
-79
lines changed

‎Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ USER postgres
3131
ENV CFLAGS -O0
3232
WORKDIR /pg
3333

34-
ENV REBUILD6
34+
ENV REBUILD1
3535

3636
RUN cd /pg && \
3737
git clone https://github.com/postgrespro/postgres_cluster.git --depth 1 && \
@@ -42,6 +42,8 @@ RUN cd /pg && \
4242
ENV PATH /pg/install/bin:$PATH
4343
ENV PGDATA /pg/data
4444

45+
ENV REBUILD 1
46+
4547
RUN cd /pg/postgres_cluster/contrib/raftable && make install
4648

4749
RUN mkdir /pg/mmts

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ xcheck:
4848
cd tests2&& docker network rm tests2_net||true
4949
cd tests2&& blockade up
5050
sleep 20# wait for mmts init
51-
cd tests2&&python test_recovery.py||true
51+
cd tests2&&python3 test_recovery.py||true
5252
#cd tests2 && blockade destroy
5353

‎tests2/client2.py

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22
importasyncio
3-
importuvloop
3+
#import uvloop
44
importaiopg
55
importrandom
66
importpsycopg2
@@ -50,6 +50,7 @@ def __init__(self, dsns, n_accounts=100000):
5050
self.dsns=dsns
5151
self.aggregates= {}
5252
self.initdb()
53+
self.running=True
5354

5455
definitdb(self):
5556
conn=psycopg2.connect(self.dsns[0])
@@ -66,63 +67,67 @@ def initdb(self):
6667
cur.close()
6768
conn.close()
6869

69-
asyncdefstatus(self):
70-
whileTrue:
71-
msg=awaitself.child_pipe.coro_recv()
70+
@asyncio.coroutine
71+
defstatus(self):
72+
whileself.running:
73+
msg=yieldfromself.child_pipe.coro_recv()
7274
ifmsg=='status':
7375
serialized_aggs= {}
7476
forname,aggregateinself.aggregates.items():
7577
serialized_aggs[name]=aggregate.as_dict()
7678
aggregate.clear_values()
7779
self.child_pipe.send(serialized_aggs)
7880

79-
asyncdefexec_tx(self,tx_block,aggname_prefix,conn_i):
81+
82+
@asyncio.coroutine
83+
defexec_tx(self,tx_block,aggname_prefix,conn_i):
8084
aggname="%s_%i"% (aggname_prefix,conn_i)
8185
agg=self.aggregates[aggname]=MtmTxAggregate(aggname)
82-
83-
pool=awaitaiopg.create_pool(self.dsns[conn_i])
84-
asyncwithpool.acquire()asconn:
85-
asyncwithconn.cursor()ascur:
86-
whileTrue:
87-
agg.start_tx()
88-
try:
89-
awaittx_block(conn,cur)
90-
agg.finish_tx('commit')
91-
exceptpsycopg2.Errorase:
92-
awaitcur.execute('rollback')
93-
agg.finish_tx(e.pgerror)
94-
95-
asyncdeftransfer_tx(self,conn,cur):
86+
pool=yieldfromaiopg.create_pool(self.dsns[conn_i])
87+
conn=yieldfrompool.acquire()
88+
cur=yieldfromconn.cursor()
89+
whileself.running:
90+
agg.start_tx()
91+
try:
92+
yieldfromcur.execute('commit')
93+
yieldfromtx_block(conn,cur)
94+
agg.finish_tx('commit')
95+
exceptpsycopg2.Errorase:
96+
agg.finish_tx(e.pgerror)
97+
98+
@asyncio.coroutine
99+
deftransfer_tx(self,conn,cur):
96100
amount=1
97101
# to avoid deadlocks:
98102
from_uid=random.randint(1,self.n_accounts-2)
99103
to_uid=from_uid+1
100-
awaitcur.execute('begin')
101-
awaitcur.execute('''update bank_test
104+
yieldfromcur.execute('begin')
105+
yieldfromcur.execute('''update bank_test
102106
set amount = amount - %s
103107
where uid = %s''',
104108
(amount,from_uid))
105-
awaitcur.execute('''update bank_test
109+
yieldfromcur.execute('''update bank_test
106110
set amount = amount + %s
107111
where uid = %s''',
108112
(amount,to_uid))
109-
awaitcur.execute('commit')
113+
yieldfromcur.execute('commit')
110114

111-
asyncdeftotal_tx(self,conn,cur):
112-
awaitcur.execute('select sum(amount) from bank_test')
113-
total=awaitcur.fetchone()
115+
@asyncio.coroutine
116+
deftotal_tx(self,conn,cur):
117+
yieldfromcur.execute('select sum(amount) from bank_test')
118+
total=yieldfromcur.fetchone()
114119
iftotal[0]!=0:
115120
self.isolation_errors+=1
116121

117122
defrun(self):
118-
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
123+
#asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
119124
self.loop=asyncio.get_event_loop()
120125

121126
fori,_inenumerate(self.dsns):
122-
asyncio.ensure_future(self.exec_tx(self.transfer_tx,'transfer',i))
123-
asyncio.ensure_future(self.exec_tx(self.total_tx,'sumtotal',i))
127+
asyncio.async(self.exec_tx(self.transfer_tx,'transfer',i))
128+
asyncio.async(self.exec_tx(self.total_tx,'sumtotal',i))
124129

125-
asyncio.ensure_future(self.status())
130+
asyncio.async(self.status())
126131

127132
self.loop.run_forever()
128133

@@ -133,43 +138,46 @@ def bgrun(self):
133138
self.evloop_process.start()
134139

135140
defget_status(self):
136-
c.parent_pipe.send('status')
137-
returnc.parent_pipe.recv()
138-
139-
defprint_aggregates(serialized_agg):
140-
columns= ['running_latency','max_latency','isolation','finish']
141-
142-
# print table header
143-
print("\t\t",end="")
144-
forcolincolumns:
145-
print(col,end="\t")
146-
print("\n",end="")
141+
self.parent_pipe.send('status')
142+
returnself.parent_pipe.recv()
143+
144+
defstop(self):
145+
self.running=False
146+
self.evloop_process.terminate()
147147

148-
serialized_agg
148+
@classmethod
149+
defprint_aggregates(cls,serialized_agg):
150+
columns= ['running_latency','max_latency','isolation','finish']
149151

150-
foraggnameinsorted(serialized_agg.keys()):
151-
agg=serialized_agg[aggname]
152-
print("%s\t"%aggname,end="")
152+
# print table header
153+
print("\t\t",end="")
153154
forcolincolumns:
154-
ifcolinagg:
155-
ifisinstance(agg[col],float):
156-
print("%.2f\t"% (agg[col],),end="\t")
155+
print(col,end="\t")
156+
print("\n",end="")
157+
158+
serialized_agg
159+
160+
foraggnameinsorted(serialized_agg.keys()):
161+
agg=serialized_agg[aggname]
162+
print("%s\t"%aggname,end="")
163+
forcolincolumns:
164+
ifcolinagg:
165+
ifisinstance(agg[col],float):
166+
print("%.2f\t"% (agg[col],),end="\t")
167+
else:
168+
print(agg[col],end="\t")
157169
else:
158-
print(agg[col],end="\t")
159-
else:
160-
print("-\t",end="")
170+
print("-\t",end="")
171+
print("")
161172
print("")
162-
print("")
163-
164-
c=MtmClient(['dbname=postgres user=stas host=127.0.0.1',
165-
'dbname=postgres user=stas host=127.0.0.1 port=5433',
166-
'dbname=postgres user=stas host=127.0.0.1 port=5434'],n_accounts=10000)
167-
c.bgrun()
168173

169-
whileTrue:
170-
time.sleep(1)
171-
aggs=c.get_status()
172-
print_aggregates(aggs)
173-
# for k, v in aggs.items():
174-
# print(k, v.finish)
175174

175+
if__name__=="__main__":
176+
c=MtmClient(['dbname=postgres user=postgres host=127.0.0.1',
177+
'dbname=postgres user=postgres host=127.0.0.1 port=5433',
178+
'dbname=postgres user=postgres host=127.0.0.1 port=5434'],n_accounts=10000)
179+
c.bgrun()
180+
whileTrue:
181+
time.sleep(1)
182+
aggs=c.get_status()
183+
MtmClient.print_aggregates(aggs)

‎tests2/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
git+https://github.com/kelvich/blockade.git@002-allow-cores
22
psycopg2
3+
aiopg
4+
aioprocessing

‎tests2/test_recovery.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
11
importunittest
22
importtime
33
importsubprocess
4-
fromlib.bank_clientimport*
4+
# from lib.bank_client import *
5+
fromclient2importMtmClient
56

67
classRecoveryTest(unittest.TestCase):
78
@classmethod
89
defsetUpClass(self):
9-
self.clients=ClientCollection([
10-
"dbname=postgres host=127.0.0.1 user=postgres",
11-
"dbname=postgres host=127.0.0.1 user=postgres port=5433",
12-
"dbname=postgres host=127.0.0.1 user=postgres port=5434"
10+
self.client=MtmClient([
11+
"dbname=postgresuser=postgreshost=127.0.0.1",
12+
"dbname=postgresuser=postgreshost=127.0.0.1 port=5433",
13+
"dbname=postgresuser=postgreshost=127.0.0.1 port=5434"
1314
])
14-
self.clients.start()
15+
self.client.bgrun()
1516
time.sleep(5)
1617

1718
@classmethod
1819
deftearDownClass(self):
1920
print('tearDown')
20-
self.clients.stop()
21+
self.client.stop()
2122

2223
# def test_normal_operations(self):
2324
# print('### normalOpsTest ###')
@@ -29,18 +30,20 @@ def tearDownClass(self):
2930
# # there were some commits
3031
# self.assertTrue( agg['transfer'] > 0 )
3132

32-
deftest_node_prtition(self):
33+
deftest_node_partition(self):
3334
print('### nodePartitionTest ###')
3435

3536
subprocess.check_call(['blockade','partition','node3'])
3637
print('### blockade node3 ###')
3738

3839
# clear tx history
39-
self.clients.aggregate(echo=False)
40+
self.client.get_status()
4041

4142
foriinrange(10):
43+
print(i)
4244
time.sleep(3)
43-
aggs=self.clients.aggregate()
45+
aggs=self.client.get_status()
46+
MtmClient.print_aggregates(aggs)
4447
#self.assertTrue( aggs[0]['transfer']['finish']['Commit'] > 0 )
4548
#self.assertTrue( aggs[1]['transfer']['finish']['Commit'] > 0 )
4649
#self.assertTrue( 'Commit' not in aggs[2]['transfer']['finish'] )
@@ -49,12 +52,13 @@ def test_node_prtition(self):
4952
print('### deblockade node3 ###')
5053

5154
# clear tx history
52-
self.clients.aggregate(echo=False)
55+
self.client.get_status()
5356

54-
foriinrange(1000):
57+
foriinrange(30):
58+
print(i)
5559
time.sleep(3)
56-
aggs=self.clients.aggregate()
57-
print(i,aggs)
60+
aggs=self.client.get_status()
61+
MtmClient.print_aggregates(aggs)
5862

5963

6064
subprocess.check_call(['blockade','join'])

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp