Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit63d4492

Browse files
knizhnikkelvich
authored andcommitted
Check for heartbeat in raftable_set
1 parentd50a7ec commit63d4492

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

‎arbiter.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ static int busy_socket;
112112
staticvoidMtmTransSender(Datumarg);
113113
staticvoidMtmTransReceiver(Datumarg);
114114
staticvoidMtmSendHeartbeat(void);
115-
staticvoidMtmCheckHeartbeat(void);
116-
117115

118116

119117
staticcharconst*constmessageText[]=
@@ -356,7 +354,7 @@ static void MtmSendHeartbeat()
356354

357355
}
358356

359-
staticvoidMtmCheckHeartbeat()
357+
voidMtmCheckHeartbeat()
360358
{
361359
if (send_heartbeat) {
362360
send_heartbeat= false;

‎multimaster.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -795,8 +795,8 @@ void MtmWatchdog(void)
795795
if (Mtm->nodes[i].lastHeartbeat!=0
796796
&&now>Mtm->nodes[i].lastHeartbeat+MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
797797
{
798-
elog(WARNING,"Disablenode %dbecause last heartbeat was received%d msec ago (%ld)",
799-
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat),USEC_TO_MSEC(now));
798+
elog(WARNING,"Heartbeat was received fromnode %dduring%d msec",
799+
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
800800
MtmOnNodeDisconnect(i+1);
801801
}
802802
}
@@ -839,24 +839,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
839839

840840
timestamp_tstart=MtmGetSystemTime();
841841
/* wait votes from all nodes */
842-
while (!ts->votingCompleted&&start+transTimeout >=MtmGetSystemTime())
842+
while (!ts->votingCompleted&&ts->status!=TRANSACTION_STATUS_ABORTED&&start+transTimeout >=MtmGetSystemTime())
843843
{
844844
MtmUnlock();
845-
MtmWatchdog();
846-
if (ts->status==TRANSACTION_STATUS_ABORTED) {
847-
elog(WARNING,"Transaction %d(%s) is aborted by watchdog",x->xid,x->gid);
848-
x->status=TRANSACTION_STATUS_ABORTED;
849-
return;
850-
}
851845
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,MtmHeartbeatRecvTimeout);
852846
if (result&WL_LATCH_SET) {
853847
ResetLatch(&MyProc->procLatch);
854848
}
855849
MtmLock(LW_SHARED);
856850
}
857-
if (!ts->votingCompleted) {
858-
MtmAbortTransaction(ts);
859-
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
851+
if (!ts->votingCompleted) {
852+
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
853+
MtmAbortTransaction(ts);
854+
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
855+
}
860856
}elseif (nConfigChanges!=Mtm->nConfigChanges) {
861857
MtmAbortTransaction(ts);
862858
elog(WARNING,"Transaction is aborted because cluster configuration is changed during commit");
@@ -1435,7 +1431,7 @@ void MtmOnNodeDisconnect(int nodeId)
14351431
BIT_SET(Mtm->reconnectMask,nodeId-1);
14361432
MtmUnlock();
14371433

1438-
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask,true);/*false); -- TODO: raftable is hanged with nowait=true */
1434+
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14391435

14401436
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
14411437

@@ -1469,7 +1465,7 @@ void MtmOnNodeConnect(int nodeId)
14691465
MtmUnlock();
14701466

14711467
MTM_LOG1("Reconnect node %d",nodeId);
1472-
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask,true);/*false);-- TODO: raftable is hanged with nowait=true */
1468+
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14731469
}
14741470

14751471

‎multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include"bgwpool.h"
66
#include"bkb.h"
77

8+
#include"access/clog.h"
89
#include"pglogical_output/hooks.h"
910

1011
#defineDEBUG_LEVEL 0
@@ -268,6 +269,8 @@ extern void MtmHandleApplyError(void);
268269
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
269270
externXLogRecPtrMtmGetFlushPosition(intnodeId);
270271
externvoidMtmWatchdog(void);
272+
externvoidMtmCheckHeartbeat(void);
273+
271274

272275

273276
#endif

‎raftable.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include"postgres.h"
33
#include"raftable.h"
44
#include"raftable_wrapper.h"
5-
5+
#include"multimaster.h"
66

77
/*
88
* Raftable function proxies
@@ -18,8 +18,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
1818

1919
voidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
2020
{
21-
if (MtmUseRaftable) {
22-
raftable_set(key,value,size,nowait ?0 :-1);
21+
if (MtmUseRaftable) {
22+
if (nowait) {
23+
raftable_set(key,value,size,0);
24+
}else {
25+
while (!raftable_set(key,value,size,MtmHeartbeatSendTimeout)) {
26+
MtmCheckHeartbeat();
27+
}
28+
}
2329
}
2430
}
2531

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp