Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9eb2b3d

Browse files
knizhnikkelvich
authored andcommitted
Perform select with timeout in arbiter
1 parent4702e5c commit9eb2b3d

File tree

3 files changed

+55
-29
lines changed

3 files changed

+55
-29
lines changed

‎arbiter.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
297297

298298
/* Some node considered that I am dead, so switch to recovery mode */
299299
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
300-
elog(WARNING,"Node is switched to recovery mode");
301-
ds->status=MTM_RECOVERY;
300+
MtmClusterSwitchMode(MTM_RECOVERY);
302301
}
303302
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
304303
ds->disabledNodeMask |=msg.disabledNodeMask;
@@ -345,8 +344,7 @@ static void MtmOpenConnections()
345344
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",ds->nNodes,MtmNodes);
346345
ds->status=MTM_OFFLINE;
347346
}elseif (ds->status==MTM_INITIALIZATION) {
348-
elog(WARNING,"Switch to CONNECTED mode");
349-
ds->status=MTM_CONNECTED;
347+
MtmClusterSwitchMode(MTM_CONNECTED);
350348
}
351349
}
352350

@@ -403,9 +401,9 @@ static void MtmAcceptOneConnection()
403401
}else {
404402
elog(NOTICE,"Arbiter established connection with node %d",msg.node);
405403
BIT_CLEAR(ds->connectivityMask,msg.node-1);
406-
MtmOnNodeConnect(msg.node);
407404
MtmRegisterSocket(fd,msg.node-1);
408405
sockets[msg.node-1]=fd;
406+
MtmOnNodeConnect(msg.node);
409407
}
410408
}
411409
}
@@ -586,7 +584,7 @@ static void MtmTransReceiver(Datum arg)
586584

587585
while (true) {
588586
#ifUSE_EPOLL
589-
n=epoll_wait(epollfd,events,nNodes,-1);
587+
n=epoll_wait(epollfd,events,nNodes,MtmKeepaliveTimeout/1000);
590588
if (n<0) {
591589
elog(ERROR,"Arbiter failed to poll sockets: %d",errno);
592590
}
@@ -600,9 +598,12 @@ static void MtmTransReceiver(Datum arg)
600598
#else
601599
fd_setevents;
602600
do {
601+
structtimevaltv;
603602
events=inset;
604-
rc=select(max_fd+1,&events,NULL,NULL,NULL);
605-
}while (rc<0&&MtmRecovery());
603+
tv.tv_sec=MtmKeepAliveTimeout/USEC;
604+
tv.tv_usec=MtmKeepAliveTimeout%USEC;
605+
n=select(max_fd+1,&events,NULL,NULL,&tv);
606+
}while (n<0&&MtmRecovery());
606607

607608
if (rc<0) {
608609
elog(ERROR,"Arbiter failed to select sockets: %d",errno);
@@ -742,6 +743,10 @@ static void MtmTransReceiver(Datum arg)
742743
}
743744
}
744745
}
746+
if (n==0&&ds->disabledNodeMask!=0) {
747+
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
748+
MtmUpdateClusterStatus();
749+
}
745750
}
746751
}
747752

‎multimaster.c

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ typedef enum
7777

7878
#defineMTM_SHMEM_SIZE (64*1024*1024)
7979
#defineMTM_HASH_SIZE 100003
80-
#defineUSEC 1000000
8180
#defineMIN_WAIT_TIMEOUT 1000
8281
#defineMAX_WAIT_TIMEOUT 100000
8382
#defineSTATUS_POLL_DELAY USEC
@@ -132,6 +131,15 @@ static TransactionManager MtmTM = {
132131
MtmGetName
133132
};
134133

134+
staticcharconst*constMtmNodeStatusMnem[]=
135+
{
136+
"Intialization",
137+
"Offline",
138+
"Connected",
139+
"Online",
140+
"Recovery"
141+
};
142+
135143
boolMtmDoReplication;
136144
char*MtmDatabaseName;
137145

@@ -509,8 +517,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
509517
x->isReplicated= false;
510518
x->isDistributed=MtmIsUserTransaction();
511519
if (x->isDistributed&&dtm->status!=MTM_ONLINE) {
512-
MtmUnlock();
513-
elog(ERROR,"Multimaster node is offline");
520+
/* reject all user's transactions at offline cluster */
521+
MtmUnlock();
522+
elog(ERROR,"Multimaster node is not online");
514523
}
515524
x->containsDML= false;
516525
x->isPrepared= false;
@@ -594,6 +603,9 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
594603

595604
if (dtm->disabledNodeMask!=0) {
596605
MtmUpdateClusterStatus();
606+
if (dtm->status!=MTM_ONLINE) {
607+
elog(ERROR,"Abort current transaction because this cluster node is not online");
608+
}
597609
}
598610

599611
MtmLock(LW_EXCLUSIVE);
@@ -1084,21 +1096,13 @@ _PG_fini(void)
10841096
*/
10851097

10861098

1087-
voidMtmSwitchToNormalMode()
1088-
{
1089-
dtm->status=MTM_ONLINE;
1090-
elog(WARNING,"Switch to normal mode");
1091-
/* ??? Something else to do here? */
1092-
}
1093-
1094-
voidMtmSwitchToRecoveryMode()
1099+
voidMtmClusterSwitchMode(MtmNodeStatusmode)
10951100
{
1096-
dtm->status=MTM_RECOVERY;
1101+
dtm->status=mode;
1102+
elog(WARNING,"Switch to %s mode",MtmNodeStatusMnem[mode]);
10971103
/* ??? Something else to do here? */
1098-
elog(ERROR,"Switch to normal mode");
10991104
}
11001105

1101-
11021106
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
11031107
{
11041108
csn_tlocalSnapshot;
@@ -1117,7 +1121,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
11171121
Assert(dtm->status==MTM_RECOVERY);
11181122
}elseif (dtm->status==MTM_RECOVERY) {
11191123
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1120-
MtmSwitchToNormalMode();
1124+
MtmClusterSwitchMode(MTM_ONLINE);
11211125
}
11221126
dtmTx.gtid=*gtid;
11231127
dtmTx.xid=GetCurrentTransactionId();
@@ -1670,7 +1674,10 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16701674
}
16711675
}
16721676

1673-
1677+
/**
1678+
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
1679+
* This function returns false if current node is excluded from cluster, true otherwise
1680+
*/
16741681
voidMtmUpdateClusterStatus(void)
16751682
{
16761683
nodemask_tmask,clique;
@@ -1683,17 +1690,29 @@ void MtmUpdateClusterStatus(void)
16831690
clique=MtmFindMaxClique(matrix,MtmNodes,&clique_size);
16841691
if (clique_size >=MtmNodes/2+1) {/* have quorum */
16851692
MtmLock(LW_EXCLUSIVE);
1686-
mask= ~clique& (((nodemask_t)1 <<MtmNodes)-1)& ~dtm->disabledNodeMask;
1693+
mask= ~clique& (((nodemask_t)1 <<MtmNodes)-1)& ~dtm->disabledNodeMask;/* new disabled nodes mask */
16871694
for (i=0;mask!=0;i++,mask >>=1) {
16881695
if (mask&1) {
16891696
dtm->nNodes-=1;
16901697
BIT_SET(dtm->disabledNodeMask,i);
16911698
}
16921699
}
1700+
mask=clique&dtm->disabledNodeMask;/* new enabled nodes mask */
1701+
for (i=0;mask!=0;i++,mask >>=1) {
1702+
if (mask&1) {
1703+
dtm->nNodes+=1;
1704+
BIT_CLEAR(dtm->disabledNodeMask,i);
1705+
}
1706+
}
16931707
MtmUnlock();
16941708
if (BIT_CHECK(dtm->disabledNodeMask,MtmNodeId-1)) {
1695-
/* I was excluded from cluster:( */
1696-
MtmSwitchToRecoveryMode();
1709+
if (dtm->status==MTM_ONLINE) {
1710+
/* I was excluded from cluster:( */
1711+
MtmClusterSwitchMode(MTM_OFFLINE);
1712+
}
1713+
}elseif (dtm->status==MTM_OFFLINE) {
1714+
/* Should we somehow restart logical receivers? */
1715+
MtmClusterSwitchMode(MTM_RECOVERY);
16971716
}
16981717
}else {
16991718
elog(WARNING,"Clique %lx has no quorum",clique);

‎multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#defineMULTIMASTER_MIN_PROTO_VERSION 1
2121
#defineMULTIMASTER_MAX_PROTO_VERSION 1
2222

23+
#defineUSEC 1000000
24+
2325
#defineNatts_mtm_ddl_log 2
2426
#defineAnum_mtm_ddl_log_issued1
2527
#defineAnum_mtm_ddl_log_query2
@@ -124,6 +126,7 @@ extern char* MtmDatabaseName;
124126
externintMtmConnectAttempts;
125127
externintMtmConnectTimeout;
126128
externintMtmReconnectAttempts;
129+
externintMtmKeepaliveTimeout;
127130

128131
externvoidMtmArbiterInitialize(void);
129132
externintMtmStartReceivers(char*nodes,intnodeId);
@@ -148,6 +151,5 @@ extern timestamp_t MtmGetCurrentTime(void);
148151
externvoidMtmSleep(timestamp_tinterval);
149152
externboolMtmIsRecoveredNode(intnodeId);
150153
externvoidMtmUpdateClusterStatus(void);
151-
externvoidMtmSwitchToNormalMode(void);
152-
externvoidMtmSwitchToRecoveryMode(void);
154+
externvoidMtmClusterSwitchMode(MtmNodeStatusmode);
153155
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp