Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitde99b01

Browse files
knizhnikkelvich
authored andcommitted
Implement internal heartbeat for multimaster
1 parent8a26667 commitde99b01

File tree

3 files changed

+147
-52
lines changed

3 files changed

+147
-52
lines changed

‎arbiter.c

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include"utils/array.h"
4545
#include"utils/builtins.h"
4646
#include"utils/memutils.h"
47+
#include"utils/timeout.h"
4748
#include"commands/dbcommands.h"
4849
#include"miscadmin.h"
4950
#include"postmaster/autovacuum.h"
@@ -101,22 +102,23 @@ typedef struct
101102

102103
staticint*sockets;
103104
staticintgateway;
105+
staticboolsend_heartbeat;
104106

105107
staticvoidMtmTransSender(Datumarg);
106108
staticvoidMtmTransReceiver(Datumarg);
107109

108-
/*
109-
*static char const* const messageText[] =
110-
*{
111-
*"INVALID",
112-
*"HANDSHAKE",
113-
*"READY",
114-
*"PREPARE",
115-
*"PREPARED",
116-
*"ABORTED",
117-
*"STATUS"
118-
*};
119-
*/
110+
111+
staticcharconst*constmessageText[]=
112+
{
113+
"INVALID",
114+
"HANDSHAKE",
115+
"READY",
116+
"PREPARE",
117+
"PREPARED",
118+
"ABORTED",
119+
"STATUS",
120+
"HEARTBEAT"
121+
};
120122

121123
staticBackgroundWorkerMtmSender= {
122124
"mtm-sender",
@@ -513,14 +515,19 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
513515
}
514516
buf->used=0;
515517
}
516-
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
517-
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
518-
519-
Assert(ts->cmd!=MSG_INVALID);
520-
buf->data[buf->used].code=ts->cmd;
521518
buf->data[buf->used].dxid=xid;
522-
buf->data[buf->used].sxid=ts->xid;
523-
buf->data[buf->used].csn=ts->csn;
519+
520+
if (ts!=NULL) {
521+
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
522+
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
523+
Assert(ts->cmd!=MSG_INVALID);
524+
buf->data[buf->used].code=ts->cmd;
525+
buf->data[buf->used].sxid=ts->xid;
526+
buf->data[buf->used].csn=ts->csn;
527+
}else {
528+
buf->data[buf->used].code=MSG_HEARTBEAT;
529+
MTM_LOG3("Send HEARTBEAT message to node %d from node %d\n",node+1,MtmNodeId);
530+
}
524531
buf->data[buf->used].node=MtmNodeId;
525532
buf->data[buf->used].disabledNodeMask=Mtm->disabledNodeMask;
526533
buf->data[buf->used].oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
@@ -533,15 +540,21 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
533540
intn=1;
534541
for (i=0;i<Mtm->nAllNodes;i++)
535542
{
536-
if (!BIT_CHECK(Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i])) {
543+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)&&(ts==NULL||TransactionIdIsValid(ts->xids[i]))) {
537544
Assert(i+1!=MtmNodeId);
538-
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
545+
MtmAppendBuffer(txBuffer,ts ?ts->xids[i] :InvalidTransactionId,i,ts);
539546
n+=1;
540547
}
541548
}
542549
Assert(n==Mtm->nLiveNodes);
543550
}
544551

552+
staticvoidMtmSendHeartbeat()
553+
{
554+
send_heartbeat= true;
555+
PGSemaphoreUnlock(&Mtm->votingSemaphore);
556+
}
557+
545558

546559
staticvoidMtmTransSender(Datumarg)
547560
{
@@ -556,6 +569,8 @@ static void MtmTransSender(Datum arg)
556569
sigfillset(&sset);
557570
sigprocmask(SIG_UNBLOCK,&sset,NULL);
558571

572+
RegisterTimeout(USER_TIMEOUT,MtmSendHeartbeat);
573+
559574
MtmOpenConnections();
560575

561576
for (i=0;i<nNodes;i++) {
@@ -567,6 +582,10 @@ static void MtmTransSender(Datum arg)
567582
PGSemaphoreLock(&Mtm->votingSemaphore);
568583
CHECK_FOR_INTERRUPTS();
569584

585+
if (send_heartbeat) {
586+
send_heartbeat= false;
587+
MtmBroadcastMessage(txBuffer,NULL);
588+
}
570589
/*
571590
* Use shared lock to improve locality,
572591
* because all other process modifying this list are using exclusive lock
@@ -700,15 +719,22 @@ static void MtmTransReceiver(Datum arg)
700719

701720
for (j=0;j<nResponses;j++) {
702721
MtmArbiterMessage*msg=&rxBuffer[i].data[j];
703-
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
704-
Assert(ts!=NULL);
722+
MtmTransState*ts;
723+
705724
Assert(msg->node>0&&msg->node <=nNodes&&msg->node!=MtmNodeId);
725+
Mtm->nodes[msg->node-1].oldestSnapshot=msg->oldestSnapshot;
726+
Mtm->nodes[msg->node-1].lastHeartbeat=MtmGetSystemTime();
727+
728+
if (msg->code==MSG_HEARTBEAT) {
729+
continue;
730+
}
731+
ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
732+
Assert(ts!=NULL);
706733

707734
if (BIT_CHECK(msg->disabledNodeMask,MtmNodeId-1)&&Mtm->status!=MTM_RECOVERY) {
708735
elog(PANIC,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie",msg->node);
709736
}
710-
Mtm->nodes[msg->node-1].oldestSnapshot=msg->oldestSnapshot;
711-
737+
712738
if (MtmIsCoordinator(ts)) {
713739
switch (msg->code) {
714740
caseMSG_READY:
@@ -768,7 +794,7 @@ static void MtmTransReceiver(Datum arg)
768794
}else {
769795
switch (msg->code) {
770796
caseMSG_PREPARE:
771-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
797+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
772798
ts->status=TRANSACTION_STATUS_UNKNOWN;
773799
ts->csn=MtmAssignCSN();
774800
MtmAdjustSubtransactions(ts);

‎multimaster.c

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ int MtmReconnectAttempts;
191191
intMtmNodeDisableDelay;
192192
intMtmTransSpillThreshold;
193193
intMtmMaxNodes;
194+
intMtmHeartbeatSendTimeout;
195+
intMtmHeartbeatRecvTimeout;
194196
boolMtmUseRaftable;
195197
boolMtmUseDtm;
196198

@@ -741,6 +743,27 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
741743

742744
}
743745

746+
/*
747+
* Check heartbeats
748+
*/
749+
staticvoidMtmWatchdog()
750+
{
751+
inti,n=Mtm->nAllNodes;
752+
timestamp_tnow=MtmGetSystemTime();
753+
for (i=0;i<n;i++) {
754+
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)) {
755+
if (Mtm->nodes[i].lastHeartbeat!=0
756+
&&now>Mtm->nodes[i].lastHeartbeat+MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
757+
{
758+
elog(WARNING,"Disable node %d because last heartbeat was received %d msec ago",
759+
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
760+
MtmOnNodeDisconnect(i+1);
761+
}
762+
}
763+
}
764+
}
765+
766+
744767
staticvoid
745768
MtmPostPrepareTransaction(MtmCurrentTrans*x)
746769
{
@@ -770,14 +793,24 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
770793
MtmUnlock();
771794
MtmResetTransaction(x);
772795
}else {
773-
time_ttimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
796+
time_ttransTimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
797+
time_ttimeout=transTimeout<MtmHeartbeatRecvTimeout ?transTimeout :MtmHeartbeatRecvTimeout;
798+
timestamp_tdeadline=MtmGetSystemTime()+MSEC_TO_USEC(transTimeout);
774799
intresult=0;
775800
intnConfigChanges=Mtm->nConfigChanges;
776801
/* wait votes from all nodes */
777-
while (!ts->votingCompleted&& !(result&WL_TIMEOUT)) {
802+
while (!ts->votingCompleted) {
778803
MtmUnlock();
804+
MtmWatchdog();
779805
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
780-
ResetLatch(&MyProc->procLatch);
806+
if (result&WL_TIMEOUT) {
807+
if (MtmGetSystemTime()>deadline) {
808+
MtmLock(LW_SHARED);
809+
break;
810+
}
811+
}else {
812+
ResetLatch(&MyProc->procLatch);
813+
}
781814
MtmLock(LW_SHARED);
782815
}
783816
if (!ts->votingCompleted) {
@@ -1022,6 +1055,22 @@ void MtmHandleApplyError(void)
10221055
}
10231056

10241057

1058+
staticvoidMtmDisableNode(intnodeId)
1059+
{
1060+
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1061+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1062+
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1063+
Mtm->nLiveNodes-=1;
1064+
}
1065+
1066+
staticvoidMtmEnableNode(intnodeId)
1067+
{
1068+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1069+
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1070+
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1071+
Mtm->nLiveNodes+=1;
1072+
}
1073+
10251074
voidMtmRecoveryCompleted(void)
10261075
{
10271076
MTM_LOG1("Recovery of node %d is completed",MtmNodeId);
@@ -1116,9 +1165,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11161165
MTM_LOG1("%d: node %d is caugth-up without locking cluster",MyProcPid,nodeId);
11171166
/* We are lucky: caugth-up without locking cluster! */
11181167
}
1119-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1120-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1121-
Mtm->nLiveNodes+=1;
1168+
MtmEnableNode(nodeId);
11221169
Mtm->nConfigChanges+=1;
11231170
caughtUp= true;
11241171
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
@@ -1261,17 +1308,13 @@ bool MtmRefreshClusterStatus(bool nowait)
12611308
mask= ~clique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
12621309
for (i=0;mask!=0;i++,mask >>=1) {
12631310
if (mask&1) {
1264-
Mtm->nLiveNodes-=1;
1265-
BIT_SET(Mtm->disabledNodeMask,i);
1266-
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
1311+
MtmDisableNode(i+1);
12671312
}
12681313
}
12691314
mask=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
12701315
for (i=0;mask!=0;i++,mask >>=1) {
12711316
if (mask&1) {
1272-
Mtm->nLiveNodes+=1;
1273-
BIT_CLEAR(Mtm->disabledNodeMask,i);
1274-
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
1317+
MtmEnableNode(i+1);
12751318
}
12761319
}
12771320
MtmCheckQuorum();
@@ -1316,7 +1359,6 @@ void MtmOnNodeDisconnect(int nodeId)
13161359
/* Avoid false detection of node failure and prevent node status blinking */
13171360
return;
13181361
}
1319-
13201362
BIT_SET(Mtm->connectivityMask,nodeId-1);
13211363
BIT_SET(Mtm->reconnectMask,nodeId-1);
13221364
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
@@ -1327,9 +1369,7 @@ void MtmOnNodeDisconnect(int nodeId)
13271369
if (!MtmRefreshClusterStatus(false)) {
13281370
MtmLock(LW_EXCLUSIVE);
13291371
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
1330-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1331-
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1332-
Mtm->nLiveNodes-=1;
1372+
MtmDisableNode(nodeId);
13331373
MtmCheckQuorum();
13341374
/* Interrupt voting for active transaction and abort them */
13351375
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
@@ -1503,6 +1543,7 @@ static void MtmInitialize()
15031543
Mtm->nodes[i].lastStatusChangeTime=MtmGetSystemTime();
15041544
Mtm->nodes[i].con=MtmConnections[i];
15051545
Mtm->nodes[i].flushPos=0;
1546+
Mtm->nodes[i].lastHeartbeat=0;
15061547
}
15071548
PGSemaphoreCreate(&Mtm->votingSemaphore);
15081549
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1627,6 +1668,36 @@ _PG_init(void)
16271668
if (!process_shared_preload_libraries_in_progress)
16281669
return;
16291670

1671+
DefineCustomIntVariable(
1672+
"multimaster.heartbeat_send_timeout",
1673+
"Timeout in milliseconds of sending heartbeat messages",
1674+
"Period of broadcasting heartbeat messages by abiter to all nodes",
1675+
&MtmHeartbeatSendTimeout,
1676+
1000,
1677+
1,
1678+
INT_MAX,
1679+
PGC_BACKEND,
1680+
0,
1681+
NULL,
1682+
NULL,
1683+
NULL
1684+
);
1685+
1686+
DefineCustomIntVariable(
1687+
"multimaster.heartbeat_recv_timeout",
1688+
"Timeout in milliseconds of receiving heartbeat messages",
1689+
"If no heartbeat message is received from node within this period, it assumed to be dead",
1690+
&MtmHeartbeatRecvTimeout,
1691+
2000,
1692+
1,
1693+
INT_MAX,
1694+
PGC_BACKEND,
1695+
0,
1696+
NULL,
1697+
NULL,
1698+
NULL
1699+
);
1700+
16301701
DefineCustomIntVariable(
16311702
"multimaster.gc_period",
16321703
"Number of distributed transactions after which garbage collection is started",
@@ -2056,9 +2127,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20562127
{
20572128
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nLiveNodes);
20582129
}
2059-
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2060-
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
2061-
Mtm->nLiveNodes-=1;
2130+
MtmDisableNode(nodeId);
20622131
MtmCheckQuorum();
20632132
if (!MtmIsBroadcast())
20642133
{
@@ -2110,17 +2179,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
21102179
if (MtmIsRecoverySession) {
21112180
MTM_LOG1("%d: Node %d start recovery of node %d",MyProcPid,MtmNodeId,MtmReplicationNodeId);
21122181
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
2113-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2114-
BIT_SET(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
2115-
Mtm->nLiveNodes-=1;
2182+
MtmDisableNode(MtmReplicationNodeId);
21162183
MtmCheckQuorum();
21172184
}
21182185
}elseif (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
21192186
if (recoveryCompleted) {
21202187
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication",MtmNodeId,MtmReplicationNodeId);
2121-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
2122-
BIT_CLEAR(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
2123-
Mtm->nLiveNodes+=1;
2188+
MtmEnableNode(MtmReplicationNodeId);
21242189
MtmCheckQuorum();
21252190
}else {
21262191
elog(ERROR,"Disabled node %d tries to reconnect without recovery",MtmReplicationNodeId);

‎multimaster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
MSG_PREPARE,
9393
MSG_PREPARED,
9494
MSG_ABORTED,
95-
MSG_STATUS
95+
MSG_STATUS,
96+
MSG_HEARTBEAT
9697
}MtmMessageCode;
9798

9899
typedefenum
@@ -127,6 +128,7 @@ typedef struct
127128
timestamp_tlastStatusChangeTime;
128129
timestamp_treceiverStartTime;
129130
timestamp_tsenderStartTime;
131+
timestamp_tlastHeartbeat;
130132
intsenderPid;
131133
intreceiverPid;
132134
XLogRecPtrflushPos;
@@ -218,6 +220,8 @@ extern int MtmReconnectAttempts;
218220
externintMtmKeepaliveTimeout;
219221
externintMtmNodeDisableDelay;
220222
externintMtmTransSpillThreshold;
223+
externintMtmHeartbeatSendTimeout;
224+
externintMtmHeartbeatRecvTimeout;
221225
externboolMtmUseDtm;
222226
externHTAB*MtmXid2State;
223227

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp