Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 67de224

Browse files
committed
reconnectMask maks modifications to state.c; disable MTM_RECOVERY_FINISH1 event
1 parent a310215 commit 67de224

File tree

5 files changed

+109
-132
lines changed

5 files changed

+109
-132
lines changed

‎multimaster.c

Lines changed: 17 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -1147,7 +1147,7 @@ bool MtmWatchdog(timestamp_t now)
11471147
{
11481148
MTM_ELOG(WARNING,"Heartbeat is not received from node %d during %d msec",
11491149
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
1150-
MtmOnNodeDisconnect(i+1);
1150+
MtmStateProcessNeighborEvent(i+1,MTM_NEIGHBOR_HEARTBEAT_TIMEOUT);
11511151
allAlive= false;
11521152
}
11531153
}
@@ -1758,16 +1758,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot, nodemas
17581758
}else {
17591759
globalSnapshot=MtmTx.snapshot;
17601760
}
1761-
if (!TransactionIdIsValid(gtid->xid)&&Mtm->status!=MTM_RECOVERY)
1762-
{
1763-
/* In case of recovery InvalidTransactionId is passed */
1764-
MtmStateProcessEvent(MTM_RECOVERY_START1);
1765-
}
1766-
elseif (Mtm->status==MTM_RECOVERY)
1767-
{
1768-
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1769-
MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1770-
}
1761+
1762+
1763+
// if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1764+
// {
1765+
// /* In case of recovery InvalidTransactionId is passed */
1766+
// MtmStateProcessEvent(MTM_RECOVERY_START1);
1767+
// }
1768+
// else if (Mtm->status == MTM_RECOVERY)
1769+
// {
1770+
// /* When recovery is completed we get normal transaction ID and switch to normal mode */
1771+
// MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1772+
// }
17711773
}
17721774

17731775
voidMtmSetCurrentTransactionGID(charconst*gid)
@@ -2074,7 +2076,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20742076
MtmLock(LW_EXCLUSIVE);
20752077
if (MtmIsRecoveredNode(nodeId)&&Mtm->nActiveTransactions==0) {
20762078
if (BIT_CHECK(Mtm->originLockNodeMask,nodeId-1)) {
2077-
MtmStateProcessNeighborEvent(nodeId-1,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
2079+
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20782080
}else {
20792081
MTM_LOG1("Node %d is caught-up at WAL position %llx without locking cluster",nodeId,walEndPtr);
20802082
/* We are lucky: caught-up without locking cluster! */
@@ -2275,7 +2277,6 @@ void MtmRefreshClusterStatus()
22752277
disabled= ~newClique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
22762278

22772279
if (disabled) {
2278-
// timestamp_t now = MtmGetSystemTime();
22792280
for (i=0,mask=disabled;mask!=0;i++,mask >>=1) {
22802281
if (mask&1) {
22812282
if (i+1==MtmNodeId )
@@ -2296,58 +2297,6 @@ void MtmRefreshClusterStatus()
22962297
}
22972298
}
22982299

2299-
2300-
/*
2301-
* This function is called in case of non-recoverable connection failure with this node.
2302-
* Non-recoverable means that connections can not be reestablish using specified number of attempts.
2303-
* It sets bit in connectivity mask and register delayed refresh of cluster status which build connectivity matrix
2304-
* and determine clique of connected nodes. Timeout here is needed to allow all nodes to exchanges their connectivity masks (them
2305-
* are sent together with any arbiter message, including heartbeats.
2306-
*/
2307-
voidMtmOnNodeDisconnect(intnodeId)
2308-
{
2309-
timestamp_tnow=MtmGetSystemTime();
2310-
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
2311-
{
2312-
/* Node is already disabled */
2313-
return;
2314-
}
2315-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)>now)
2316-
{
2317-
/* Avoid false detection of node failure and prevent node status blinking */
2318-
return;
2319-
}
2320-
MtmLock(LW_EXCLUSIVE);
2321-
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
2322-
BIT_SET(Mtm->reconnectMask,nodeId-1);
2323-
MTM_ELOG(LOG,"Disconnect node %d connectivity mask %llx",
2324-
nodeId,SELF_CONNECTIVITY_MASK);
2325-
MtmUnlock();
2326-
}
2327-
2328-
/*
2329-
* This method is called when connection with node is reestablished
2330-
*/
2331-
voidMtmOnNodeConnect(intnodeId)
2332-
{
2333-
MtmLock(LW_EXCLUSIVE);
2334-
MTM_ELOG(LOG,"Connect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
2335-
BIT_CLEAR(SELF_CONNECTIVITY_MASK,nodeId-1);
2336-
BIT_SET(Mtm->reconnectMask,nodeId-1);/* force sender to reestablish connection and send heartbeat */
2337-
MtmUnlock();
2338-
}
2339-
2340-
/*
2341-
* Set reconnect mask to force reconnection attempt to the node
2342-
*/
2343-
voidMtmReconnectNode(intnodeId)
2344-
{
2345-
MtmLock(LW_EXCLUSIVE);
2346-
MTM_ELOG(LOG,"Reconnect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
2347-
BIT_SET(Mtm->reconnectMask,nodeId-1);
2348-
MtmUnlock();
2349-
}
2350-
23512300
/*
23522301
* -------------------------------------------
23532302
* Node initialization
@@ -3673,11 +3622,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36733622
}else {
36743623
MTM_LOG1("Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
36753624
}
3676-
if (!BIT_CHECK(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1)) {
3677-
MTM_ELOG(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
3678-
BIT_SET(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1);
3679-
MtmStateProcessEvent(MTM_WAL_SENDER_START);
3680-
}
3625+
3626+
MtmStateProcessNeighborEvent(MtmReplicationNodeId,MTM_NEIGHBOR_WAL_SENDER_START);
3627+
36813628
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);/* arbiter should try to reestablish connection with this node */
36823629
MtmUnlock();
36833630
on_shmem_exit(MtmOnProcExit,0);

‎multimaster.h

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -402,11 +402,8 @@ extern void MtmLockNode(int nodeId, LWLockMode mode);
402402
externboolMtmTryLockNode(intnodeId,LWLockModemode);
403403
externvoidMtmUnlockNode(intnodeId);
404404
externvoidMtmStopNode(intnodeId,booldropSlot);
405-
externvoidMtmReconnectNode(intnodeId);
406405
externvoidMtmRecoverNode(intnodeId);
407406
externvoidMtmResumeNode(intnodeId);
408-
externvoidMtmOnNodeDisconnect(intnodeId);
409-
externvoidMtmOnNodeConnect(intnodeId);
410407
externvoidMtmWakeUpBackend(MtmTransState*ts);
411408
externvoidMtmSleep(timestamp_tinterval);
412409
externvoidMtmAbortTransaction(MtmTransState*ts);

‎pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -370,7 +370,7 @@ pglogical_receiver_main(Datum main_arg)
370370
PQclear(res);
371371
resetPQExpBuffer(query);
372372

373-
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECEIVER_START);
373+
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_WAL_RECEIVER_START);
374374

375375
while (!got_sigterm)
376376
{

‎state.c

Lines changed: 84 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,9 @@
66
charconst*constMtmNeighborEventMnem[]=
77
{
88
"MTM_NEIGHBOR_CLIQUE_DISABLE",
9-
"MTM_NEIGHBOR_RECEIVER_START",
9+
"MTM_NEIGHBOR_WAL_RECEIVER_START",
10+
"MTM_NEIGHBOR_WAL_SENDER_START",
11+
"MTM_NEIGHBOR_HEARTBEAT_TIMEOUT",
1012
"MTM_NEIGHBOR_RECOVERY_CAUGHTUP"
1113
};
1214

@@ -23,9 +25,6 @@ char const* const MtmEventMnem[] =
2325
"MTM_RECOVERY_FINISH1",
2426
"MTM_RECOVERY_FINISH2",
2527

26-
"MTM_WAL_RECEIVER_START",
27-
"MTM_WAL_SENDER_START",
28-
2928
"MTM_NONRECOVERABLE_ERROR"
3029
};
3130

@@ -37,39 +36,61 @@ static void MtmSwitchClusterMode(MtmNodeStatus mode);
3736
void
3837
MtmStateProcessNeighborEvent(intnode_id,MtmNeighborEventev)
3938
{
40-
MTM_LOG1("[STATE] Processing %s",MtmNeighborEventMnem[ev]);
39+
MTM_LOG1("[STATE] Processing %s for node %u",MtmNeighborEventMnem[ev],node_id);
4140

4241
switch(ev)
4342
{
4443
caseMTM_NEIGHBOR_CLIQUE_DISABLE:
4544
MtmDisableNode(node_id);
4645
break;
4746

48-
caseMTM_NEIGHBOR_RECEIVER_START:
49-
/*
50-
* This functions is called by pglogical receiver main function when receiver background worker is started.
51-
* We switch to ONLINE mode when all receivers are connected.
52-
* As far as background worker can be restarted multiple times, use node bitmask.
53-
*/
47+
caseMTM_NEIGHBOR_HEARTBEAT_TIMEOUT:
48+
MtmLock(LW_EXCLUSIVE);
49+
BIT_SET(SELF_CONNECTIVITY_MASK,node_id-1);
50+
BIT_SET(Mtm->reconnectMask,node_id-1);
51+
MtmUnlock();
52+
break;
53+
54+
caseMTM_NEIGHBOR_WAL_RECEIVER_START:
5455
MtmLock(LW_EXCLUSIVE);
55-
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id)) {
56-
BIT_SET(Mtm->pglogicalReceiverMask,node_id);
57-
if (BIT_CHECK(Mtm->disabledNodeMask,node_id)) {
56+
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id-1)) {
57+
BIT_SET(Mtm->pglogicalReceiverMask,node_id-1);
58+
if (BIT_CHECK(Mtm->disabledNodeMask,node_id-1)) {
5859
MtmEnableNode(node_id);
5960
}
60-
MtmStateProcessEvent(MTM_WAL_RECEIVER_START);
61+
62+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1
63+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
64+
{
65+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
66+
MtmSwitchClusterMode(MTM_ONLINE);
67+
}
6168
}
6269
MtmUnlock();
6370
break;
6471

72+
caseMTM_NEIGHBOR_WAL_SENDER_START:
73+
if (!BIT_CHECK(Mtm->pglogicalSenderMask,node_id-1)) {
74+
BIT_SET(Mtm->pglogicalSenderMask,node_id-1);
75+
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1
76+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
77+
{
78+
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
79+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
80+
MtmSwitchClusterMode(MTM_ONLINE);
81+
}
82+
}
83+
break;
84+
6585
caseMTM_NEIGHBOR_RECOVERY_CAUGHTUP:
66-
Assert(BIT_CHECK(Mtm->disabledNodeMask,node_id));
67-
BIT_CLEAR(Mtm->originLockNodeMask,node_id);
68-
BIT_CLEAR(Mtm->disabledNodeMask,node_id);
69-
BIT_SET(Mtm->recoveredNodeMask,node_id);
86+
Assert(BIT_CHECK(Mtm->disabledNodeMask,node_id-1));
87+
BIT_CLEAR(Mtm->originLockNodeMask,node_id-1);
88+
BIT_CLEAR(Mtm->disabledNodeMask,node_id-1);
89+
BIT_SET(Mtm->recoveredNodeMask,node_id-1);
7090
Mtm->nLiveNodes+=1;
7191
MtmCheckQuorum();
7292
break;
93+
7394
}
7495
}
7596

@@ -114,7 +135,7 @@ MtmStateProcessEvent(MtmEvent ev)
114135
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1)
115136
{
116137
/* no quorum */
117-
MTM_ELOG(WARNING,"Node is out of quorum: only %d nodes of %d are accessible",Mtm->nLiveNodes,Mtm->nAllNodes);
138+
//MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
118139
MtmSwitchClusterMode(MTM_IN_MINORITY);
119140
}
120141
elseif (Mtm->status==MTM_INITIALIZATION)
@@ -185,28 +206,6 @@ MtmStateProcessEvent(MtmEvent ev)
185206
break;
186207

187208

188-
caseMTM_WAL_RECEIVER_START:
189-
MTM_ELOG(LOG,"[STATE] Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
190-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1
191-
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
192-
{
193-
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
194-
MtmSwitchClusterMode(MTM_ONLINE);
195-
}
196-
break;
197-
198-
199-
caseMTM_WAL_SENDER_START:
200-
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1
201-
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
202-
{
203-
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
204-
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
205-
MtmSwitchClusterMode(MTM_ONLINE);
206-
}
207-
break;
208-
209-
210209
caseMTM_NONRECOVERABLE_ERROR:
211210
// MTM_ELOG(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
212211
// edata->sqlerrcode, edata->message, getpid());
@@ -234,13 +233,10 @@ MtmSwitchClusterMode(MtmNodeStatus mode)
234233
*/
235234
voidMtmDisableNode(intnodeId)
236235
{
237-
timestamp_tnow=MtmGetSystemTime();
238-
MTM_ELOG(WARNING,"Disable node %d at xlog position %llx, last status change time %d msec ago",nodeId, (long64)GetXLogInsertRecPtr(),
239-
(int)USEC_TO_MSEC(now-Mtm->nodes[nodeId-1].lastStatusChangeTime));
240236
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
241237
Mtm->nConfigChanges+=1;
242238
Mtm->nodes[nodeId-1].timeline+=1;
243-
Mtm->nodes[nodeId-1].lastStatusChangeTime=now;
239+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
244240
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
245241
if (nodeId!=MtmNodeId) {
246242
Mtm->nLiveNodes-=1;
@@ -269,7 +265,6 @@ void MtmEnableNode(int nodeId)
269265
if (nodeId!=MtmNodeId) {
270266
Mtm->nLiveNodes+=1;
271267
}
272-
// MTM_ELOG(WARNING, "Enable node %d at xlog position %llx", nodeId, (long64)GetXLogInsertRecPtr());
273268
}
274269
MtmCheckQuorum();
275270
}
@@ -282,15 +277,49 @@ static void MtmCheckQuorum(void)
282277
{
283278
intnVotingNodes=MtmGetNumberOfVotingNodes();
284279

285-
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
286-
if (Mtm->status==MTM_IN_MINORITY){
287-
// MTM_LOG1("Node is in majority: disabled mask %llx",Mtm->disabledNodeMask);
280+
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode))
281+
{
282+
if (Mtm->status==MTM_IN_MINORITY)
288283
MtmSwitchClusterMode(MTM_ONLINE);
289-
}
290-
}else {
291-
if (Mtm->status==MTM_ONLINE) {/* out of quorum */
292-
// MTM_ELOG(WARNING, "Node is in minority: disabled mask %llx",Mtm->disabledNodeMask);
284+
}
285+
else
286+
{
287+
if (Mtm->status==MTM_ONLINE)
293288
MtmSwitchClusterMode(MTM_IN_MINORITY);
294-
}
295289
}
296290
}
291+
292+
voidMtmOnNodeDisconnect(intnodeId)
293+
{
294+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
295+
return;
296+
297+
MTM_LOG1("[STATE] NodeDisconnect for node %u",nodeId);
298+
299+
MtmLock(LW_EXCLUSIVE);
300+
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
301+
BIT_SET(Mtm->reconnectMask,nodeId-1);
302+
MtmUnlock();
303+
}
304+
305+
voidMtmOnNodeConnect(intnodeId)
306+
{
307+
if (!BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
308+
return;
309+
310+
MTM_LOG1("[STATE] NodeConnect for node %u",nodeId);
311+
312+
MtmLock(LW_EXCLUSIVE);
313+
BIT_CLEAR(SELF_CONNECTIVITY_MASK,nodeId-1);
314+
BIT_SET(Mtm->reconnectMask,nodeId-1);
315+
MtmUnlock();
316+
}
317+
318+
voidMtmReconnectNode(intnodeId)
319+
{
320+
MTM_LOG1("[STATE] ReconnectNode for node %u",nodeId);
321+
MtmLock(LW_EXCLUSIVE);
322+
BIT_SET(Mtm->reconnectMask,nodeId-1);
323+
MtmUnlock();
324+
}
325+

‎state.h

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@
22
typedefenum
33
{
44
MTM_NEIGHBOR_CLIQUE_DISABLE,
5-
MTM_NEIGHBOR_RECEIVER_START,
5+
MTM_NEIGHBOR_WAL_RECEIVER_START,
6+
MTM_NEIGHBOR_WAL_SENDER_START,
7+
MTM_NEIGHBOR_HEARTBEAT_TIMEOUT,
68
MTM_NEIGHBOR_RECOVERY_CAUGHTUP
79
}MtmNeighborEvent;
810

@@ -16,8 +18,6 @@ typedef enum
1618
MTM_RECOVERY_START2,
1719
MTM_RECOVERY_FINISH1,
1820
MTM_RECOVERY_FINISH2,
19-
MTM_WAL_RECEIVER_START,
20-
MTM_WAL_SENDER_START,
2121
MTM_NONRECOVERABLE_ERROR
2222
}MtmEvent;
2323

@@ -26,3 +26,7 @@ extern void MtmStateProcessEvent(MtmEvent ev);
2626
externvoidMtmDisableNode(intnodeId);
2727
externvoidMtmEnableNode(intnodeId);
2828

29+
externvoidMtmOnNodeDisconnect(intnodeId);
30+
externvoidMtmOnNodeConnect(intnodeId);
31+
externvoidMtmReconnectNode(intnodeId);
32+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp