Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4b3a499

Browse files
committed
better errors handling in bank_client
1 parente604b24 commit4b3a499

File tree

3 files changed

+84
-122
lines changed

3 files changed

+84
-122
lines changed

‎tests2/lib/bank_client.py

Lines changed: 39 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -65,106 +65,68 @@ def print_error(self, arg, comment=''):
6565
ifself.show_errors:
6666
print('Node',self.node_id,'got error',arg,comment)
6767

68-
defcheck_total(self):
69-
conn,cur=self.connect()
70-
i=0
68+
defexec_tx(self,name,tx_block):
69+
conn=psycopg2.connect(self.connstr)
70+
cur=conn.cursor()
7171

7272
whileself.run.value:
73-
i+=1
74-
75-
event_id=self.history.register_start('total')
73+
event_id=self.history.register_start(name)
7674

77-
ifnotconn.closed:
75+
ifconn.closed:
76+
self.history.register_finish(event_id,'ReConnect')
7877
try :
7978
conn=psycopg2.connect(self.connstr)
8079
cur=conn.cursor()
8180
except :
82-
self.history.register_finish(event_id,'CantConnect')
83-
next
84-
85-
amount=1
86-
from_uid=random.randrange(1,self.accounts+1)
87-
to_uid=random.randrange(1,self.accounts+1)
81+
continue
82+
else :
83+
continue
8884

8985
try:
90-
cur.execute('select sum(amount) from bank_test')
91-
res=cur.fetchone()
92-
ifres[0]!=0:
93-
print("Isolation error, total = %d"% (res[0],))
94-
raiseBaseException
95-
exceptBaseException:
96-
raiseBaseException
86+
tx_block(conn,cur)
9787
exceptpsycopg2.InterfaceError:
9888
self.history.register_finish(event_id,'InterfaceError')
89+
exceptpsycopg2.Error:
90+
self.history.register_finish(event_id,'PsycopgError')
9991
except :
92+
print(sys.exc_info())
10093
self.history.register_finish(event_id,'OtherError')
10194
else :
102-
self.history.register_finish(event_id,'commit')
95+
self.history.register_finish(event_id,'Commit')
10396

10497
cur.close()
10598
conn.close()
10699

107-
deftransfer_money(self):
108-
conn,cur=self.connect()
109-
110-
i=0
100+
defcheck_total(self):
111101

112-
whileself.run.value:
113-
i+=1
102+
deftx(conn,cur):
103+
cur.execute('select sum(amount) from bank_test')
104+
res=cur.fetchone()
105+
ifres[0]!=0:
106+
print("Isolation error, total = %d"% (res[0],))
107+
raiseBaseException
114108

115-
event_id=self.history.register_start('transfer')
109+
self.exec_tx('total',tx)
116110

117-
ifnotconn.closed:
118-
try :
119-
conn=psycopg2.connect(self.connstr)
120-
cur=conn.cursor()
121-
except :
122-
self.history.register_finish(event_id,'CantConnect')
123-
next
111+
deftransfer_money(self):
124112

113+
deftx(conn,cur):
125114
amount=1
126-
from_uid=random.randrange(1,self.accounts+1)
127-
to_uid=random.randrange(1,self.accounts+1)
128-
129-
try:
130-
cur.execute('''update bank_test
131-
set amount = amount - %s
132-
where uid = %s''',
133-
(amount,from_uid))
134-
cur.execute('''update bank_test
135-
set amount = amount + %s
136-
where uid = %s''',
137-
(amount,to_uid))
138-
conn.commit()
139-
140-
exceptpsycopg2.InterfaceError:
141-
self.history.register_finish(event_id,'InterfaceError')
142-
except :
143-
self.history.register_finish(event_id,'OtherError')
144-
else :
145-
self.history.register_finish(event_id,'commit')
146-
147-
cur.close()
148-
conn.close()
149-
150-
defconnect(self,reconnect=False):
151-
152-
whileTrue:
153-
try:
154-
conn=psycopg2.connect(self.connstr)
155-
cur=conn.cursor()
156-
returnconn,cur
157-
except:
158-
self.print_error(sys.exc_info(),'2')
159-
ifnotreconnect:
160-
raise
161-
ifnotself.run.value:
162-
raise
163-
164-
# def watchdog(self):
165-
# while self.run.value:
166-
# time.sleep(1)
167-
# print('watchdog: ', self.history.aggregate())
115+
from_uid=random.randrange(1,self.accounts-10)
116+
to_uid=from_uid+1#random.randrange(1, self.accounts + 1)
117+
118+
conn.commit()
119+
cur.execute('''update bank_test
120+
set amount = amount - %s
121+
where uid = %s''',
122+
(amount,from_uid))
123+
cur.execute('''update bank_test
124+
set amount = amount + %s
125+
where uid = %s''',
126+
(amount,to_uid))
127+
conn.commit()
128+
129+
self.exec_tx('transfer',tx)
168130

169131
defstart(self):
170132
self.transfer_process=Process(target=self.transfer_money,args=())
@@ -173,9 +135,6 @@ def start(self):
173135
self.total_process=Process(target=self.check_total,args=())
174136
self.total_process.start()
175137

176-
#self.total_process = Process(target=self.watchdog, args=())
177-
#self.total_process.start()
178-
179138
return
180139

181140
defstop(self):

‎tests2/lib/event_history.py

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
importtime
22
importdatetime
33
importuuid
4+
importcopy
45
frommultiprocessingimportQueue
56

67

@@ -12,11 +13,10 @@ def __init__(self):
1213
self.running_events= {}
1314
self.last_aggregation=datetime.datetime.now()
1415
self.agg_template= {
15-
'commit':0,
16-
'rollback':0,
1716
'max_latency':0.0,
1817
'running':0,
19-
'running_latency':0.0
18+
'running_latency':0.0,
19+
'finish': {}
2020
}
2121

2222
defregister_start(self,name):
@@ -43,18 +43,21 @@ def load_queue(self):
4343
self.running_events[event['event_id']]=event
4444
else:
4545
# finish mark
46-
ifevent['event_id']inself.running_events:
47-
start_ev=self.running_events[event['event_id']]
48-
self.events.append({
49-
'name':start_ev['name'],
50-
'started_at':start_ev['time'],
51-
'finished_at':event['time'],
52-
'status':event['status']
53-
})
54-
self.running_events.pop(event['event_id'],None)
55-
else:
46+
ifevent['event_id']notinself.running_events:
5647
# found finish event without corresponding start
48+
print("ololololo!")
5749
raise
50+
51+
start_ev=self.running_events[event['event_id']]
52+
self.events.append({
53+
'name':start_ev['name'],
54+
'started_at':start_ev['time'],
55+
'finished_at':event['time'],
56+
'status':event['status']
57+
})
58+
self.running_events.pop(event['event_id'],None)
59+
60+
#print(self.events)
5861
return
5962

6063
defaggregate(self):
@@ -63,32 +66,38 @@ def aggregate(self):
6366
agg= {}
6467
forevinself.events:
6568
ifev['finished_at']<self.last_aggregation:
69+
#print("cont")
6670
continue
6771

6872
ifev['name']notinagg:
69-
agg[ev['name']]=self.agg_template.copy()
73+
agg[ev['name']]=copy.deepcopy(self.agg_template)
74+
#print('-=-=-', agg)
7075

7176
named_agg=agg[ev['name']]
7277
latency= (ev['finished_at']-ev['started_at']).total_seconds()
73-
named_agg[ev['status']]+=1
78+
79+
ifev['status']notinnamed_agg['finish']:
80+
named_agg['finish'][ev['status']]=1
81+
else:
82+
named_agg['finish'][ev['status']]+=1
83+
7484
ifnamed_agg['max_latency']<latency:
7585
named_agg['max_latency']=latency
7686

7787
forvalueinself.running_events.itervalues():
88+
7889
ifvalue['name']notinagg:
79-
agg[value['name']]=self.agg_template.copy()
90+
agg[value['name']]=copy.deepcopy(self.agg_template)
8091

8192
named_agg=agg[value['name']]
8293
latency= (datetime.datetime.now()-value['time']).total_seconds()
83-
if'started'innamed_agg:
84-
named_agg['running']+=1
85-
iflatency>named_agg['running_latency']:
86-
named_agg['running_latency']=latency
87-
else:
88-
named_agg['running']=1
94+
95+
named_agg['running']+=1
96+
ifnamed_agg['running_latency']<latency:
8997
named_agg['running_latency']=latency
9098

9199
self.last_aggregation=datetime.datetime.now()
100+
#print("aggregeted!")
92101
returnagg
93102

94103
defaggregate_by(self,period):

‎tests2/test_recovery.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
importsubprocess
44
fromlib.bank_clientimport*
55

6-
76
classRecoveryTest(unittest.TestCase):
87
defsetUp(self):
98
#subprocess.check_call(['blockade','up'])
@@ -24,41 +23,36 @@ def test_0_normal_operation(self):
2423
print('### normalOpsTest ###')
2524
print('Waiting 5s to check operability')
2625
time.sleep(5)
27-
26+
2827
forclientinself.clients:
2928
agg=client.history.aggregate()
3029
print(agg)
31-
self.assertTrue(agg['transfer']['commit']>0)
30+
self.assertTrue(agg['transfer']['finish']['Commit']>0)
3231

3332
deftest_1_node_disconnect(self):
3433
print('### disconnectTest ###')
3534

3635
subprocess.check_call(['blockade','partition','node3'])
3736
print('Node3 disconnected')
3837

39-
print('Waiting20s to discover failure')
38+
print('Waiting15s to discover failure')
4039

41-
whileTrue:
40+
foriinrange(5):
4241
time.sleep(3)
4342
forclientinself.clients:
4443
agg=client.history.aggregate()
4544
print(agg)
45+
print(" ")
46+
47+
subprocess.check_call(['blockade','join'])
4648

47-
# print('Waiting 3s to check operability')
48-
# time.sleep(3)
49-
# for client in self.clients:
50-
# agg = client.history.aggregate()
51-
# print(agg)
52-
#
53-
# subprocess.check_call(['blockade','join'])
54-
# print('Node3 connected back')
55-
#
56-
# print('Waiting 12s for catch-up')
57-
# time.sleep(12)
58-
#
59-
# for client in self.clients:
60-
# agg = client.history.aggregate()
61-
# print(agg)
49+
print('Waiting 15s to join node')
50+
foriinrange(1000):
51+
time.sleep(3)
52+
forclientinself.clients:
53+
agg=client.history.aggregate()
54+
print(agg)
55+
print(" ")
6256

6357

6458
if__name__=='__main__':

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp