Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3c1860a

Browse files
committed
Handle node disconnect
1 parent73b401f commit3c1860a

File tree

3 files changed

+52
-11
lines changed

3 files changed

+52
-11
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ MtmCheckClusterLock()
10491049
Mtm->nNodes+=Mtm->nLockers;
10501050
Mtm->nLockers=0;
10511051
Mtm->nodeLockerMask=0;
1052+
MtmCheckQuorum();
10521053
}
10531054
}
10541055
break;
@@ -1058,14 +1059,17 @@ MtmCheckClusterLock()
10581059
/**
10591060
* Build internode connectivity mask. 1 - means that node is disconnected.
10601061
*/
1061-
staticvoid
1062+
staticbool
10621063
MtmBuildConnectivityMatrix(nodemask_t*matrix,boolnowait)
10631064
{
10641065
inti,j,n=MtmNodes;
10651066
for (i=0;i<n;i++) {
10661067
if (i+1!=MtmNodeId) {
10671068
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
1068-
matrix[i]=data ?*(nodemask_t*)data :0;
1069+
if (data==NULL) {
1070+
return false;
1071+
}
1072+
matrix[i]=*(nodemask_t*)data;
10691073
}else {
10701074
matrix[i]=Mtm->connectivityMask;
10711075
}
@@ -1076,21 +1080,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10761080
matrix[i] |= ((matrix[j] >>i)&1) <<j;
10771081
}
10781082
}
1083+
return true;
10791084
}
10801085

10811086

10821087
/**
10831088
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
10841089
* This function returns false if current node is excluded from cluster, true otherwise
10851090
*/
1086-
voidMtmRefreshClusterStatus(boolnowait)
1091+
boolMtmRefreshClusterStatus(boolnowait)
10871092
{
10881093
nodemask_tmask,clique;
10891094
nodemask_tmatrix[MAX_NODES];
10901095
intclique_size;
10911096
inti;
10921097

1093-
MtmBuildConnectivityMatrix(matrix,nowait);
1098+
if (!MtmBuildConnectivityMatrix(matrix,nowait)) {
1099+
/* RAFT is not available */
1100+
return false;
1101+
}
10941102

10951103
clique=MtmFindMaxClique(matrix,MtmNodes,&clique_size);
10961104
if (clique_size >=MtmNodes/2+1) {/* have quorum */
@@ -1110,6 +1118,7 @@ void MtmRefreshClusterStatus(bool nowait)
11101118
BIT_CLEAR(Mtm->disabledNodeMask,i);
11111119
}
11121120
}
1121+
MtmCheckQuorum();
11131122
MtmUnlock();
11141123
if (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1)) {
11151124
if (Mtm->status==MTM_ONLINE) {
@@ -1122,9 +1131,27 @@ void MtmRefreshClusterStatus(bool nowait)
11221131
}
11231132
}else {
11241133
elog(WARNING,"Clique %lx has no quorum",clique);
1134+
Mtm->status=MTM_IN_MINORITY;
11251135
}
1136+
return true;
11261137
}
11271138

1139+
voidMtmCheckQuorum(void)
1140+
{
1141+
if (Mtm->nNodes<MtmNodes/2+1) {
1142+
if (Mtm->status==MTM_ONLINE) {/* out of quorum */
1143+
elog(WARNING,"Node is in minority: disabled mask %lx",Mtm->disabledNodeMask);
1144+
Mtm->status=MTM_IN_MINORITY;
1145+
}
1146+
}else {
1147+
if (Mtm->status==MTM_IN_MINORITY) {
1148+
elog(WARNING,"Node is in majority: dissbled mask %lx",Mtm->disabledNodeMask);
1149+
Mtm->status=MTM_ONLINE;
1150+
}
1151+
}
1152+
}
1153+
1154+
11281155
voidMtmOnNodeDisconnect(intnodeId)
11291156
{
11301157
BIT_SET(Mtm->connectivityMask,nodeId-1);
@@ -1133,7 +1160,15 @@ void MtmOnNodeDisconnect(int nodeId)
11331160
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11341161
MtmSleep(MtmKeepaliveTimeout);
11351162

1136-
MtmRefreshClusterStatus(false);
1163+
if (!MtmRefreshClusterStatus(false)) {
1164+
MtmLock(LW_EXCLUSIVE);
1165+
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1166+
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1167+
Mtm->nNodes-=1;
1168+
MtmCheckQuorum();
1169+
}
1170+
MtmUnlock();
1171+
}
11371172
}
11381173

11391174
voidMtmOnNodeConnect(intnodeId)
@@ -1635,6 +1670,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16351670
}
16361671
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
16371672
Mtm->nNodes-=1;
1673+
MtmCheckQuorum();
16381674
if (!MtmIsBroadcast())
16391675
{
16401676
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
@@ -1649,6 +1685,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16491685
staticvoid
16501686
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
16511687
{
1688+
elog(WARNING,"Logical replication to node %d is stopped",MtmReplicationNodeId);
16521689
MtmOnNodeDisconnect(MtmReplicationNodeId);
16531690
}
16541691

‎contrib/mmts/multimaster.h‎

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
#include"pglogical_output/hooks.h"
99

1010
#defineMTM_TUPLE_TRACE(fmt, ...)
11-
/*
11+
#if0
1212
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
14-
*/
14+
#else
1515
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1616
#defineMTM_TRACE(fmt, ...)
17-
/* */
17+
#endif
1818

1919
#defineMULTIMASTER_NAME "multimaster"
2020
#defineMULTIMASTER_SCHEMA_NAME "mtm"
@@ -72,7 +72,8 @@ typedef enum
7272
MTM_OFFLINE,/* Node is out of quorum */
7373
MTM_CONNECTED,/* Arbiter is established connections with other nodes */
7474
MTM_ONLINE,/* Ready to receive client's queries */
75-
MTM_RECOVERY/* Node is in recovery process */
75+
MTM_RECOVERY,/* Node is in recovery process */
76+
MTM_IN_MINORITY/* Node is out of quorum */
7677
}MtmNodeStatus;
7778

7879
typedefenum
@@ -193,8 +194,10 @@ extern TransactionId MtmGetCurrentTransactionId(void);
193194
externXidStatusMtmGetCurrentTransactionStatus(void);
194195
externXidStatusMtmGetGlobalTransactionStatus(charconst*gid);
195196
externboolMtmIsRecoveredNode(intnodeId);
196-
externvoidMtmRefreshClusterStatus(boolnowait);
197+
externboolMtmRefreshClusterStatus(boolnowait);
197198
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
198199
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
199200
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
201+
externvoidMtmCheckQuorum(void);
202+
200203
#endif

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ pglogical_receiver_main(Datum main_arg)
240240
if (PQstatus(conn)!=CONNECTION_OK)
241241
{
242242
PQfinish(conn);
243-
ereport(ERROR, (errmsg("%s: Could not establish connection to remote server",
243+
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
244244
worker_proc)));
245+
MtmOnNodeDisconnect(args->remote_node);
245246
proc_exit(1);
246247
}
247248

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp