Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 2d69b46

Browse files
committed
Handle heartbeat timeouts in arbiter receiver
1 parent 1c24395 commit 2d69b46

File tree

4 files changed

+27
-25
lines changed

4 files changed

+27
-25
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -675,6 +675,8 @@ static void MtmTransReceiver(Datum arg)
675675
intnResponses;
676676
inti,j,n,rc;
677677
MtmBuffer*rxBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
678+
timestamp_tlastHeartbeatCheck=MtmGetSystemTime();
679+
timestamp_tnow;
678680

679681
#ifUSE_EPOLL
680682
structepoll_event*events= (structepoll_event*)palloc(sizeof(structepoll_event)*nNodes);
@@ -698,7 +700,7 @@ static void MtmTransReceiver(Datum arg)
698700

699701
while (!stop) {
700702
#ifUSE_EPOLL
701-
n=epoll_wait(epollfd,events,nNodes,USEC_TO_MSEC(MtmKeepaliveTimeout));
703+
n=epoll_wait(epollfd,events,nNodes,MtmHeartbeatRecvTimeout);
702704
if (n<0) {
703705
if (errno==EINTR) {
704706
continue;
@@ -717,8 +719,8 @@ static void MtmTransReceiver(Datum arg)
717719
do {
718720
structtimevaltv;
719721
events=inset;
720-
tv.tv_sec=MtmKeepaliveTimeout/USECS_PER_SEC;
721-
tv.tv_usec=MtmKeepaliveTimeout%USECS_PER_SEC;
722+
tv.tv_sec=MtmHeartbeatRecvTimeout/1000;
723+
tv.tv_usec=MtmHeartbeatRecvTimeout%1000*1000;
722724
do {
723725
n=select(max_fd+1,&events,NULL,NULL,&tv);
724726
}while (n<0&&errno==EINTR);
@@ -855,8 +857,13 @@ static void MtmTransReceiver(Datum arg)
855857
}
856858
}
857859
}
860+
now=MtmGetSystemTime();
861+
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
862+
MtmWatchdog();
863+
lastHeartbeatCheck=now;
864+
}
858865
if (n==0&&Mtm->disabledNodeMask!=0) {
859-
/* If timeout is expired and there aredidabled nodes, then recheck cluster's state */
866+
/* If timeout is expired and there aredisabled nodes, then recheck cluster's state */
860867
MtmRefreshClusterStatus(false);
861868
}
862869
}

‎contrib/mmts/multimaster.c‎

Lines changed: 11 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -187,7 +187,7 @@ int MtmReplicationNodeId;
187187
intMtmArbiterPort;
188188
intMtmConnectAttempts;
189189
intMtmConnectTimeout;
190-
intMtmKeepaliveTimeout;
190+
intMtmRaftPollDelay;
191191
intMtmReconnectAttempts;
192192
intMtmNodeDisableDelay;
193193
intMtmTransSpillThreshold;
@@ -747,7 +747,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
747747
/*
748748
* Check heartbeats
749749
*/
750-
staticvoidMtmWatchdog()
750+
voidMtmWatchdog(void)
751751
{
752752
inti,n=Mtm->nAllNodes;
753753
timestamp_tnow=MtmGetSystemTime();
@@ -795,33 +795,27 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
795795
MtmResetTransaction(x);
796796
}else {
797797
time_ttransTimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
798-
time_ttimeout=Min(transTimeout,MtmHeartbeatRecvTimeout);
799-
timestamp_tdeadline=MtmGetSystemTime()+MSEC_TO_USEC(transTimeout);
800798
intresult=0;
801799
intnConfigChanges=Mtm->nConfigChanges;
802800
/* wait votes from all nodes */
803-
while (!ts->votingCompleted) {
801+
while (!ts->votingCompleted&& !(result&WL_TIMEOUT))
802+
{
804803
MtmUnlock();
805-
//MtmWatchdog();
804+
MtmWatchdog();
806805
if (ts->status==TRANSACTION_STATUS_ABORTED) {
807806
elog(WARNING,"Transaction %d(%s) is aborted by watchdog",x->xid,x->gid);
808807
x->status=TRANSACTION_STATUS_ABORTED;
809808
return;
810809
}
811-
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
810+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,transTimeout);
812811
if (result&WL_LATCH_SET) {
813812
ResetLatch(&MyProc->procLatch);
814-
}elseif (result&WL_TIMEOUT) {
815-
if (MtmGetSystemTime()>deadline) {
816-
MtmLock(LW_SHARED);
817-
break;
818-
}
819813
}
820814
MtmLock(LW_SHARED);
821815
}
822816
if (!ts->votingCompleted) {
823817
ts->status=TRANSACTION_STATUS_ABORTED;
824-
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
818+
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
825819
}elseif (nConfigChanges!=Mtm->nConfigChanges) {
826820
ts->status=TRANSACTION_STATUS_ABORTED;
827821
elog(WARNING,"Transaction is aborted because cluster configuration is changed during commit");
@@ -1369,8 +1363,7 @@ void MtmOnNodeDisconnect(int nodeId)
13691363
BIT_SET(Mtm->reconnectMask,nodeId-1);
13701364
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
13711365

1372-
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1373-
MtmSleep(MtmKeepaliveTimeout);
1366+
MtmSleep(MtmRaftPollDelay);
13741367

13751368
if (!MtmRefreshClusterStatus(false)) {
13761369
MtmLock(LW_EXCLUSIVE);
@@ -1970,10 +1963,10 @@ _PG_init(void)
19701963
);
19711964

19721965
DefineCustomIntVariable(
1973-
"multimaster.keepalive_timeout",
1974-
"Multimasterkeepalive interval for sockets",
1966+
"multimaster.raft_poll_delay",
1967+
"Multimasterdelay of polling cluster state from Raftable after updating local node status",
19751968
"Timeout in microseconds before polling state of nodes",
1976-
&MtmKeepaliveTimeout,
1969+
&MtmRaftPollDelay,
19771970
1000000,
19781971
1,
19791972
INT_MAX,

‎contrib/mmts/multimaster.h‎

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -217,7 +217,7 @@ extern char* MtmDatabaseName;
217217
externintMtmConnectAttempts;
218218
externintMtmConnectTimeout;
219219
externintMtmReconnectAttempts;
220-
externintMtmKeepaliveTimeout;
220+
externintMtmRaftPollDelay;
221221
externintMtmNodeDisableDelay;
222222
externintMtmTransSpillThreshold;
223223
externintMtmHeartbeatSendTimeout;
@@ -266,6 +266,8 @@ extern void MtmRecoveryCompleted(void);
266266
externvoidMtmMakeTableLocal(char*schema,char*name);
267267
externvoidMtmHandleApplyError(void);
268268
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
269-
externXLogRecPtrMtmGetFlushPosition(intnodeId);
269+
externXLogRecPtrMtmGetFlushPosition(intnodeId);
270+
externvoidMtmWatchdog(void);
271+
270272

271273
#endif

‎contrib/raftable/state.c‎

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
#include"blockmem.h"
77
#include"state.h"
88

9-
#defineRAFTABLE_BLOCK_MEM (1024 * 1024)
9+
#defineRAFTABLE_BLOCK_MEM (8*1024 * 1024)
1010
#defineRAFTABLE_HASH_SIZE (127)
1111

1212
typedefstructState {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp