Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4be7f76

Browse files
committed
Clean referee decision when all nodes are online; do not send requests to referee continously from disabled node
1 parent5135a59 commit4be7f76

File tree

8 files changed

+150
-52
lines changed

8 files changed

+150
-52
lines changed

‎multimaster--1.0.sql

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,11 @@ END
170170
$$
171171
LANGUAGE plpgsql;
172172

173-
CREATE OR REPLACEFUNCTIONmtm.referee_clean() RETURNSvoidAS
173+
CREATE OR REPLACEFUNCTIONmtm.referee_clean() RETURNSboolAS
174174
$$
175+
BEGIN
175176
deletefrommtm.referee_decisionwhere key='winner';
177+
return'true';
178+
END
176179
$$
177-
LANGUAGEsql;
180+
LANGUAGEplpgsql;

‎multimaster.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2376,6 +2376,7 @@ static void MtmInitialize()
23762376
Mtm->disabledNodeMask= (((nodemask_t)1 <<MtmNodes)-1);
23772377
Mtm->clique=0;
23782378
Mtm->refereeGrant= false;
2379+
Mtm->refereeWinnerId=0;
23792380
Mtm->stalledNodeMask=0;
23802381
Mtm->stoppedNodeMask=0;
23812382
Mtm->deadNodeMask=0;
@@ -4158,15 +4159,42 @@ static void erase_option_from_connstr(const char *option, char *connstr)
41584159
pfree(needle);
41594160
}
41604161

4161-
PGconn*PQconnectdb_safe(constchar*conninfo)
4162+
PGconn*
4163+
PQconnectdb_safe(constchar*conninfo,inttimeout)
41624164
{
41634165
PGconn*conn;
4166+
structtimevaltv= {timeout,0 };
41644167
char*safe_connstr=pstrdup(conninfo);
4165-
erase_option_from_connstr("arbiter_port",safe_connstr);
41664168

4167-
conn=PQconnectdb(safe_connstr);
4169+
/* XXXX add timeout to connstring if set */
41684170

4171+
erase_option_from_connstr("arbiter_port",safe_connstr);
4172+
conn=PQconnectdb(safe_connstr);
41694173
pfree(safe_connstr);
4174+
4175+
if (PQstatus(conn)!=CONNECTION_OK)
4176+
{
4177+
MTM_ELOG(WARNING,"Could not connect to '%s': %s",
4178+
safe_connstr,PQerrorMessage(conn));
4179+
}
4180+
4181+
if (timeout!=0)
4182+
{
4183+
intsocket_fd=PQsocket(conn);
4184+
4185+
if (socket_fd<0)
4186+
{
4187+
MTM_ELOG(WARNING,"Referee socket is invalid");
4188+
}
4189+
4190+
if (setsockopt(socket_fd,SOL_SOCKET,SO_RCVTIMEO,
4191+
(char*)&tv,sizeof(tv))<0)
4192+
{
4193+
MTM_ELOG(WARNING,"Could not set referee socket timeout: %s",
4194+
strerror(errno));
4195+
}
4196+
}
4197+
41704198
returnconn;
41714199
}
41724200

@@ -4202,7 +4230,7 @@ mtm_collect_cluster_info(PG_FUNCTION_ARGS)
42024230
SRF_RETURN_DONE(funcctx);
42034231
}
42044232

4205-
conn=PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
4233+
conn=PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr,0);
42064234
if (PQstatus(conn)!=CONNECTION_OK)
42074235
{
42084236
MTM_ELOG(WARNING,"Failed to establish connection '%s' to node %d: error = %s",Mtm->nodes[usrfctx->nodeId-1].con.connStr,usrfctx->nodeId,PQerrorMessage(conn));
@@ -4411,7 +4439,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int force
44114439
{
44124440
if (!BIT_CHECK(disabledNodeMask,i)|| (i+1==forceOnNode))
44134441
{
4414-
conns[i]=PQconnectdb_safe(psprintf("%s application_name=%s",Mtm->nodes[i].con.connStr,MULTIMASTER_BROADCAST_SERVICE));
4442+
conns[i]=PQconnectdb_safe(psprintf("%s application_name=%s",Mtm->nodes[i].con.connStr,MULTIMASTER_BROADCAST_SERVICE),0);
44154443
if (PQstatus(conns[i])!=CONNECTION_OK)
44164444
{
44174445
if (ignoreError)

‎multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ typedef struct
289289
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
290290
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes */
291291
nodemask_tclique;/* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
292-
boolrefereeGrant;/* Referee allowed us to work with half of the nodes */
292+
boolrefereeGrant;/* Referee allowed us to work with half of the nodes */
293+
intrefereeWinnerId;/* Node that won referee contest */
293294
nodemask_tdeadNodeMask;/* Bitmask of nodes considered as dead by referee */
294295
nodemask_trecoveredNodeMask;/* Bitmask of nodes recoverd after been reported as dead by referee */
295296
nodemask_tstalledNodeMask;/* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -430,7 +431,7 @@ extern void MtmCheckHeartbeat(void);
430431
externvoidMtmResetTransaction(void);
431432
externvoidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize);
432433
externvoidMtmReleaseRecoverySlot(intnodeId);
433-
externPGconn*PQconnectdb_safe(constchar*conninfo);
434+
externPGconn*PQconnectdb_safe(constchar*conninfo,inttimeout);
434435
externvoidMtmBeginSession(intnodeId);
435436
externvoidMtmEndSession(intnodeId,boolunlock);
436437
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);

‎pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ pglogical_receiver_main(Datum main_arg)
296296
count=Mtm->recoveryCount;
297297

298298
/* Establish connection to remote server */
299-
conn=PQconnectdb_safe(connString);
299+
conn=PQconnectdb_safe(connString,0);
300300
status=PQstatus(conn);
301301
if (status!=CONNECTION_OK)
302302
{

‎referee.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static void MtmRefereeLoop(char const** connections, int nConns)
5050
intresult;
5151

5252
for (i=0;i<nConns;i++) {
53-
conns[i]=PQconnectdb_safe(connections[i]);
53+
conns[i]=PQconnectdb_safe(connections[i],0);
5454
status=PQstatus(conns[i]);
5555
if (status!=CONNECTION_OK)
5656
{
@@ -74,7 +74,7 @@ static void MtmRefereeLoop(char const** connections, int nConns)
7474
/* Some of live node reestablished connection with dead node, so referee should also try to connect to this node */
7575
if (conns[i]==NULL) {
7676
if (BIT_CHECK(newEnabledMask,i)) {
77-
conns[i]=PQconnectdb_safe(connections[i]);
77+
conns[i]=PQconnectdb_safe(connections[i],0);
7878
status=PQstatus(conns[i]);
7979
if (status==CONNECTION_OK) {
8080
BIT_CLEAR(disabledMask,i);

‎state.c

Lines changed: 82 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ char const* const MtmEventMnem[] =
2525
"MTM_NONRECOVERABLE_ERROR"
2626
};
2727

28-
staticintMtmGetRefereeWinner(void);
28+
staticintMtmRefereeGetWinner(void);
29+
staticboolMtmRefereeClearWinner(void);
2930

3031
// XXXX: allocate in context and clean it
3132
staticchar*
@@ -81,7 +82,7 @@ MtmCheckState(void)
8182
// int nVotingNodes = MtmGetNumberOfVotingNodes();
8283
boolisEnabledState;
8384
intnEnabled=countZeroBits(Mtm->disabledNodeMask,Mtm->nAllNodes);
84-
intnConnected=countZeroBits(SELF_CONNECTIVITY_MASK,Mtm->nAllNodes);
85+
intnConnected=countZeroBits(EFFECTIVE_CONNECTIVITY_MASK,Mtm->nAllNodes);
8586
intnReceivers=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalReceiverMask,Mtm->nAllNodes);
8687
intnSenders=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalSenderMask,Mtm->nAllNodes);
8788

@@ -390,26 +391,46 @@ MtmRefreshClusterStatus()
390391
MtmCheckState();
391392

392393
/*
393-
* Check for refereedecidion whenpnly half of nodes are visible.
394+
* Check for refereedecision whenonly half of nodes are visible.
394395
*/
395-
if (MtmRefereeConnStr&&*MtmRefereeConnStr&& !Mtm->refereeGrant&&
396+
if (MtmRefereeConnStr&&*MtmRefereeConnStr&& !Mtm->refereeWinnerId&&
396397
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes/2)
397398
{
398-
intwinner_node_id=MtmGetRefereeWinner();
399-
if (winner_node_id!=-1&&
400-
!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,winner_node_id-1))
399+
intwinner_node_id=MtmRefereeGetWinner();
400+
401+
if (winner_node_id>0)
401402
{
402-
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
403-
winner_node_id);
404-
Mtm->refereeGrant= true;
405-
406-
MtmLock(LW_EXCLUSIVE);
407-
MtmEnableNode(MtmNodeId);
408-
MtmCheckState();
409-
MtmUnlock();
403+
Mtm->refereeWinnerId=winner_node_id;
404+
if (!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,winner_node_id-1))
405+
{
406+
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
407+
winner_node_id);
408+
Mtm->refereeGrant= true;
409+
MtmLock(LW_EXCLUSIVE);
410+
MtmEnableNode(MtmNodeId);
411+
MtmCheckState();
412+
MtmUnlock();
413+
}
410414
}
411415
}
412416

417+
/*
418+
* Clear winner if we again have all nodes are online.
419+
*/
420+
if (MtmRefereeConnStr&&*MtmRefereeConnStr&&Mtm->refereeWinnerId&&
421+
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes)
422+
{
423+
if (MtmRefereeClearWinner())
424+
{
425+
Mtm->refereeWinnerId=0;
426+
Mtm->refereeGrant= false;
427+
MTM_LOG1("[STATE] Cleaning old referee decision");
428+
}
429+
}
430+
431+
/* Do not check clique with referee grant */
432+
if (Mtm->refereeGrant)
433+
return;
413434

414435
/*
415436
* Check for clique.
@@ -473,37 +494,17 @@ MtmRefreshClusterStatus()
473494
}
474495

475496
staticint
476-
MtmGetRefereeWinner(void)
497+
MtmRefereeGetWinner(void)
477498
{
478-
intsocket_fd;
479499
PGconn*conn;
480500
PGresult*res;
481-
structtimevaltimeout= {5,0 };
482501
charsql[128];
483502
intwinner_node_id;
484503

485-
conn=PQconnectdb_safe(MtmRefereeConnStr);
504+
conn=PQconnectdb_safe(MtmRefereeConnStr,5);
486505
if (PQstatus(conn)!=CONNECTION_OK)
487506
{
488-
MTM_ELOG(WARNING,"Could not connect to referee (%s): %s",
489-
MtmRefereeConnStr,PQerrorMessage(conn));
490-
PQfinish(conn);
491-
return-1;
492-
}
493-
494-
socket_fd=PQsocket(conn);
495-
if (socket_fd<0)
496-
{
497-
MTM_ELOG(WARNING,"Referee socket is invalid");
498-
PQfinish(conn);
499-
return-1;
500-
}
501-
502-
if (setsockopt(socket_fd,SOL_SOCKET,SO_RCVTIMEO,
503-
(char*)&timeout,sizeof(timeout))<0)
504-
{
505-
MTM_ELOG(WARNING,"Could not set referee socket timeout: %s",
506-
strerror(errno));
507+
MTM_ELOG(WARNING,"Could not connect to referee");
507508
PQfinish(conn);
508509
return-1;
509510
}
@@ -540,3 +541,46 @@ MtmGetRefereeWinner(void)
540541
returnwinner_node_id;
541542
}
542543

544+
staticbool
545+
MtmRefereeClearWinner(void)
546+
{
547+
PGconn*conn;
548+
PGresult*res;
549+
char*response;
550+
551+
conn=PQconnectdb_safe(MtmRefereeConnStr,5);
552+
if (PQstatus(conn)!=CONNECTION_OK)
553+
{
554+
MTM_ELOG(WARNING,"Could not connect to referee");
555+
PQfinish(conn);
556+
return false;
557+
}
558+
559+
res=PQexec(conn,"select mtm.referee_clean()");
560+
if (PQresultStatus(res)!=PGRES_TUPLES_OK||
561+
PQntuples(res)!=1||
562+
PQnfields(res)!=1)
563+
{
564+
MTM_ELOG(WARNING,"Refusing unexpected result (r=%d, n=%d, w=%d, k=%s) from referee_clean().",
565+
PQresultStatus(res),PQntuples(res),PQnfields(res),PQgetvalue(res,0,0));
566+
PQclear(res);
567+
PQfinish(conn);
568+
return false;
569+
}
570+
571+
response=PQgetvalue(res,0,0);
572+
573+
if (false)
574+
{
575+
MTM_ELOG(WARNING,"Wrong response from referee");
576+
PQclear(res);
577+
PQfinish(conn);
578+
return false;
579+
}
580+
581+
/* Ok, we finally got it! */
582+
MTM_LOG1("Got referee clear response %s",response);
583+
PQclear(res);
584+
PQfinish(conn);
585+
return true;
586+
}

‎tests2/lib/test_helper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def assertNoCommits(self, aggs):
3434
ifcommits:
3535
raiseAssertionError('There are commits during aggregation interval')
3636

37-
defperformFailure(self,failure):
37+
defperformFailure(self,failure,wait=0):
3838

3939
time.sleep(TEST_WARMING_TIME)
4040

@@ -51,7 +51,7 @@ def performFailure(self, failure):
5151
aggs_failure=self.client.get_aggregates()
5252

5353

54-
#time.sleep(10000)
54+
time.sleep(wait)
5555
failure.stop()
5656

5757
print('Eliminate failure at ',datetime.datetime.utcnow())

‎tests2/test_major.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,28 @@ def test_partition_referee(self):
9595
self.assertCommits(aggs)
9696
self.assertIsolation(aggs)
9797

98+
deftest_double_failure_referee(self):
99+
print('### test_double_failure_referee ###')
100+
101+
aggs_failure,aggs=self.performFailure(SingleNodePartition('node2'))
102+
103+
self.assertCommits(aggs_failure[:1])
104+
self.assertNoCommits(aggs_failure[1:])
105+
self.assertIsolation(aggs_failure)
106+
107+
self.assertCommits(aggs)
108+
self.assertIsolation(aggs)
109+
110+
aggs_failure,aggs=self.performFailure(SingleNodePartition('node1'))
111+
112+
self.assertNoCommits(aggs_failure[:1])
113+
self.assertCommits(aggs_failure[1:])
114+
self.assertIsolation(aggs_failure)
115+
116+
self.assertCommits(aggs)
117+
self.assertIsolation(aggs)
118+
119+
98120
if__name__=='__main__':
99121
unittest.main()
100122

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp