Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 6b70fbe

Browse files
knizhnik authored and kelvich committed
Handle node disconnect
1 parent 69d8dd9, commit 6b70fbe

File tree

3 files changed

+52
-11
lines changed

3 files changed

+52
-11
lines changed

‎multimaster.c

Lines changed: 42 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1047,6 +1047,7 @@ MtmCheckClusterLock()
10471047
Mtm->nNodes+=Mtm->nLockers;
10481048
Mtm->nLockers=0;
10491049
Mtm->nodeLockerMask=0;
1050+
MtmCheckQuorum();
10501051
}
10511052
}
10521053
break;
@@ -1056,14 +1057,17 @@ MtmCheckClusterLock()
10561057
/**
10571058
* Build internode connectivity mask. 1 - means that node is disconnected.
10581059
*/
1059-
staticvoid
1060+
staticbool
10601061
MtmBuildConnectivityMatrix(nodemask_t*matrix,boolnowait)
10611062
{
10621063
inti,j,n=MtmNodes;
10631064
for (i=0;i<n;i++) {
10641065
if (i+1!=MtmNodeId) {
10651066
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
1066-
matrix[i]=data ?*(nodemask_t*)data :0;
1067+
if (data==NULL) {
1068+
return false;
1069+
}
1070+
matrix[i]=*(nodemask_t*)data;
10671071
}else {
10681072
matrix[i]=Mtm->connectivityMask;
10691073
}
@@ -1074,21 +1078,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10741078
matrix[i] |= ((matrix[j] >>i)&1) <<j;
10751079
}
10761080
}
1081+
return true;
10771082
}
10781083

10791084

10801085
/**
10811086
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
10821087
* This function returns false if current node is excluded from cluster, true otherwise
10831088
*/
1084-
voidMtmRefreshClusterStatus(boolnowait)
1089+
boolMtmRefreshClusterStatus(boolnowait)
10851090
{
10861091
nodemask_tmask,clique;
10871092
nodemask_tmatrix[MAX_NODES];
10881093
intclique_size;
10891094
inti;
10901095

1091-
MtmBuildConnectivityMatrix(matrix,nowait);
1096+
if (!MtmBuildConnectivityMatrix(matrix,nowait)) {
1097+
/* RAFT is not available */
1098+
return false;
1099+
}
10921100

10931101
clique=MtmFindMaxClique(matrix,MtmNodes,&clique_size);
10941102
if (clique_size >=MtmNodes/2+1) {/* have quorum */
@@ -1108,6 +1116,7 @@ void MtmRefreshClusterStatus(bool nowait)
11081116
BIT_CLEAR(Mtm->disabledNodeMask,i);
11091117
}
11101118
}
1119+
MtmCheckQuorum();
11111120
MtmUnlock();
11121121
if (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1)) {
11131122
if (Mtm->status==MTM_ONLINE) {
@@ -1120,9 +1129,27 @@ void MtmRefreshClusterStatus(bool nowait)
11201129
}
11211130
}else {
11221131
elog(WARNING,"Clique %lx has no quorum",clique);
1132+
Mtm->status=MTM_IN_MINORITY;
11231133
}
1134+
return true;
11241135
}
11251136

1137+
voidMtmCheckQuorum(void)
1138+
{
1139+
if (Mtm->nNodes<MtmNodes/2+1) {
1140+
if (Mtm->status==MTM_ONLINE) {/* out of quorum */
1141+
elog(WARNING,"Node is in minority: disabled mask %lx",Mtm->disabledNodeMask);
1142+
Mtm->status=MTM_IN_MINORITY;
1143+
}
1144+
}else {
1145+
if (Mtm->status==MTM_IN_MINORITY) {
1146+
elog(WARNING,"Node is in majority: dissbled mask %lx",Mtm->disabledNodeMask);
1147+
Mtm->status=MTM_ONLINE;
1148+
}
1149+
}
1150+
}
1151+
1152+
11261153
voidMtmOnNodeDisconnect(intnodeId)
11271154
{
11281155
BIT_SET(Mtm->connectivityMask,nodeId-1);
@@ -1131,7 +1158,15 @@ void MtmOnNodeDisconnect(int nodeId)
11311158
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11321159
MtmSleep(MtmKeepaliveTimeout);
11331160

1134-
MtmRefreshClusterStatus(false);
1161+
if (!MtmRefreshClusterStatus(false)) {
1162+
MtmLock(LW_EXCLUSIVE);
1163+
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1164+
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1165+
Mtm->nNodes-=1;
1166+
MtmCheckQuorum();
1167+
}
1168+
MtmUnlock();
1169+
}
11351170
}
11361171

11371172
voidMtmOnNodeConnect(intnodeId)
@@ -1633,6 +1668,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16331668
}
16341669
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
16351670
Mtm->nNodes-=1;
1671+
MtmCheckQuorum();
16361672
if (!MtmIsBroadcast())
16371673
{
16381674
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
@@ -1647,6 +1683,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16471683
staticvoid
16481684
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
16491685
{
1686+
elog(WARNING,"Logical replication to node %d is stopped",MtmReplicationNodeId);
16501687
MtmOnNodeDisconnect(MtmReplicationNodeId);
16511688
}
16521689

‎multimaster.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
#include"pglogical_output/hooks.h"
99

1010
#defineMTM_TUPLE_TRACE(fmt, ...)
11-
/*
11+
#if0
1212
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
14-
*/
14+
#else
1515
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1616
#defineMTM_TRACE(fmt, ...)
17-
/* */
17+
#endif
1818

1919
#defineMULTIMASTER_NAME "multimaster"
2020
#defineMULTIMASTER_SCHEMA_NAME "mtm"
@@ -72,7 +72,8 @@ typedef enum
7272
MTM_OFFLINE,/* Node is out of quorum */
7373
MTM_CONNECTED,/* Arbiter is established connections with other nodes */
7474
MTM_ONLINE,/* Ready to receive client's queries */
75-
MTM_RECOVERY/* Node is in recovery process */
75+
MTM_RECOVERY,/* Node is in recovery process */
76+
MTM_IN_MINORITY/* Node is out of quorum */
7677
}MtmNodeStatus;
7778

7879
typedefenum
@@ -193,8 +194,10 @@ extern TransactionId MtmGetCurrentTransactionId(void);
193194
externXidStatusMtmGetCurrentTransactionStatus(void);
194195
externXidStatusMtmGetGlobalTransactionStatus(charconst*gid);
195196
externboolMtmIsRecoveredNode(intnodeId);
196-
externvoidMtmRefreshClusterStatus(boolnowait);
197+
externboolMtmRefreshClusterStatus(boolnowait);
197198
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
198199
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
199200
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
201+
externvoidMtmCheckQuorum(void);
202+
200203
#endif

‎pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ pglogical_receiver_main(Datum main_arg)
240240
if (PQstatus(conn)!=CONNECTION_OK)
241241
{
242242
PQfinish(conn);
243-
ereport(ERROR, (errmsg("%s: Could not establish connection to remote server",
243+
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
244244
worker_proc)));
245+
MtmOnNodeDisconnect(args->remote_node);
245246
proc_exit(1);
246247
}
247248

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp