Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfa0ee84

Browse files
committed
Move reconnectMask mask modifications to state.c; disable MTM_RECOVERY_FINISH1 event
1 parent526109f commitfa0ee84

File tree

5 files changed

+109
-132
lines changed

5 files changed

+109
-132
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,7 +1146,7 @@ bool MtmWatchdog(timestamp_t now)
11461146
{
11471147
MTM_ELOG(WARNING,"Heartbeat is not received from node %d during %d msec",
11481148
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
1149-
MtmOnNodeDisconnect(i+1);
1149+
MtmStateProcessNeighborEvent(i+1,MTM_NEIGHBOR_HEARTBEAT_TIMEOUT);
11501150
allAlive= false;
11511151
}
11521152
}
@@ -1757,16 +1757,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot, nodemas
17571757
}else {
17581758
globalSnapshot=MtmTx.snapshot;
17591759
}
1760-
if (!TransactionIdIsValid(gtid->xid)&&Mtm->status!=MTM_RECOVERY)
1761-
{
1762-
/* In case of recovery InvalidTransactionId is passed */
1763-
MtmStateProcessEvent(MTM_RECOVERY_START1);
1764-
}
1765-
elseif (Mtm->status==MTM_RECOVERY)
1766-
{
1767-
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1768-
MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1769-
}
1760+
1761+
1762+
// if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1763+
// {
1764+
// /* In case of recovery InvalidTransactionId is passed */
1765+
// MtmStateProcessEvent(MTM_RECOVERY_START1);
1766+
// }
1767+
// else if (Mtm->status == MTM_RECOVERY)
1768+
// {
1769+
// /* When recovery is completed we get normal transaction ID and switch to normal mode */
1770+
// MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1771+
// }
17701772
}
17711773

17721774
voidMtmSetCurrentTransactionGID(charconst*gid)
@@ -2073,7 +2075,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20732075
MtmLock(LW_EXCLUSIVE);
20742076
if (MtmIsRecoveredNode(nodeId)&&Mtm->nActiveTransactions==0) {
20752077
if (BIT_CHECK(Mtm->originLockNodeMask,nodeId-1)) {
2076-
MtmStateProcessNeighborEvent(nodeId-1,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
2078+
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20772079
}else {
20782080
MTM_LOG1("Node %d is caught-up at WAL position %llx without locking cluster",nodeId,walEndPtr);
20792081
/* We are lucky: caught-up without locking cluster! */
@@ -2274,7 +2276,6 @@ void MtmRefreshClusterStatus()
22742276
disabled= ~newClique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
22752277

22762278
if (disabled) {
2277-
// timestamp_t now = MtmGetSystemTime();
22782279
for (i=0,mask=disabled;mask!=0;i++,mask >>=1) {
22792280
if (mask&1) {
22802281
if (i+1==MtmNodeId )
@@ -2295,58 +2296,6 @@ void MtmRefreshClusterStatus()
22952296
}
22962297
}
22972298

2298-
2299-
/*
2300-
* This function is called in case of non-recoverable connection failure with this node.
2301-
* Non-recoverable means that the connection cannot be reestablished within the specified number of attempts.
2302-
* It sets a bit in the connectivity mask and registers a delayed refresh of cluster status, which builds the connectivity matrix
2303-
* and determines the clique of connected nodes. The timeout here is needed to allow all nodes to exchange their connectivity masks (they
2304-
* are sent together with any arbiter message, including heartbeats).
2305-
*/
2306-
voidMtmOnNodeDisconnect(intnodeId)
2307-
{
2308-
timestamp_tnow=MtmGetSystemTime();
2309-
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
2310-
{
2311-
/* Node is already disabled */
2312-
return;
2313-
}
2314-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)>now)
2315-
{
2316-
/* Avoid false detection of node failure and prevent node status blinking */
2317-
return;
2318-
}
2319-
MtmLock(LW_EXCLUSIVE);
2320-
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
2321-
BIT_SET(Mtm->reconnectMask,nodeId-1);
2322-
MTM_ELOG(LOG,"Disconnect node %d connectivity mask %llx",
2323-
nodeId,SELF_CONNECTIVITY_MASK);
2324-
MtmUnlock();
2325-
}
2326-
2327-
/*
2328-
* This method is called when connection with node is reestablished
2329-
*/
2330-
voidMtmOnNodeConnect(intnodeId)
2331-
{
2332-
MtmLock(LW_EXCLUSIVE);
2333-
MTM_ELOG(LOG,"Connect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
2334-
BIT_CLEAR(SELF_CONNECTIVITY_MASK,nodeId-1);
2335-
BIT_SET(Mtm->reconnectMask,nodeId-1);/* force sender to reestablish connection and send heartbeat */
2336-
MtmUnlock();
2337-
}
2338-
2339-
/*
2340-
* Set reconnect mask to force reconnection attempt to the node
2341-
*/
2342-
voidMtmReconnectNode(intnodeId)
2343-
{
2344-
MtmLock(LW_EXCLUSIVE);
2345-
MTM_ELOG(LOG,"Reconnect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
2346-
BIT_SET(Mtm->reconnectMask,nodeId-1);
2347-
MtmUnlock();
2348-
}
2349-
23502299
/*
23512300
* -------------------------------------------
23522301
* Node initialization
@@ -3659,11 +3608,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36593608
}else {
36603609
MTM_LOG1("Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
36613610
}
3662-
if (!BIT_CHECK(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1)) {
3663-
MTM_ELOG(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
3664-
BIT_SET(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1);
3665-
MtmStateProcessEvent(MTM_WAL_SENDER_START);
3666-
}
3611+
3612+
MtmStateProcessNeighborEvent(MtmReplicationNodeId,MTM_NEIGHBOR_WAL_SENDER_START);
3613+
36673614
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);/* arbiter should try to reestablish connection with this node */
36683615
MtmUnlock();
36693616
on_shmem_exit(MtmOnProcExit,0);

‎contrib/mmts/multimaster.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,11 +401,8 @@ extern void MtmLockNode(int nodeId, LWLockMode mode);
401401
externboolMtmTryLockNode(intnodeId,LWLockModemode);
402402
externvoidMtmUnlockNode(intnodeId);
403403
externvoidMtmStopNode(intnodeId,booldropSlot);
404-
externvoidMtmReconnectNode(intnodeId);
405404
externvoidMtmRecoverNode(intnodeId);
406405
externvoidMtmResumeNode(intnodeId);
407-
externvoidMtmOnNodeDisconnect(intnodeId);
408-
externvoidMtmOnNodeConnect(intnodeId);
409406
externvoidMtmWakeUpBackend(MtmTransState*ts);
410407
externvoidMtmSleep(timestamp_tinterval);
411408
externvoidMtmAbortTransaction(MtmTransState*ts);

‎contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ pglogical_receiver_main(Datum main_arg)
368368
PQclear(res);
369369
resetPQExpBuffer(query);
370370

371-
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_RECEIVER_START);
371+
MtmStateProcessNeighborEvent(nodeId,MTM_NEIGHBOR_WAL_RECEIVER_START);
372372

373373
while (!got_sigterm)
374374
{

‎contrib/mmts/state.c

Lines changed: 84 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
charconst*constMtmNeighborEventMnem[]=
77
{
88
"MTM_NEIGHBOR_CLIQUE_DISABLE",
9-
"MTM_NEIGHBOR_RECEIVER_START",
9+
"MTM_NEIGHBOR_WAL_RECEIVER_START",
10+
"MTM_NEIGHBOR_WAL_SENDER_START",
11+
"MTM_NEIGHBOR_HEARTBEAT_TIMEOUT",
1012
"MTM_NEIGHBOR_RECOVERY_CAUGHTUP"
1113
};
1214

@@ -23,9 +25,6 @@ char const* const MtmEventMnem[] =
2325
"MTM_RECOVERY_FINISH1",
2426
"MTM_RECOVERY_FINISH2",
2527

26-
"MTM_WAL_RECEIVER_START",
27-
"MTM_WAL_SENDER_START",
28-
2928
"MTM_NONRECOVERABLE_ERROR"
3029
};
3130

@@ -37,39 +36,61 @@ static void MtmSwitchClusterMode(MtmNodeStatus mode);
3736
void
3837
MtmStateProcessNeighborEvent(intnode_id,MtmNeighborEventev)
3938
{
40-
MTM_LOG1("[STATE] Processing %s",MtmNeighborEventMnem[ev]);
39+
MTM_LOG1("[STATE] Processing %s for node %u",MtmNeighborEventMnem[ev],node_id);
4140

4241
switch(ev)
4342
{
4443
caseMTM_NEIGHBOR_CLIQUE_DISABLE:
4544
MtmDisableNode(node_id);
4645
break;
4746

48-
caseMTM_NEIGHBOR_RECEIVER_START:
49-
/*
50-
* This function is called by the pglogical receiver main function when the receiver background worker is started.
51-
* We switch to ONLINE mode when all receivers are connected.
52-
* Since the background worker can be restarted multiple times, use a node bitmask.
53-
*/
47+
caseMTM_NEIGHBOR_HEARTBEAT_TIMEOUT:
48+
MtmLock(LW_EXCLUSIVE);
49+
BIT_SET(SELF_CONNECTIVITY_MASK,node_id-1);
50+
BIT_SET(Mtm->reconnectMask,node_id-1);
51+
MtmUnlock();
52+
break;
53+
54+
caseMTM_NEIGHBOR_WAL_RECEIVER_START:
5455
MtmLock(LW_EXCLUSIVE);
55-
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id)) {
56-
BIT_SET(Mtm->pglogicalReceiverMask,node_id);
57-
if (BIT_CHECK(Mtm->disabledNodeMask,node_id)) {
56+
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id-1)) {
57+
BIT_SET(Mtm->pglogicalReceiverMask,node_id-1);
58+
if (BIT_CHECK(Mtm->disabledNodeMask,node_id-1)) {
5859
MtmEnableNode(node_id);
5960
}
60-
MtmStateProcessEvent(MTM_WAL_RECEIVER_START);
61+
62+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1
63+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
64+
{
65+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
66+
MtmSwitchClusterMode(MTM_ONLINE);
67+
}
6168
}
6269
MtmUnlock();
6370
break;
6471

72+
caseMTM_NEIGHBOR_WAL_SENDER_START:
73+
if (!BIT_CHECK(Mtm->pglogicalSenderMask,node_id-1)) {
74+
BIT_SET(Mtm->pglogicalSenderMask,node_id-1);
75+
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1
76+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
77+
{
78+
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
79+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
80+
MtmSwitchClusterMode(MTM_ONLINE);
81+
}
82+
}
83+
break;
84+
6585
caseMTM_NEIGHBOR_RECOVERY_CAUGHTUP:
66-
Assert(BIT_CHECK(Mtm->disabledNodeMask,node_id));
67-
BIT_CLEAR(Mtm->originLockNodeMask,node_id);
68-
BIT_CLEAR(Mtm->disabledNodeMask,node_id);
69-
BIT_SET(Mtm->recoveredNodeMask,node_id);
86+
Assert(BIT_CHECK(Mtm->disabledNodeMask,node_id-1));
87+
BIT_CLEAR(Mtm->originLockNodeMask,node_id-1);
88+
BIT_CLEAR(Mtm->disabledNodeMask,node_id-1);
89+
BIT_SET(Mtm->recoveredNodeMask,node_id-1);
7090
Mtm->nLiveNodes+=1;
7191
MtmCheckQuorum();
7292
break;
93+
7394
}
7495
}
7596

@@ -114,7 +135,7 @@ MtmStateProcessEvent(MtmEvent ev)
114135
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1)
115136
{
116137
/* no quorum */
117-
MTM_ELOG(WARNING,"Node is out of quorum: only %d nodes of %d are accessible",Mtm->nLiveNodes,Mtm->nAllNodes);
138+
//MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
118139
MtmSwitchClusterMode(MTM_IN_MINORITY);
119140
}
120141
elseif (Mtm->status==MTM_INITIALIZATION)
@@ -185,28 +206,6 @@ MtmStateProcessEvent(MtmEvent ev)
185206
break;
186207

187208

188-
caseMTM_WAL_RECEIVER_START:
189-
MTM_ELOG(LOG,"[STATE] Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
190-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1
191-
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
192-
{
193-
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
194-
MtmSwitchClusterMode(MTM_ONLINE);
195-
}
196-
break;
197-
198-
199-
caseMTM_WAL_SENDER_START:
200-
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1
201-
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
202-
{
203-
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
204-
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
205-
MtmSwitchClusterMode(MTM_ONLINE);
206-
}
207-
break;
208-
209-
210209
caseMTM_NONRECOVERABLE_ERROR:
211210
// MTM_ELOG(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
212211
// edata->sqlerrcode, edata->message, getpid());
@@ -234,13 +233,10 @@ MtmSwitchClusterMode(MtmNodeStatus mode)
234233
*/
235234
voidMtmDisableNode(intnodeId)
236235
{
237-
timestamp_tnow=MtmGetSystemTime();
238-
MTM_ELOG(WARNING,"Disable node %d at xlog position %llx, last status change time %d msec ago",nodeId, (long64)GetXLogInsertRecPtr(),
239-
(int)USEC_TO_MSEC(now-Mtm->nodes[nodeId-1].lastStatusChangeTime));
240236
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
241237
Mtm->nConfigChanges+=1;
242238
Mtm->nodes[nodeId-1].timeline+=1;
243-
Mtm->nodes[nodeId-1].lastStatusChangeTime=now;
239+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
244240
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
245241
if (nodeId!=MtmNodeId) {
246242
Mtm->nLiveNodes-=1;
@@ -269,7 +265,6 @@ void MtmEnableNode(int nodeId)
269265
if (nodeId!=MtmNodeId) {
270266
Mtm->nLiveNodes+=1;
271267
}
272-
// MTM_ELOG(WARNING, "Enable node %d at xlog position %llx", nodeId, (long64)GetXLogInsertRecPtr());
273268
}
274269
MtmCheckQuorum();
275270
}
@@ -282,15 +277,49 @@ static void MtmCheckQuorum(void)
282277
{
283278
intnVotingNodes=MtmGetNumberOfVotingNodes();
284279

285-
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode)) {/* have quorum */
286-
if (Mtm->status==MTM_IN_MINORITY){
287-
// MTM_LOG1("Node is in majority: disabled mask %llx",Mtm->disabledNodeMask);
280+
if (Mtm->nLiveNodes >=nVotingNodes/2+1|| (Mtm->nLiveNodes== (nVotingNodes+1)/2&&MtmMajorNode))
281+
{
282+
if (Mtm->status==MTM_IN_MINORITY)
288283
MtmSwitchClusterMode(MTM_ONLINE);
289-
}
290-
}else {
291-
if (Mtm->status==MTM_ONLINE) {/* out of quorum */
292-
// MTM_ELOG(WARNING, "Node is in minority: disabled mask %llx",Mtm->disabledNodeMask);
284+
}
285+
else
286+
{
287+
if (Mtm->status==MTM_ONLINE)
293288
MtmSwitchClusterMode(MTM_IN_MINORITY);
294-
}
295289
}
296290
}
291+
292+
voidMtmOnNodeDisconnect(intnodeId)
293+
{
294+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
295+
return;
296+
297+
MTM_LOG1("[STATE] NodeDisconnect for node %u",nodeId);
298+
299+
MtmLock(LW_EXCLUSIVE);
300+
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
301+
BIT_SET(Mtm->reconnectMask,nodeId-1);
302+
MtmUnlock();
303+
}
304+
305+
voidMtmOnNodeConnect(intnodeId)
306+
{
307+
if (!BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1))
308+
return;
309+
310+
MTM_LOG1("[STATE] NodeConnect for node %u",nodeId);
311+
312+
MtmLock(LW_EXCLUSIVE);
313+
BIT_CLEAR(SELF_CONNECTIVITY_MASK,nodeId-1);
314+
BIT_SET(Mtm->reconnectMask,nodeId-1);
315+
MtmUnlock();
316+
}
317+
318+
voidMtmReconnectNode(intnodeId)
319+
{
320+
MTM_LOG1("[STATE] ReconnectNode for node %u",nodeId);
321+
MtmLock(LW_EXCLUSIVE);
322+
BIT_SET(Mtm->reconnectMask,nodeId-1);
323+
MtmUnlock();
324+
}
325+

‎contrib/mmts/state.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
typedefenum
33
{
44
MTM_NEIGHBOR_CLIQUE_DISABLE,
5-
MTM_NEIGHBOR_RECEIVER_START,
5+
MTM_NEIGHBOR_WAL_RECEIVER_START,
6+
MTM_NEIGHBOR_WAL_SENDER_START,
7+
MTM_NEIGHBOR_HEARTBEAT_TIMEOUT,
68
MTM_NEIGHBOR_RECOVERY_CAUGHTUP
79
}MtmNeighborEvent;
810

@@ -16,8 +18,6 @@ typedef enum
1618
MTM_RECOVERY_START2,
1719
MTM_RECOVERY_FINISH1,
1820
MTM_RECOVERY_FINISH2,
19-
MTM_WAL_RECEIVER_START,
20-
MTM_WAL_SENDER_START,
2121
MTM_NONRECOVERABLE_ERROR
2222
}MtmEvent;
2323

@@ -26,3 +26,7 @@ extern void MtmStateProcessEvent(MtmEvent ev);
2626
externvoidMtmDisableNode(intnodeId);
2727
externvoidMtmEnableNode(intnodeId);
2828

29+
externvoidMtmOnNodeDisconnect(intnodeId);
30+
externvoidMtmOnNodeConnect(intnodeId);
31+
externvoidMtmReconnectNode(intnodeId);
32+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp