Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f947553

Browse files
knizhnik authored and kelvich committed
Handle heartbeat timeouts in arbiter receiver
1 parent 54cbf4c commit f947553

File tree

3 files changed

+26
-24
lines changed

3 files changed

+26
-24
lines changed

‎arbiter.c

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -675,6 +675,8 @@ static void MtmTransReceiver(Datum arg)
675675
intnResponses;
676676
inti,j,n,rc;
677677
MtmBuffer*rxBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
678+
timestamp_tlastHeartbeatCheck=MtmGetSystemTime();
679+
timestamp_tnow;
678680

679681
#ifUSE_EPOLL
680682
structepoll_event*events= (structepoll_event*)palloc(sizeof(structepoll_event)*nNodes);
@@ -698,7 +700,7 @@ static void MtmTransReceiver(Datum arg)
698700

699701
while (!stop) {
700702
#ifUSE_EPOLL
701-
n=epoll_wait(epollfd,events,nNodes,USEC_TO_MSEC(MtmKeepaliveTimeout));
703+
n=epoll_wait(epollfd,events,nNodes,MtmHeartbeatRecvTimeout);
702704
if (n<0) {
703705
if (errno==EINTR) {
704706
continue;
@@ -717,8 +719,8 @@ static void MtmTransReceiver(Datum arg)
717719
do {
718720
structtimevaltv;
719721
events=inset;
720-
tv.tv_sec=MtmKeepaliveTimeout/USECS_PER_SEC;
721-
tv.tv_usec=MtmKeepaliveTimeout%USECS_PER_SEC;
722+
tv.tv_sec=MtmHeartbeatRecvTimeout/1000;
723+
tv.tv_usec=MtmHeartbeatRecvTimeout%1000*1000;
722724
do {
723725
n=select(max_fd+1,&events,NULL,NULL,&tv);
724726
}while (n<0&&errno==EINTR);
@@ -855,8 +857,13 @@ static void MtmTransReceiver(Datum arg)
855857
}
856858
}
857859
}
860+
now=MtmGetSystemTime();
861+
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
862+
MtmWatchdog();
863+
lastHeartbeatCheck=now;
864+
}
858865
if (n==0&&Mtm->disabledNodeMask!=0) {
859-
/* If timeout is expired and there aredidabled nodes, then recheck cluster's state */
866+
/* If timeout is expired and there aredisabled nodes, then recheck cluster's state */
860867
MtmRefreshClusterStatus(false);
861868
}
862869
}

‎multimaster.c

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ int MtmReplicationNodeId;
186186
intMtmArbiterPort;
187187
intMtmConnectAttempts;
188188
intMtmConnectTimeout;
189-
intMtmKeepaliveTimeout;
189+
intMtmRaftPollDelay;
190190
intMtmReconnectAttempts;
191191
intMtmNodeDisableDelay;
192192
intMtmTransSpillThreshold;
@@ -746,7 +746,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
746746
/*
747747
* Check heartbeats
748748
*/
749-
staticvoidMtmWatchdog()
749+
voidMtmWatchdog(void)
750750
{
751751
inti,n=Mtm->nAllNodes;
752752
timestamp_tnow=MtmGetSystemTime();
@@ -794,33 +794,27 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
794794
MtmResetTransaction(x);
795795
}else {
796796
time_ttransTimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
797-
time_ttimeout=Min(transTimeout,MtmHeartbeatRecvTimeout);
798-
timestamp_tdeadline=MtmGetSystemTime()+MSEC_TO_USEC(transTimeout);
799797
intresult=0;
800798
intnConfigChanges=Mtm->nConfigChanges;
801799
/* wait votes from all nodes */
802-
while (!ts->votingCompleted) {
800+
while (!ts->votingCompleted&& !(result&WL_TIMEOUT))
801+
{
803802
MtmUnlock();
804-
//MtmWatchdog();
803+
MtmWatchdog();
805804
if (ts->status==TRANSACTION_STATUS_ABORTED) {
806805
elog(WARNING,"Transaction %d(%s) is aborted by watchdog",x->xid,x->gid);
807806
x->status=TRANSACTION_STATUS_ABORTED;
808807
return;
809808
}
810-
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
809+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,transTimeout);
811810
if (result&WL_LATCH_SET) {
812811
ResetLatch(&MyProc->procLatch);
813-
}elseif (result&WL_TIMEOUT) {
814-
if (MtmGetSystemTime()>deadline) {
815-
MtmLock(LW_SHARED);
816-
break;
817-
}
818812
}
819813
MtmLock(LW_SHARED);
820814
}
821815
if (!ts->votingCompleted) {
822816
ts->status=TRANSACTION_STATUS_ABORTED;
823-
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
817+
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
824818
}elseif (nConfigChanges!=Mtm->nConfigChanges) {
825819
ts->status=TRANSACTION_STATUS_ABORTED;
826820
elog(WARNING,"Transaction is aborted because cluster configuration is changed during commit");
@@ -1368,8 +1362,7 @@ void MtmOnNodeDisconnect(int nodeId)
13681362
BIT_SET(Mtm->reconnectMask,nodeId-1);
13691363
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
13701364

1371-
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1372-
MtmSleep(MtmKeepaliveTimeout);
1365+
MtmSleep(MtmRaftPollDelay);
13731366

13741367
if (!MtmRefreshClusterStatus(false)) {
13751368
MtmLock(LW_EXCLUSIVE);
@@ -1969,10 +1962,10 @@ _PG_init(void)
19691962
);
19701963

19711964
DefineCustomIntVariable(
1972-
"multimaster.keepalive_timeout",
1973-
"Multimasterkeepalive interval for sockets",
1965+
"multimaster.raft_poll_delay",
1966+
"Multimasterdelay of polling cluster state from Raftable after updating local node status",
19741967
"Timeout in microseconds before polling state of nodes",
1975-
&MtmKeepaliveTimeout,
1968+
&MtmRaftPollDelay,
19761969
1000000,
19771970
1,
19781971
INT_MAX,

‎multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ extern char* MtmDatabaseName;
217217
externintMtmConnectAttempts;
218218
externintMtmConnectTimeout;
219219
externintMtmReconnectAttempts;
220-
externintMtmKeepaliveTimeout;
220+
externintMtmRaftPollDelay;
221221
externintMtmNodeDisableDelay;
222222
externintMtmTransSpillThreshold;
223223
externintMtmHeartbeatSendTimeout;
@@ -266,6 +266,8 @@ extern void MtmRecoveryCompleted(void);
266266
externvoidMtmMakeTableLocal(char*schema,char*name);
267267
externvoidMtmHandleApplyError(void);
268268
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
269-
externXLogRecPtrMtmGetFlushPosition(intnodeId);
269+
externXLogRecPtrMtmGetFlushPosition(intnodeId);
270+
externvoidMtmWatchdog(void);
271+
270272

271273
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp