Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 57564b3

Browse files
committed
Arbitrator fixes
1 parent 75eaad0 commit 57564b3

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

‎contrib/mmts/arbitrator/arbitrator.cpp

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ int main (int argc, char* argv[])
103103
queries[i] = pipes[i]->insert(sql);
104104
}
105105
sleep(cfg.timeout);
106-
enabledMask =disabledMask;
106+
enabledMask =0;
107107
for (size_t i =0; i < nConns; i++) {
108108
if (!BIT_CHECK(didsabledMask, i)) {
109109
if (!pipes[i]->is_finished(queries[i]))
@@ -113,8 +113,14 @@ int main (int argc, char* argv[])
113113
delete conns[i];
114114
conns[i] =NULL;
115115
}else {
116-
result r = pipes[i]->retrieve(results[i]);
117-
enabledMask &= ~r[0][0].as(nodemask_t());
116+
try {
117+
result r = pipes[i]->retrieve(results[i]);
118+
enabledMask |= r[0][0].as(nodemask_t());
119+
}catch (pqxx_exceptionconst& x) {
120+
delete conns[i];
121+
conns[i] =NULL;
122+
fprintf(stderr,"Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
123+
}
118124
}
119125
}
120126
}

‎contrib/mmts/multimaster.c

Lines changed: 34 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1964,6 +1964,7 @@ static void MtmEnableNode(int nodeId)
19641964
if (BIT_SET(Mtm->disabledNodeMask,nodeId-1)) {
19651965
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
19661966
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
1967+
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
19671968
Mtm->nConfigChanges+=1;
19681969
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
19691970
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
@@ -2132,6 +2133,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
21322133
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
21332134
BIT_CLEAR(Mtm->originLockNodeMask,nodeId-1);
21342135
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
2136+
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
21352137
Mtm->nLiveNodes+=1;
21362138
MtmCheckQuorum();
21372139
}else {
@@ -2271,6 +2273,22 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
22712273
}
22722274

22732275

2276+
staticintMtmGetNumberOfVotingNodes()
2277+
{
2278+
inti;
2279+
intnVotingNodes=Mtm->nAllNodes;
2280+
notebask_tdeadNodeMask=Mtm->deadNodeMask;
2281+
for (i=0;deadNodeMask!=0;i++) {
2282+
if (BIT_CHECK(deadNodeMask,i)) {
2283+
if (!BIT_CHECK(newClique,i)) {
2284+
nVotingNodes-=1;
2285+
}
2286+
BIT_CLEAR(deadNodeMask,i);
2287+
}
2288+
}
2289+
returnnVotingNodes;
2290+
}
2291+
22742292
/**
22752293
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
22762294
* This function is called by arbiter monitor process with period MtmHeartbeatSendTimeout
@@ -2281,9 +2299,7 @@ void MtmRefreshClusterStatus()
22812299
nodemask_tmatrix[MAX_NODES];
22822300
intcliqueSize;
22832301
nodemask_toldClique= ~Mtm->disabledNodeMask& (((nodemask_t)1 <<Mtm->nAllNodes)-1);
2284-
nodemask_tarbitratorDisabledMask;
22852302
intnVotingNodes;
2286-
inti;
22872303

22882304
MtmBuildConnectivityMatrix(matrix);
22892305
newClique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&cliqueSize);
@@ -2304,16 +2320,7 @@ void MtmRefreshClusterStatus()
23042320
newClique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&cliqueSize);
23052321
}while (newClique!=oldClique);
23062322

2307-
nVotingNodes=Mtm->nAllNodes;
2308-
arbitratorDisabledMask=Mtm->arbitratorDisabledMask;
2309-
for (i=0;arbitratorDisabledMask!=0;i++) {
2310-
if (BIT_CHECK(arbitratorDisabledMask,i)) {
2311-
if (!BIT_CHECK(newClique,i)) {
2312-
nVotingNodes-=1;
2313-
}
2314-
BIT_CLEAR(arbitratorDisabledMask,i);
2315-
}
2316-
}
2323+
nVotingNodes=MtmGetNumberOfVotingNodes();
23172324
if (cliqueSize >=nVotingNodes/2+1|| (cliqueSize== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
23182325
fprintf(stderr,"Old mask: ");
23192326
for (i=0;i<Mtm->nAllNodes;i++) {
@@ -2378,7 +2385,9 @@ void MtmRefreshClusterStatus()
23782385
*/
23792386
voidMtmCheckQuorum(void)
23802387
{
2381-
if (Mtm->nLiveNodes >=Mtm->nAllNodes/2+1|| (Mtm->nLiveNodes== (Mtm->nAllNodes+1)/2&&MtmMajorNode)) {/* have quorum */
2388+
intnVotingNodes=MtmGetNumberOfVotingNodes();
2389+
2390+
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
23822391
if (Mtm->status==MTM_IN_MINORITY) {
23832392
MTM_LOG1("Node is in majority: disabled mask %llx",Mtm->disabledNodeMask);
23842393
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2626,7 +2635,8 @@ static void MtmInitialize()
26262635
Mtm->disabledNodeMask=0;
26272636
Mtm->stalledNodeMask=0;
26282637
Mtm->stoppedNodeMask=0;
2629-
Mtm->arbitratorDisabledMask=0;
2638+
Mtm->deadNodeMask=0;
2639+
Mtm->recoveredNodeMask=0;
26302640
Mtm->pglogicalReceiverMask=0;
26312641
Mtm->pglogicalSenderMask=0;
26322642
Mtm->inducedLockNodeMask=0;
@@ -5444,6 +5454,14 @@ Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
54445454

54455455
Datummtm_arbitrator_poll(PG_FUNCTION_ARGS)
54465456
{
5447-
Mtm->arbitratorDisabledMask=PG_GETARG_INT64(0);
5448-
PG_RETURN_INT64(Mtm->disabledNodeMask);
5457+
nodemask_trecoveredNodeMask;
5458+
5459+
MtmLock(LW_EXCLUSIVE);
5460+
recoveredNodeMask=Mtm->recoveredNodeMask;
5461+
Mtm->deadNodeMask=PG_GETARG_INT64(0);
5462+
Mtm->recoveredNodeMask &= ~Mtm->deadNodeMask;
5463+
MtmCheckQuorum();
5464+
MtmUnlock();
5465+
5466+
PG_RETURN_INT64(recoveredNodeMask);
54495467
}

‎contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,8 @@ typedef struct
278278
LWLockPadded*locks;/* multimaster lock tranche */
279279
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
280280
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes */
281-
nodemask_tarbitratorDisabledMask;/* Bitmask of node disabled by arbitrator */
281+
nodemask_tdeadNodeMask;/* Bitmask of nodes considered as dead by arbitrator */
282+
nodemask_trecoveredNodeMask;/* Bitmask of nodes recoverd after been reported as dead by arbitrator */
282283
nodemask_tstalledNodeMask;/* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
283284
nodemask_tstoppedNodeMask;/* Bitmask of stopped (permanently disabled nodes) */
284285
nodemask_tpglogicalReceiverMask;/* bitmask of started pglogic receivers */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp