Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit da5308c

Browse files
committed
Implement internal heartbeat for multimaster
1 parent 781c855 commit da5308c

File tree

3 files changed

+147
-52
lines changed

3 files changed

+147
-52
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 52 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@
4444
#include"utils/array.h"
4545
#include"utils/builtins.h"
4646
#include"utils/memutils.h"
47+
#include"utils/timeout.h"
4748
#include"commands/dbcommands.h"
4849
#include"miscadmin.h"
4950
#include"postmaster/autovacuum.h"
@@ -101,22 +102,23 @@ typedef struct
101102

102103
staticint*sockets;
103104
staticintgateway;
105+
staticboolsend_heartbeat;
104106

105107
staticvoidMtmTransSender(Datumarg);
106108
staticvoidMtmTransReceiver(Datumarg);
107109

108-
/*
109-
*static char const* const messageText[] =
110-
*{
111-
*"INVALID",
112-
*"HANDSHAKE",
113-
*"READY",
114-
*"PREPARE",
115-
*"PREPARED",
116-
*"ABORTED",
117-
*"STATUS"
118-
*};
119-
*/
110+
111+
/*
 * Human-readable names of arbiter message codes, used in log output.
 * The array is indexed by message code (the sender logs messageText[ts->cmd]),
 * so the entries must stay in the same order as the MtmMessageCode enum.
 */
static char const* const messageText[] =
{
    "INVALID",
    "HANDSHAKE",
    "READY",
    "PREPARE",
    "PREPARED",
    "ABORTED",
    "STATUS",
    "HEARTBEAT"
};
120122

121123
staticBackgroundWorkerMtmSender= {
122124
"mtm-sender",
@@ -513,14 +515,19 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
513515
}
514516
buf->used=0;
515517
}
516-
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
517-
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
518-
519-
Assert(ts->cmd!=MSG_INVALID);
520-
buf->data[buf->used].code=ts->cmd;
521518
buf->data[buf->used].dxid=xid;
522-
buf->data[buf->used].sxid=ts->xid;
523-
buf->data[buf->used].csn=ts->csn;
519+
520+
if (ts!=NULL) {
521+
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
522+
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
523+
Assert(ts->cmd!=MSG_INVALID);
524+
buf->data[buf->used].code=ts->cmd;
525+
buf->data[buf->used].sxid=ts->xid;
526+
buf->data[buf->used].csn=ts->csn;
527+
}else {
528+
buf->data[buf->used].code=MSG_HEARTBEAT;
529+
MTM_LOG3("Send HEARTBEAT message to node %d from node %d\n",node+1,MtmNodeId);
530+
}
524531
buf->data[buf->used].node=MtmNodeId;
525532
buf->data[buf->used].disabledNodeMask=Mtm->disabledNodeMask;
526533
buf->data[buf->used].oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
@@ -533,15 +540,21 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
533540
intn=1;
534541
for (i=0;i<Mtm->nAllNodes;i++)
535542
{
536-
if (!BIT_CHECK(Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i])) {
543+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)&&(ts==NULL||TransactionIdIsValid(ts->xids[i]))) {
537544
Assert(i+1!=MtmNodeId);
538-
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
545+
MtmAppendBuffer(txBuffer,ts ?ts->xids[i] :InvalidTransactionId,i,ts);
539546
n+=1;
540547
}
541548
}
542549
Assert(n==Mtm->nLiveNodes);
543550
}
544551

552+
staticvoidMtmSendHeartbeat()
553+
{
554+
send_heartbeat= true;
555+
PGSemaphoreUnlock(&Mtm->votingSemaphore);
556+
}
557+
545558

546559
staticvoidMtmTransSender(Datumarg)
547560
{
@@ -556,6 +569,8 @@ static void MtmTransSender(Datum arg)
556569
sigfillset(&sset);
557570
sigprocmask(SIG_UNBLOCK,&sset,NULL);
558571

572+
RegisterTimeout(USER_TIMEOUT,MtmSendHeartbeat);
573+
559574
MtmOpenConnections();
560575

561576
for (i=0;i<nNodes;i++) {
@@ -567,6 +582,10 @@ static void MtmTransSender(Datum arg)
567582
PGSemaphoreLock(&Mtm->votingSemaphore);
568583
CHECK_FOR_INTERRUPTS();
569584

585+
if (send_heartbeat) {
586+
send_heartbeat= false;
587+
MtmBroadcastMessage(txBuffer,NULL);
588+
}
570589
/*
571590
* Use shared lock to improve locality,
572591
* because all other process modifying this list are using exclusive lock
@@ -700,15 +719,22 @@ static void MtmTransReceiver(Datum arg)
700719

701720
for (j=0;j<nResponses;j++) {
702721
MtmArbiterMessage*msg=&rxBuffer[i].data[j];
703-
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
704-
Assert(ts!=NULL);
722+
MtmTransState*ts;
723+
705724
Assert(msg->node>0&&msg->node <=nNodes&&msg->node!=MtmNodeId);
725+
Mtm->nodes[msg->node-1].oldestSnapshot=msg->oldestSnapshot;
726+
Mtm->nodes[msg->node-1].lastHeartbeat=MtmGetSystemTime();
727+
728+
if (msg->code==MSG_HEARTBEAT) {
729+
continue;
730+
}
731+
ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
732+
Assert(ts!=NULL);
706733

707734
if (BIT_CHECK(msg->disabledNodeMask,MtmNodeId-1)&&Mtm->status!=MTM_RECOVERY) {
708735
elog(PANIC,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie",msg->node);
709736
}
710-
Mtm->nodes[msg->node-1].oldestSnapshot=msg->oldestSnapshot;
711-
737+
712738
if (MtmIsCoordinator(ts)) {
713739
switch (msg->code) {
714740
caseMSG_READY:
@@ -768,7 +794,7 @@ static void MtmTransReceiver(Datum arg)
768794
}else {
769795
switch (msg->code) {
770796
caseMSG_PREPARE:
771-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
797+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
772798
ts->status=TRANSACTION_STATUS_UNKNOWN;
773799
ts->csn=MtmAssignCSN();
774800
MtmAdjustSubtransactions(ts);

‎contrib/mmts/multimaster.c‎

Lines changed: 90 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -192,6 +192,8 @@ int MtmReconnectAttempts;
192192
intMtmNodeDisableDelay;
193193
intMtmTransSpillThreshold;
194194
intMtmMaxNodes;
195+
intMtmHeartbeatSendTimeout;
196+
intMtmHeartbeatRecvTimeout;
195197
boolMtmUseRaftable;
196198
boolMtmUseDtm;
197199

@@ -742,6 +744,27 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
742744

743745
}
744746

747+
/*
748+
* Check heartbeats
749+
*/
750+
staticvoidMtmWatchdog()
751+
{
752+
inti,n=Mtm->nAllNodes;
753+
timestamp_tnow=MtmGetSystemTime();
754+
for (i=0;i<n;i++) {
755+
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)) {
756+
if (Mtm->nodes[i].lastHeartbeat!=0
757+
&&now>Mtm->nodes[i].lastHeartbeat+MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
758+
{
759+
elog(WARNING,"Disable node %d because last heartbeat was received %d msec ago",
760+
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
761+
MtmOnNodeDisconnect(i+1);
762+
}
763+
}
764+
}
765+
}
766+
767+
745768
staticvoid
746769
MtmPostPrepareTransaction(MtmCurrentTrans*x)
747770
{
@@ -771,14 +794,24 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
771794
MtmUnlock();
772795
MtmResetTransaction(x);
773796
}else {
774-
time_ttimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
797+
time_ttransTimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
798+
time_ttimeout=transTimeout<MtmHeartbeatRecvTimeout ?transTimeout :MtmHeartbeatRecvTimeout;
799+
timestamp_tdeadline=MtmGetSystemTime()+MSEC_TO_USEC(transTimeout);
775800
intresult=0;
776801
intnConfigChanges=Mtm->nConfigChanges;
777802
/* wait votes from all nodes */
778-
while (!ts->votingCompleted&& !(result&WL_TIMEOUT)) {
803+
while (!ts->votingCompleted) {
779804
MtmUnlock();
805+
MtmWatchdog();
780806
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
781-
ResetLatch(&MyProc->procLatch);
807+
if (result&WL_TIMEOUT) {
808+
if (MtmGetSystemTime()>deadline) {
809+
MtmLock(LW_SHARED);
810+
break;
811+
}
812+
}else {
813+
ResetLatch(&MyProc->procLatch);
814+
}
782815
MtmLock(LW_SHARED);
783816
}
784817
if (!ts->votingCompleted) {
@@ -1023,6 +1056,22 @@ void MtmHandleApplyError(void)
10231056
}
10241057

10251058

1059+
staticvoidMtmDisableNode(intnodeId)
1060+
{
1061+
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1062+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1063+
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1064+
Mtm->nLiveNodes-=1;
1065+
}
1066+
1067+
staticvoidMtmEnableNode(intnodeId)
1068+
{
1069+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1070+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1071+
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1072+
Mtm->nLiveNodes+=1;
1073+
}
1074+
10261075
voidMtmRecoveryCompleted(void)
10271076
{
10281077
MTM_LOG1("Recovery of node %d is completed",MtmNodeId);
@@ -1117,9 +1166,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11171166
MTM_LOG1("%d: node %d is caugth-up without locking cluster",MyProcPid,nodeId);
11181167
/* We are lucky: caugth-up without locking cluster! */
11191168
}
1120-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1121-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1122-
Mtm->nLiveNodes+=1;
1169+
MtmEnableNode(nodeId);
11231170
Mtm->nConfigChanges+=1;
11241171
caughtUp= true;
11251172
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
@@ -1262,17 +1309,13 @@ bool MtmRefreshClusterStatus(bool nowait)
12621309
mask= ~clique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
12631310
for (i=0;mask!=0;i++,mask >>=1) {
12641311
if (mask&1) {
1265-
Mtm->nLiveNodes-=1;
1266-
BIT_SET(Mtm->disabledNodeMask,i);
1267-
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
1312+
MtmDisableNode(i+1);
12681313
}
12691314
}
12701315
mask=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
12711316
for (i=0;mask!=0;i++,mask >>=1) {
12721317
if (mask&1) {
1273-
Mtm->nLiveNodes+=1;
1274-
BIT_CLEAR(Mtm->disabledNodeMask,i);
1275-
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
1318+
MtmEnableNode(i+1);
12761319
}
12771320
}
12781321
MtmCheckQuorum();
@@ -1317,7 +1360,6 @@ void MtmOnNodeDisconnect(int nodeId)
13171360
/* Avoid false detection of node failure and prevent node status blinking */
13181361
return;
13191362
}
1320-
13211363
BIT_SET(Mtm->connectivityMask,nodeId-1);
13221364
BIT_SET(Mtm->reconnectMask,nodeId-1);
13231365
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
@@ -1328,9 +1370,7 @@ void MtmOnNodeDisconnect(int nodeId)
13281370
if (!MtmRefreshClusterStatus(false)) {
13291371
MtmLock(LW_EXCLUSIVE);
13301372
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1331-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1332-
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1333-
Mtm->nLiveNodes-=1;
1373+
MtmDisableNode(nodeId);
13341374
MtmCheckQuorum();
13351375
/* Interrupt voting for active transaction and abort them */
13361376
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
@@ -1504,6 +1544,7 @@ static void MtmInitialize()
15041544
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
15051545
Mtm->nodes[i].con=MtmConnections[i];
15061546
Mtm->nodes[i].flushPos=0;
1547+
Mtm->nodes[i].lastHeartbeat=0;
15071548
}
15081549
PGSemaphoreCreate(&Mtm->votingSemaphore);
15091550
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1628,6 +1669,36 @@ _PG_init(void)
16281669
if (!process_shared_preload_libraries_in_progress)
16291670
return;
16301671

1672+
DefineCustomIntVariable(
1673+
"multimaster.heartbeat_send_timeout",
1674+
"Timeout in milliseconds of sending heartbeat messages",
1675+
"Period of broadcasting heartbeat messages by abiter to all nodes",
1676+
&MtmHeartbeatSendTimeout,
1677+
1000,
1678+
1,
1679+
INT_MAX,
1680+
PGC_BACKEND,
1681+
0,
1682+
NULL,
1683+
NULL,
1684+
NULL
1685+
);
1686+
1687+
DefineCustomIntVariable(
1688+
"multimaster.heartbeat_recv_timeout",
1689+
"Timeout in milliseconds of receiving heartbeat messages",
1690+
"If no heartbeat message is received from node within this period, it assumed to be dead",
1691+
&MtmHeartbeatRecvTimeout,
1692+
2000,
1693+
1,
1694+
INT_MAX,
1695+
PGC_BACKEND,
1696+
0,
1697+
NULL,
1698+
NULL,
1699+
NULL
1700+
);
1701+
16311702
DefineCustomIntVariable(
16321703
"multimaster.gc_period",
16331704
"Number of distributed transactions after which garbage collection is started",
@@ -2057,9 +2128,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20572128
{
20582129
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nLiveNodes);
20592130
}
2060-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2061-
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
2062-
Mtm->nLiveNodes-=1;
2131+
MtmDisableNode(nodeId);
20632132
MtmCheckQuorum();
20642133
if (!MtmIsBroadcast())
20652134
{
@@ -2111,17 +2180,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
21112180
if (MtmIsRecoverySession) {
21122181
MTM_LOG1("%d: Node %d start recovery of node %d",MyProcPid,MtmNodeId,MtmReplicationNodeId);
21132182
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
2114-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2115-
BIT_SET(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
2116-
Mtm->nLiveNodes-=1;
2183+
MtmDisableNode(MtmReplicationNodeId);
21172184
MtmCheckQuorum();
21182185
}
21192186
}elseif (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
21202187
if (recoveryCompleted) {
21212188
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication",MtmNodeId,MtmReplicationNodeId);
2122-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2123-
BIT_CLEAR(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
2124-
Mtm->nLiveNodes+=1;
2189+
MtmEnableNode(MtmReplicationNodeId);
21252190
MtmCheckQuorum();
21262191
}else {
21272192
elog(ERROR,"Disabled node %d tries to reconnect without recovery",MtmReplicationNodeId);

‎contrib/mmts/multimaster.h‎

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,8 @@ typedef enum
9292
MSG_PREPARE,
9393
MSG_PREPARED,
9494
MSG_ABORTED,
95-
MSG_STATUS
95+
MSG_STATUS,
96+
MSG_HEARTBEAT
9697
}MtmMessageCode;
9798

9899
typedefenum
@@ -127,6 +128,7 @@ typedef struct
127128
timestamp_tlastStatusChangeTime;
128129
timestamp_treceiverStartTime;
129130
timestamp_tsenderStartTime;
131+
timestamp_tlastHeartbeat;
130132
intsenderPid;
131133
intreceiverPid;
132134
XLogRecPtrflushPos;
@@ -218,6 +220,8 @@ extern int MtmReconnectAttempts;
218220
externintMtmKeepaliveTimeout;
219221
externintMtmNodeDisableDelay;
220222
externintMtmTransSpillThreshold;
223+
externintMtmHeartbeatSendTimeout;
224+
externintMtmHeartbeatRecvTimeout;
221225
externboolMtmUseDtm;
222226
externHTAB*MtmXid2State;
223227

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp