Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d2097bb

Browse files
knizhnik authored and kelvich committed
Arbitrator fixes
1 parent 8091c2c, commit d2097bb

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

‎arbitrator/arbitrator.cpp

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ int main (int argc, char* argv[])
103103
queries[i] = pipes[i]->insert(sql);
104104
}
105105
sleep(cfg.timeout);
106-
enabledMask =disabledMask;
106+
enabledMask =0;
107107
for (size_t i =0; i < nConns; i++) {
108108
if (!BIT_CHECK(didsabledMask, i)) {
109109
if (!pipes[i]->is_finished(queries[i]))
@@ -113,8 +113,14 @@ int main (int argc, char* argv[])
113113
delete conns[i];
114114
conns[i] =NULL;
115115
}else {
116-
result r = pipes[i]->retrieve(results[i]);
117-
enabledMask &= ~r[0][0].as(nodemask_t());
116+
try {
117+
result r = pipes[i]->retrieve(results[i]);
118+
enabledMask |= r[0][0].as(nodemask_t());
119+
}catch (pqxx_exceptionconst& x) {
120+
delete conns[i];
121+
conns[i] =NULL;
122+
fprintf(stderr,"Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
123+
}
118124
}
119125
}
120126
}

‎multimaster.c

Lines changed: 34 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1965,6 +1965,7 @@ static void MtmEnableNode(int nodeId)
19651965
if (BIT_SET(Mtm->disabledNodeMask,nodeId-1)) {
19661966
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
19671967
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
1968+
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
19681969
Mtm->nConfigChanges+=1;
19691970
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
19701971
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
@@ -2133,6 +2134,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
21332134
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
21342135
BIT_CLEAR(Mtm->originLockNodeMask,nodeId-1);
21352136
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
2137+
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
21362138
Mtm->nLiveNodes+=1;
21372139
MtmCheckQuorum();
21382140
}else {
@@ -2272,6 +2274,22 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
22722274
}
22732275

22742276

2277+
staticintMtmGetNumberOfVotingNodes()
2278+
{
2279+
inti;
2280+
intnVotingNodes=Mtm->nAllNodes;
2281+
notebask_tdeadNodeMask=Mtm->deadNodeMask;
2282+
for (i=0;deadNodeMask!=0;i++) {
2283+
if (BIT_CHECK(deadNodeMask,i)) {
2284+
if (!BIT_CHECK(newClique,i)) {
2285+
nVotingNodes-=1;
2286+
}
2287+
BIT_CLEAR(deadNodeMask,i);
2288+
}
2289+
}
2290+
returnnVotingNodes;
2291+
}
2292+
22752293
/**
22762294
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
22772295
* This function is called by arbiter monitor process with period MtmHeartbeatSendTimeout
@@ -2282,9 +2300,7 @@ void MtmRefreshClusterStatus()
22822300
nodemask_tmatrix[MAX_NODES];
22832301
intcliqueSize;
22842302
nodemask_toldClique= ~Mtm->disabledNodeMask& (((nodemask_t)1 <<Mtm->nAllNodes)-1);
2285-
nodemask_tarbitratorDisabledMask;
22862303
intnVotingNodes;
2287-
inti;
22882304

22892305
MtmBuildConnectivityMatrix(matrix);
22902306
newClique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&cliqueSize);
@@ -2305,16 +2321,7 @@ void MtmRefreshClusterStatus()
23052321
newClique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&cliqueSize);
23062322
}while (newClique!=oldClique);
23072323

2308-
nVotingNodes=Mtm->nAllNodes;
2309-
arbitratorDisabledMask=Mtm->arbitratorDisabledMask;
2310-
for (i=0;arbitratorDisabledMask!=0;i++) {
2311-
if (BIT_CHECK(arbitratorDisabledMask,i)) {
2312-
if (!BIT_CHECK(newClique,i)) {
2313-
nVotingNodes-=1;
2314-
}
2315-
BIT_CLEAR(arbitratorDisabledMask,i);
2316-
}
2317-
}
2324+
nVotingNodes=MtmGetNumberOfVotingNodes();
23182325
if (cliqueSize >=nVotingNodes/2+1|| (cliqueSize== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
23192326
fprintf(stderr,"Old mask: ");
23202327
for (i=0;i<Mtm->nAllNodes;i++) {
@@ -2379,7 +2386,9 @@ void MtmRefreshClusterStatus()
23792386
*/
23802387
voidMtmCheckQuorum(void)
23812388
{
2382-
if (Mtm->nLiveNodes >=Mtm->nAllNodes/2+1|| (Mtm->nLiveNodes== (Mtm->nAllNodes+1)/2&&MtmMajorNode)) {/* have quorum */
2389+
intnVotingNodes=MtmGetNumberOfVotingNodes();
2390+
2391+
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
23832392
if (Mtm->status==MTM_IN_MINORITY) {
23842393
MTM_LOG1("Node is in majority: disabled mask %llx",Mtm->disabledNodeMask);
23852394
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2627,7 +2636,8 @@ static void MtmInitialize()
26272636
Mtm->disabledNodeMask=0;
26282637
Mtm->stalledNodeMask=0;
26292638
Mtm->stoppedNodeMask=0;
2630-
Mtm->arbitratorDisabledMask=0;
2639+
Mtm->deadNodeMask=0;
2640+
Mtm->recoveredNodeMask=0;
26312641
Mtm->pglogicalReceiverMask=0;
26322642
Mtm->pglogicalSenderMask=0;
26332643
Mtm->inducedLockNodeMask=0;
@@ -5458,6 +5468,14 @@ Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
54585468

54595469
Datummtm_arbitrator_poll(PG_FUNCTION_ARGS)
54605470
{
5461-
Mtm->arbitratorDisabledMask=PG_GETARG_INT64(0);
5462-
PG_RETURN_INT64(Mtm->disabledNodeMask);
5471+
nodemask_trecoveredNodeMask;
5472+
5473+
MtmLock(LW_EXCLUSIVE);
5474+
recoveredNodeMask=Mtm->recoveredNodeMask;
5475+
Mtm->deadNodeMask=PG_GETARG_INT64(0);
5476+
Mtm->recoveredNodeMask &= ~Mtm->deadNodeMask;
5477+
MtmCheckQuorum();
5478+
MtmUnlock();
5479+
5480+
PG_RETURN_INT64(recoveredNodeMask);
54635481
}

‎multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,8 @@ typedef struct
278278
LWLockPadded*locks;/* multimaster lock tranche */
279279
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
280280
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes */
281-
nodemask_tarbitratorDisabledMask;/* Bitmask of node disabled by arbitrator */
281+
nodemask_tdeadNodeMask;/* Bitmask of nodes considered as dead by arbitrator */
282+
nodemask_trecoveredNodeMask;/* Bitmask of nodes recoverd after been reported as dead by arbitrator */
282283
nodemask_tstalledNodeMask;/* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
283284
nodemask_tstoppedNodeMask;/* Bitmask of stopped (permanently disabled nodes) */
284285
nodemask_tpglogicalReceiverMask;/* bitmask of started pglogic receivers */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp