Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd5facb5

Browse files
committed
2 parents639886b +1721363 commitd5facb5

File tree

7 files changed

+91
-126
lines changed

7 files changed

+91
-126
lines changed

‎contrib/mmts/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ xcheck:
4747
cd tests2&& docker network rm tests2_net||true
4848
cd tests2&& docker network rm tests2_net||true
4949
cd tests2&& blockade up
50-
sleep15# wait for mmts init
50+
sleep20# wait for mmts init
5151
cd tests2&& python test_recovery.py||true
5252
#cd tests2 && blockade destroy
5353

‎contrib/mmts/multimaster.c‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -927,7 +927,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
927927
* Send notification only if ABORT happens during transaction processing at replicas,
928928
* do not send notification if ABORT is received from master
929929
*/
930-
MTM_LOG2("%d: send ABORT notification abort transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
930+
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
931931
if (ts==NULL) {
932932
Assert(TransactionIdIsValid(x->xid));
933933
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
@@ -1390,6 +1390,7 @@ bool MtmRefreshClusterStatus(bool nowait)
13901390
}
13911391
}elseif (BIT_CHECK(disabled,ts->gtid.node-1)) {// coordinator of transaction is on disabled node
13921392
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1393+
MTM_LOG1("1) Rollback active transaction %d:%d:%d",ts->gtid.node,ts->gtid.xid,ts->xid);
13931394
MtmAbortTransaction(ts);
13941395
FinishPreparedTransaction(ts->gid, false);
13951396
}
@@ -1460,6 +1461,7 @@ void MtmOnNodeDisconnect(int nodeId)
14601461
}
14611462
}elseif (ts->gtid.node==nodeId) {//coordinator of transaction is on disabled node
14621463
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1464+
MTM_LOG1("2) Rollback active transaction %d:%d",ts->gtid.node,ts->gtid.xid);
14631465
MtmAbortTransaction(ts);
14641466
FinishPreparedTransaction(ts->gid, false);
14651467
}

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,9 @@ process_remote_commit(StringInfo in)
567567
{
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
gid=pq_getmsgstring(in);
570-
MTM_LOG3("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MyProcPid,gid);
570+
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MyProcPid,gid);
571571
if (MtmGetGlobalTransactionStatus(gid)!=TRANSACTION_STATUS_ABORTED) {
572+
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MyProcPid,gid);
572573
StartTransactionCommand();
573574
MtmSetCurrentTransactionGID(gid);
574575
FinishPreparedTransaction(gid, false);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
160160
}
161161
pq_sendbyte(out,'C');/* sending COMMIT */
162162

163-
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
163+
MTM_LOG1("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
164164

165165
/* send the flags field */
166166
pq_sendbyte(out,flags);

‎contrib/mmts/tests2/lib/bank_client.py‎

Lines changed: 39 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -65,106 +65,68 @@ def print_error(self, arg, comment=''):
6565
ifself.show_errors:
6666
print('Node',self.node_id,'got error',arg,comment)
6767

68-
defcheck_total(self):
69-
conn,cur=self.connect()
70-
i=0
68+
defexec_tx(self,name,tx_block):
69+
conn=psycopg2.connect(self.connstr)
70+
cur=conn.cursor()
7171

7272
whileself.run.value:
73-
i+=1
74-
75-
event_id=self.history.register_start('total')
73+
event_id=self.history.register_start(name)
7674

77-
ifnotconn.closed:
75+
ifconn.closed:
76+
self.history.register_finish(event_id,'ReConnect')
7877
try :
7978
conn=psycopg2.connect(self.connstr)
8079
cur=conn.cursor()
8180
except :
82-
self.history.register_finish(event_id,'CantConnect')
83-
next
84-
85-
amount=1
86-
from_uid=random.randrange(1,self.accounts+1)
87-
to_uid=random.randrange(1,self.accounts+1)
81+
continue
82+
else :
83+
continue
8884

8985
try:
90-
cur.execute('select sum(amount) from bank_test')
91-
res=cur.fetchone()
92-
ifres[0]!=0:
93-
print("Isolation error, total = %d"% (res[0],))
94-
raiseBaseException
95-
exceptBaseException:
96-
raiseBaseException
86+
tx_block(conn,cur)
9787
exceptpsycopg2.InterfaceError:
9888
self.history.register_finish(event_id,'InterfaceError')
89+
exceptpsycopg2.Error:
90+
self.history.register_finish(event_id,'PsycopgError')
9991
except :
92+
print(sys.exc_info())
10093
self.history.register_finish(event_id,'OtherError')
10194
else :
102-
self.history.register_finish(event_id,'commit')
95+
self.history.register_finish(event_id,'Commit')
10396

10497
cur.close()
10598
conn.close()
10699

107-
deftransfer_money(self):
108-
conn,cur=self.connect()
109-
110-
i=0
100+
defcheck_total(self):
111101

112-
whileself.run.value:
113-
i+=1
102+
deftx(conn,cur):
103+
cur.execute('select sum(amount) from bank_test')
104+
res=cur.fetchone()
105+
ifres[0]!=0:
106+
print("Isolation error, total = %d"% (res[0],))
107+
raiseBaseException
114108

115-
event_id=self.history.register_start('transfer')
109+
self.exec_tx('total',tx)
116110

117-
ifnotconn.closed:
118-
try :
119-
conn=psycopg2.connect(self.connstr)
120-
cur=conn.cursor()
121-
except :
122-
self.history.register_finish(event_id,'CantConnect')
123-
next
111+
deftransfer_money(self):
124112

113+
deftx(conn,cur):
125114
amount=1
126-
from_uid=random.randrange(1,self.accounts+1)
127-
to_uid=random.randrange(1,self.accounts+1)
128-
129-
try:
130-
cur.execute('''update bank_test
131-
set amount = amount - %s
132-
where uid = %s''',
133-
(amount,from_uid))
134-
cur.execute('''update bank_test
135-
set amount = amount + %s
136-
where uid = %s''',
137-
(amount,to_uid))
138-
conn.commit()
139-
140-
exceptpsycopg2.InterfaceError:
141-
self.history.register_finish(event_id,'InterfaceError')
142-
except :
143-
self.history.register_finish(event_id,'OtherError')
144-
else :
145-
self.history.register_finish(event_id,'commit')
146-
147-
cur.close()
148-
conn.close()
149-
150-
defconnect(self,reconnect=False):
151-
152-
whileTrue:
153-
try:
154-
conn=psycopg2.connect(self.connstr)
155-
cur=conn.cursor()
156-
returnconn,cur
157-
except:
158-
self.print_error(sys.exc_info(),'2')
159-
ifnotreconnect:
160-
raise
161-
ifnotself.run.value:
162-
raise
163-
164-
# def watchdog(self):
165-
# while self.run.value:
166-
# time.sleep(1)
167-
# print('watchdog: ', self.history.aggregate())
115+
from_uid=random.randrange(1,self.accounts-10)
116+
to_uid=from_uid+1#random.randrange(1, self.accounts + 1)
117+
118+
conn.commit()
119+
cur.execute('''update bank_test
120+
set amount = amount - %s
121+
where uid = %s''',
122+
(amount,from_uid))
123+
cur.execute('''update bank_test
124+
set amount = amount + %s
125+
where uid = %s''',
126+
(amount,to_uid))
127+
conn.commit()
128+
129+
self.exec_tx('transfer',tx)
168130

169131
defstart(self):
170132
self.transfer_process=Process(target=self.transfer_money,args=())
@@ -173,9 +135,6 @@ def start(self):
173135
self.total_process=Process(target=self.check_total,args=())
174136
self.total_process.start()
175137

176-
#self.total_process = Process(target=self.watchdog, args=())
177-
#self.total_process.start()
178-
179138
return
180139

181140
defstop(self):

‎contrib/mmts/tests2/lib/event_history.py‎

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
importtime
22
importdatetime
33
importuuid
4+
importcopy
45
frommultiprocessingimportQueue
56

67

@@ -12,11 +13,10 @@ def __init__(self):
1213
self.running_events= {}
1314
self.last_aggregation=datetime.datetime.now()
1415
self.agg_template= {
15-
'commit':0,
16-
'rollback':0,
1716
'max_latency':0.0,
1817
'running':0,
19-
'running_latency':0.0
18+
'running_latency':0.0,
19+
'finish': {}
2020
}
2121

2222
defregister_start(self,name):
@@ -43,18 +43,21 @@ def load_queue(self):
4343
self.running_events[event['event_id']]=event
4444
else:
4545
# finish mark
46-
ifevent['event_id']inself.running_events:
47-
start_ev=self.running_events[event['event_id']]
48-
self.events.append({
49-
'name':start_ev['name'],
50-
'started_at':start_ev['time'],
51-
'finished_at':event['time'],
52-
'status':event['status']
53-
})
54-
self.running_events.pop(event['event_id'],None)
55-
else:
46+
ifevent['event_id']notinself.running_events:
5647
# found finish event without corresponding start
48+
print("ololololo!")
5749
raise
50+
51+
start_ev=self.running_events[event['event_id']]
52+
self.events.append({
53+
'name':start_ev['name'],
54+
'started_at':start_ev['time'],
55+
'finished_at':event['time'],
56+
'status':event['status']
57+
})
58+
self.running_events.pop(event['event_id'],None)
59+
60+
#print(self.events)
5861
return
5962

6063
defaggregate(self):
@@ -63,32 +66,38 @@ def aggregate(self):
6366
agg= {}
6467
forevinself.events:
6568
ifev['finished_at']<self.last_aggregation:
69+
#print("cont")
6670
continue
6771

6872
ifev['name']notinagg:
69-
agg[ev['name']]=self.agg_template.copy()
73+
agg[ev['name']]=copy.deepcopy(self.agg_template)
74+
#print('-=-=-', agg)
7075

7176
named_agg=agg[ev['name']]
7277
latency= (ev['finished_at']-ev['started_at']).total_seconds()
73-
named_agg[ev['status']]+=1
78+
79+
ifev['status']notinnamed_agg['finish']:
80+
named_agg['finish'][ev['status']]=1
81+
else:
82+
named_agg['finish'][ev['status']]+=1
83+
7484
ifnamed_agg['max_latency']<latency:
7585
named_agg['max_latency']=latency
7686

7787
forvalueinself.running_events.itervalues():
88+
7889
ifvalue['name']notinagg:
79-
agg[value['name']]=self.agg_template.copy()
90+
agg[value['name']]=copy.deepcopy(self.agg_template)
8091

8192
named_agg=agg[value['name']]
8293
latency= (datetime.datetime.now()-value['time']).total_seconds()
83-
if'started'innamed_agg:
84-
named_agg['running']+=1
85-
iflatency>named_agg['running_latency']:
86-
named_agg['running_latency']=latency
87-
else:
88-
named_agg['running']=1
94+
95+
named_agg['running']+=1
96+
ifnamed_agg['running_latency']<latency:
8997
named_agg['running_latency']=latency
9098

9199
self.last_aggregation=datetime.datetime.now()
100+
#print("aggregeted!")
92101
returnagg
93102

94103
defaggregate_by(self,period):

‎contrib/mmts/tests2/test_recovery.py‎

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
importsubprocess
44
fromlib.bank_clientimport*
55

6-
76
classRecoveryTest(unittest.TestCase):
87
defsetUp(self):
98
#subprocess.check_call(['blockade','up'])
@@ -24,41 +23,36 @@ def test_0_normal_operation(self):
2423
print('### normalOpsTest ###')
2524
print('Waiting 5s to check operability')
2625
time.sleep(5)
27-
26+
2827
forclientinself.clients:
2928
agg=client.history.aggregate()
3029
print(agg)
31-
self.assertTrue(agg['transfer']['commit']>0)
30+
self.assertTrue(agg['transfer']['finish']['Commit']>0)
3231

3332
deftest_1_node_disconnect(self):
3433
print('### disconnectTest ###')
3534

3635
subprocess.check_call(['blockade','partition','node3'])
3736
print('Node3 disconnected')
3837

39-
print('Waiting20s to discover failure')
38+
print('Waiting15s to discover failure')
4039

41-
whileTrue:
40+
foriinrange(5):
4241
time.sleep(3)
4342
forclientinself.clients:
4443
agg=client.history.aggregate()
4544
print(agg)
45+
print(" ")
46+
47+
subprocess.check_call(['blockade','join'])
4648

47-
# print('Waiting 3s to check operability')
48-
# time.sleep(3)
49-
# for client in self.clients:
50-
# agg = client.history.aggregate()
51-
# print(agg)
52-
#
53-
# subprocess.check_call(['blockade','join'])
54-
# print('Node3 connected back')
55-
#
56-
# print('Waiting 12s for catch-up')
57-
# time.sleep(12)
58-
#
59-
# for client in self.clients:
60-
# agg = client.history.aggregate()
61-
# print(agg)
49+
print('Waiting 15s to join node')
50+
foriinrange(1000):
51+
time.sleep(3)
52+
forclientinself.clients:
53+
agg=client.history.aggregate()
54+
print(agg)
55+
print(" ")
6256

6357

6458
if__name__=='__main__':

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp