Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcd98380

Browse files
committed
nested aggregates
1 parentc399553 commitcd98380

File tree

3 files changed

+37
-41
lines changed

3 files changed

+37
-41
lines changed

‎tests2/lib/bank_client.py

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,24 @@ def status(self):
111111
whileself.running:
112112
msg=yieldfromself.child_pipe.coro_recv()
113113
ifmsg=='status':
114-
# print('evloop: got status request')
115114
serialized_aggs= {}
116-
forname,aggregateinself.aggregates.items():
117-
serialized_aggs[name]=aggregate.as_dict()
118-
aggregate.clear_values()
115+
116+
forconn_id,conn_aggsinself.aggregates.items():
117+
serialized_aggs[conn_id]= {}
118+
foraggname,agginconn_aggs.items():
119+
serialized_aggs[conn_id][aggname]=agg.as_dict()
120+
agg.clear_values()
121+
119122
self.child_pipe.send(serialized_aggs)
120-
# print('evloop: sent status response')
121123
else:
122124
print('evloop: unknown message')
123125

124126
@asyncio.coroutine
125127
defexec_tx(self,tx_block,aggname_prefix,conn_i):
126128
aggname="%s_%i"% (aggname_prefix,conn_i)
127-
agg=self.aggregates[aggname]=MtmTxAggregate(aggname)
129+
ifconn_inotinself.aggregates:
130+
self.aggregates[conn_i]= {}
131+
agg=self.aggregates[conn_i][aggname_prefix]=MtmTxAggregate(aggname)
128132
pool=yieldfromaiopg.create_pool(self.dsns[conn_i])
129133
conn=yieldfrompool.acquire()
130134
cur=yieldfromconn.cursor()
@@ -167,8 +171,8 @@ def total_tx(self, conn, cur, agg):
167171
total=yieldfromcur.fetchone()
168172
iftotal[0]!=0:
169173
agg.isolation+=1
170-
#print(self.oops)
171-
#print('Isolation error, total = ', total[0])
174+
print(self.oops)
175+
print('Isolation error, total = ',total[0])
172176
# yield from cur.execute('select * from mtm.get_nodes_state()')
173177
# nodes_state = yield from cur.fetchall()
174178
# for i, col in enumerate(self.nodes_state_fields):
@@ -177,7 +181,6 @@ def total_tx(self, conn, cur, agg):
177181
# print("%19s" % nodes_state[j][i], end="\t")
178182
# print("\n")
179183

180-
181184
defrun(self):
182185
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
183186
self.loop=asyncio.get_event_loop()
@@ -196,13 +199,9 @@ def bgrun(self):
196199
self.evloop_process=multiprocessing.Process(target=self.run,args=())
197200
self.evloop_process.start()
198201

199-
# XXX: introduce periodic report from client?
200202
defget_aggregates(self,print=True):
201-
# print('test: sending status request')
202203
self.parent_pipe.send('status')
203-
# print('test: awaitng status response')
204204
resp=self.parent_pipe.recv()
205-
# print('test: got status response')
206205
ifprint:
207206
MtmClient.print_aggregates(resp)
208207
returnresp
@@ -216,7 +215,7 @@ def stop(self):
216215
self.evloop_process.terminate()
217216

218217
@classmethod
219-
defprint_aggregates(cls,serialized_agg):
218+
defprint_aggregates(cls,aggs):
220219
columns= ['running_latency','max_latency','isolation','finish']
221220

222221
# print table header
@@ -225,23 +224,18 @@ def print_aggregates(cls, serialized_agg):
225224
print(col,end="\t")
226225
print("\n",end="")
227226

228-
serialized_agg
229-
230-
foraggnameinsorted(serialized_agg.keys()):
231-
agg=serialized_agg[aggname]
232-
print("%s\t"%aggname,end="")
233-
forcolincolumns:
234-
ifcolinagg:
227+
forconn_idinaggs.keys():
228+
foraggnameinaggs[conn_id].keys():
229+
agg=aggs[conn_id][aggname]
230+
print("Node %d: %s\t"% (conn_id+1,aggname),end="")
231+
forcolincolumns:
235232
ifisinstance(agg[col],float):
236233
print("%.2f\t"% (agg[col],),end="\t")
237234
else:
238235
print(agg[col],end="\t")
239-
else:
240-
print("-\t",end="")
241-
print("")
236+
print("")
242237
print("")
243238

244-
245239
if__name__=="__main__":
246240
c=MtmClient(['dbname=postgres user=postgres host=127.0.0.1',
247241
'dbname=postgres user=postgres host=127.0.0.1 port=5433',

‎tests2/lib/failure_injector.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
classFailureInjector(object):
44

5-
def__init__(self):
6-
self.docker_api=docker.Client()
5+
#def __init__(self):
6+
# self.docker_api = docker.Client()
77

88
defcontainer_exec(self,node,command):
9+
self.docker_api=docker.Client()
910
exec_id=self.docker_api.exec_create(node,command,user='root')
1011
output=self.docker_api.exec_start(exec_id)
12+
self.docker_api.close()
1113
# print(command, ' -> ', output)
1214

1315

@@ -39,7 +41,7 @@ def start(self):
3941
self.container_exec(self.nodeA,"iptables -A OUTPUT -s {} -j DROP".format(self.nodeB) )
4042

4143
defstop(self):
42-
self.container_exec(self.nodeA,"iptables -D INPUT-s {} -j DROP".format(self.nodeB))
44+
self.container_exec(self.nodeA,"iptables -D INPUT -s {} -j DROP".format(self.nodeB))
4345
self.container_exec(self.nodeA,"iptables -D OUTPUT -s {} -j DROP".format(self.nodeB))
4446

4547

‎tests2/test_recovery.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,19 @@ def test_node_partition(self):
7171
time.sleep(TEST_RECOVERY_TIME)
7272
aggs=self.client.get_aggregates()
7373

74-
self.assertTrue('commit'inaggs_failure['transfer_0']['finish'] )
75-
self.assertTrue('commit'inaggs_failure['transfer_1']['finish'] )
76-
self.assertTrue('commit'notinaggs_failure['transfer_2']['finish'] )
77-
self.assertTrue(aggs_failure['sumtotal_0']['isolation']==0)
78-
self.assertTrue(aggs_failure['sumtotal_1']['isolation']==0)
79-
self.assertTrue(aggs_failure['sumtotal_2']['isolation']==0)
80-
81-
self.assertTrue('commit'inaggs['transfer_0']['finish'] )
82-
self.assertTrue('commit'inaggs['transfer_1']['finish'] )
83-
self.assertTrue('commit'inaggs['transfer_2']['finish'] )
84-
self.assertTrue(aggs['sumtotal_0']['isolation']==0)
85-
self.assertTrue(aggs['sumtotal_1']['isolation']==0)
86-
self.assertTrue(aggs['sumtotal_2']['isolation']==0)
74+
self.assertTrue('commit'inaggs_failure[0]['transfer']['finish'] )
75+
self.assertTrue('commit'inaggs_failure[1]['transfer']['finish'] )
76+
self.assertTrue('commit'notinaggs_failure[2]['transfer']['finish'] )
77+
self.assertTrue(aggs_failure[0]['sumtotal']['isolation']==0)
78+
self.assertTrue(aggs_failure[1]['sumtotal']['isolation']==0)
79+
self.assertTrue(aggs_failure[2]['sumtotal']['isolation']==0)
80+
81+
self.assertTrue('commit'inaggs[0]['transfer']['finish'] )
82+
self.assertTrue('commit'inaggs[1]['transfer']['finish'] )
83+
self.assertTrue('commit'inaggs[2]['transfer']['finish'] )
84+
self.assertTrue(aggs[0]['sumtotal']['isolation']==0)
85+
self.assertTrue(aggs[1]['sumtotal']['isolation']==0)
86+
self.assertTrue(aggs[2]['sumtotal']['isolation']==0)
8787

8888

8989
deftest_edge_partition(self):

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp