Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5571df8

Browse files
committed
brand new reconnection logic
1 parentbe1b3f6 commit5571df8

File tree

3 files changed

+67
-21
lines changed

3 files changed

+67
-21
lines changed

‎multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3850,7 +3850,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38503850
StartTransactionCommand();
38513851
if (x->status==TRANSACTION_STATUS_ABORTED) {
38523852
FinishPreparedTransaction(x->gid, false);
3853-
elog(ERROR,"Transaction%s isaborted by DTM",x->gid);
3853+
elog(ERROR,"Transaction aborted by DTM");
38543854
}else {
38553855
FinishPreparedTransaction(x->gid, true);
38563856
}

‎tests2/lib/bank_client.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
importaiopg
55
importrandom
66
importpsycopg2
7+
frompsycopg2.extensionsimport*
78
importtime
89
importdatetime
910
importcopy
1011
importaioprocessing
1112
importmultiprocessing
13+
importlogging
1214

1315
classMtmTxAggregate(object):
1416

@@ -57,6 +59,7 @@ def keep_trying(tries, delay, method, name, *args, **kwargs):
5759
classMtmClient(object):
5860

5961
def__init__(self,dsns,n_accounts=100000):
62+
# logging.basicConfig(level=logging.DEBUG)
6063
self.n_accounts=n_accounts
6164
self.dsns=dsns
6265
self.aggregates= {}
@@ -129,21 +132,41 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
129132
ifconn_inotinself.aggregates:
130133
self.aggregates[conn_i]= {}
131134
agg=self.aggregates[conn_i][aggname_prefix]=MtmTxAggregate(aggname)
132-
pool=yieldfromaiopg.create_pool(self.dsns[conn_i])
133-
conn=yieldfrompool.acquire()
134-
cur=yieldfromconn.cursor()
135+
dsn=self.dsns[conn_i]
136+
137+
conn=cur=False
138+
135139
whileself.running:
136140
agg.start_tx()
141+
137142
try:
138-
# yield from cur.execute('commit')
143+
if (notconn)orconn.closed:
144+
# enable_hstore tries to perform select from database
145+
# which in case of select's failure will lead to exception
146+
# and stale connection to the database
147+
conn=yieldfromaiopg.connect(dsn,enable_hstore=False)
148+
print("reconnected")
149+
150+
if (notcur)orcur.closed:
151+
cur=yieldfromconn.cursor()
152+
153+
# ROLLBACK tx after previous exception.
154+
# Doing this here instead of except handler to stay inside try
155+
# block.
156+
status=yieldfromconn.get_transaction_status()
157+
ifstatus!=TRANSACTION_STATUS_IDLE:
158+
yieldfromcur.execute('rollback')
159+
139160
yieldfromtx_block(conn,cur,agg)
140161
agg.finish_tx('commit')
141-
exceptpsycopg2.OperationalErrorase:
142-
ifnotcur.closed:
143-
yieldfromcur.execute('rollback')
144-
agg.finish_tx('operational_rollback')
162+
145163
exceptpsycopg2.Errorase:
146-
agg.finish_tx(e.pgerror)
164+
agg.finish_tx(str(e).strip())
165+
# Give evloop some free time.
166+
# In case of continuous excetions we can loop here without returning
167+
# back to event loop and block it
168+
yieldfromasyncio.sleep(0.01)
169+
147170
print("We've count to infinity!")
148171

149172
@asyncio.coroutine
@@ -199,14 +222,14 @@ def bgrun(self):
199222
self.evloop_process=multiprocessing.Process(target=self.run,args=())
200223
self.evloop_process.start()
201224

202-
defget_aggregates(self,print=True):
225+
defget_aggregates(self,_print=True):
203226
self.parent_pipe.send('status')
204227
resp=self.parent_pipe.recv()
205-
ifprint:
228+
if_print:
206229
MtmClient.print_aggregates(resp)
207230
returnresp
208231

209-
defclean_aggregates(self,print=True):
232+
defclean_aggregates(self):
210233
self.parent_pipe.send('status')
211234
self.parent_pipe.recv()
212235

‎tests2/test_recovery.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
importsubprocess
88
importdatetime
99
importdocker
10+
importwarnings
1011

1112
fromlib.bank_clientimportMtmClient
1213
fromlib.failure_injectorimport*
@@ -63,6 +64,7 @@ def setUpClass(self):
6364

6465
# XXX: add normal wait here
6566
time.sleep(20)
67+
print('started')
6668
self.client=MtmClient([
6769
"dbname=regression user=postgres host=127.0.0.1 port=15432",
6870
"dbname=regression user=postgres host=127.0.0.1 port=15433",
@@ -77,8 +79,11 @@ def tearDownClass(self):
7779
# XXX: check nodes data identity here
7880
subprocess.check_call(['docker-compose','down'])
7981

82+
defsetUp(self):
83+
warnings.simplefilter("ignore",ResourceWarning)
84+
8085
deftest_normal_operations(self):
81-
print('###normal_operations ###')
86+
print('###test_normal_operations ###')
8287

8388
aggs_failure,aggs=self.performFailure(NoFailure())
8489

@@ -90,7 +95,7 @@ def test_normal_operations(self):
9095

9196

9297
deftest_node_partition(self):
93-
print('###nodePartitionTest ###')
98+
print('###test_node_partition ###')
9499

95100
aggs_failure,aggs=self.performFailure(SingleNodePartition('node3'))
96101

@@ -103,7 +108,7 @@ def test_node_partition(self):
103108

104109

105110
deftest_edge_partition(self):
106-
print('###edgePartitionTest ###')
111+
print('###test_edge_partition ###')
107112

108113
aggs_failure,aggs=self.performFailure(EdgePartition('node2','node3'))
109114

@@ -114,13 +119,31 @@ def test_edge_partition(self):
114119
self.assertCommits(aggs)
115120
self.assertIsolation(aggs)
116121

117-
subprocess.check_call(['blockade','join'])
118-
print("Node3 joined back")
122+
deftest_node_restart(self):
123+
print('### test_node_restart ###')
124+
125+
time.sleep(3)
126+
127+
aggs_failure,aggs=self.performFailure(RestartNode('node3'))
119128

120-
foriinrange(50):
121-
time.sleep(3)
122-
self.clients.print_agg()
129+
self.assertCommits(aggs_failure[:2])
130+
self.assertNoCommits(aggs_failure[2:])
131+
self.assertIsolation(aggs_failure)
132+
133+
self.assertCommits(aggs)
134+
self.assertIsolation(aggs)
123135

136+
deftest_node_crash(self):
137+
print('### test_node_crash ###')
138+
139+
aggs_failure,aggs=self.performFailure(CrashRecoverNode('node3'))
140+
141+
self.assertCommits(aggs_failure[:2])
142+
self.assertNoCommits(aggs_failure[2:])
143+
self.assertIsolation(aggs_failure)
144+
145+
self.assertCommits(aggs)
146+
self.assertIsolation(aggs)
124147

125148
if__name__=='__main__':
126149
unittest.main()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp