Commit 7a17422

knizhnik authored and kelvich committed

Avoid false detection of lost heartbeats in watchdog

1 parent d2c7edb, commit 7a17422

File tree

5 files changed: +111 -35 lines changed

arbiter.c

Lines changed: 12 additions & 4 deletions
@@ -219,12 +219,18 @@ static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
     struct timeval tv;
     fd_set set;
     int rc;
-    tv.tv_sec = timeoutMsec/1000;
-    tv.tv_usec = timeoutMsec%1000*1000;
+    timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(timeoutMsec);
     FD_ZERO(&set);
     FD_SET(sd, &set);
     do {
+        timestamp_t now;
         MtmCheckHeartbeat();
+        now = MtmGetSystemTime();
+        if (now > deadline) {
+            return 0;
+        }
+        tv.tv_sec = (deadline - now)/USECS_PER_SEC;
+        tv.tv_usec = (deadline - now)%USECS_PER_SEC;
     } while ((rc = select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv)) < 0 && errno == EINTR);
     return rc;
 }
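The essence of the fix is visible in this hunk: the millisecond timeout is converted once into an absolute deadline, and the remaining wait is recomputed on every loop iteration, so heartbeat checks and EINTR wakeups can no longer stretch the total wait. Below is a minimal standalone sketch of the same pattern; `wait_readable` and `now_usec` are illustrative stand-ins, not functions from the multimaster sources.

```c
#include <errno.h>
#include <stdint.h>
#include <sys/select.h>
#include <sys/time.h>

#define USECS_PER_SEC 1000000LL

/* Current wall-clock time in microseconds (stand-in for MtmGetSystemTime). */
static int64_t now_usec(void)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (int64_t)tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
}

/*
 * Wait until fd becomes readable or the total timeout expires.
 * The deadline is fixed once; every retry waits only for the time that is
 * left, so periodic work or EINTR wakeups cannot extend the overall wait.
 * Returns select()'s result, or 0 on timeout.
 */
static int wait_readable(int fd, long timeout_msec)
{
    int64_t deadline = now_usec() + (int64_t)timeout_msec * 1000;
    int rc;

    do {
        struct timeval tv;
        fd_set set;
        int64_t now = now_usec();

        if (now > deadline)
            return 0;                        /* total time budget exhausted */
        tv.tv_sec  = (deadline - now) / USECS_PER_SEC;
        tv.tv_usec = (deadline - now) % USECS_PER_SEC;
        FD_ZERO(&set);
        FD_SET(fd, &set);
        rc = select(fd + 1, &set, NULL, NULL, &tv);
    } while (rc < 0 && errno == EINTR);
    return rc;
}
```

In the removed code the timeval was filled once from timeoutMsec; on platforms where select() does not update the timeval, each retry could wait the full timeout again, and the time spent in MtmCheckHeartbeat() was never accounted for. With a fixed deadline the total wait stays bounded no matter how often the loop is interrupted.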
@@ -371,7 +377,7 @@ static void MtmSendHeartbeat()
                 elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
             } else {
                 if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
-                    MTM_LOG1("Last hearbeat to node %d was sent %ld microseconds ago", i+1, now - last_heartbeat_to_node[i]);
+                    MTM_LOG1("Last heartbeat to node %d was sent %ld microseconds ago", i+1, now - last_heartbeat_to_node[i]);
                 }
                 last_heartbeat_to_node[i] = now;
                 /* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
@@ -865,6 +871,7 @@ static void MtmReceiver(Datum arg)
                 MtmDisconnect(i);
             }
         }
+        now = MtmGetSystemTime();
         for (j = 0; j < n; j++) {
             if (events[j].events & EPOLLIN)
 #else
@@ -882,6 +889,7 @@ static void MtmReceiver(Datum arg)
         if (n < 0) {
             elog(ERROR, "Arbiter failed to select sockets: %d", errno);
         }
+        now = MtmGetSystemTime();
         for (i = 0; i < nNodes; i++) {
             if (sockets[i] >= 0 && FD_ISSET(sockets[i], &events))
 #endif
@@ -1126,7 +1134,7 @@ static void MtmReceiver(Datum arg)
                 }
             }
             if (Mtm->status == MTM_ONLINE) {
-                now = MtmGetSystemTime();
+                /* "now" is the time at which select() was performed, so delays in processing should not cause false detection */
                 if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
                     if (!MtmWatchdog(now)) {
                         for (i = 0; i < nNodes; i++) {
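This hunk, together with the two `now = MtmGetSystemTime()` additions above, makes the watchdog compare heartbeat ages against the moment the sockets were polled rather than against a timestamp taken after an arbitrarily long processing pass. A rough sketch of such a staleness check under those assumptions (hypothetical structure and parameter names, not the actual MtmWatchdog implementation):

```c
#include <stdbool.h>
#include <stdint.h>

typedef int64_t timestamp_t;

#define MSEC_TO_USEC(t) ((timestamp_t)(t) * 1000)

/* Hypothetical per-node state; the real bookkeeping lives in shared memory. */
typedef struct {
    timestamp_t lastHeartbeat;  /* when the last heartbeat from this node arrived */
} NodeState;

/*
 * Return false if any other node's heartbeat is older than the receive
 * timeout.  "now" must be the time at which the sockets were polled: if it
 * were sampled after processing a large batch of messages, a node whose
 * heartbeat arrived inside that very batch could still look stale and be
 * falsely declared dead.
 */
static bool watchdog_check(const NodeState *nodes, int nNodes, int selfId,
                           timestamp_t now, int heartbeatRecvTimeoutMsec)
{
    for (int i = 0; i < nNodes; i++) {
        if (i == selfId || nodes[i].lastHeartbeat == 0)
            continue;
        if (now > nodes[i].lastHeartbeat + MSEC_TO_USEC(heartbeatRecvTimeoutMsec))
            return false;   /* heartbeat from node i considered lost */
    }
    return true;
}
```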

multimaster.c

Lines changed: 92 additions & 25 deletions
@@ -243,7 +243,6 @@ static int MtmMaxRecoveryLag;
 static int  MtmGcPeriod;
 static bool MtmIgnoreTablesWithoutPk;
 static int  MtmLockCount;
-static int  MtmSenderStarted;
 
 static ExecutorStart_hook_type PreviousExecutorStartHook;
 static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -261,7 +260,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
 /*
  * -------------------------------------------
  * Synchronize access to MTM structures.
- * Using LWLock seems to be more efficient (at our benchmarks)
+ * Using LWLock seems to be more efficient (at our benchmarks).
+ * Multimaster uses a tranche of 2N+1 LWLocks, where N is the number of nodes.
+ * locks[0] is used to synchronize access to multimaster state,
+ * locks[1..N] are used to provide exclusive access to the replication session of each node,
+ * locks[N+1..2*N] are used to synchronize access to the distributed lock graph at each node.
  * -------------------------------------------
  */
 void MtmLock(LWLockMode mode)
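The expanded comment documents the 2N+1 lock layout. Assuming that layout (state lock at index 0, per-node replication-session locks at 1..N, per-node lock-graph locks at N+1..2N, 1-based node ids), index helpers could look like the sketch below; the macro names are illustrative and do not appear in multimaster.h.

```c
/*
 * Index helpers for an array of 2*N+1 LWLocks following the layout described
 * in the comment above.  Illustrative names only; nodeId is assumed to be
 * 1-based and nNodes is the number of nodes N.
 */
#define MTM_NUM_LOCKS(nNodes)                   (2 * (nNodes) + 1)
#define MTM_STATE_LOCK_ID                       0                        /* locks[0]: shared multimaster state     */
#define MTM_SESSION_LOCK_ID(nodeId)             (nodeId)                 /* locks[1..N]: replication session locks */
#define MTM_LOCK_GRAPH_LOCK_ID(nodeId, nNodes)  ((nNodes) + (nodeId))    /* locks[N+1..2N]: distributed lock graph */
```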
@@ -316,6 +319,9 @@ timestamp_t MtmGetSystemTime(void)
     return (timestamp_t)tv.tv_sec*USECS_PER_SEC + tv.tv_usec;
 }
 
+/*
+ * Get adjusted system time: taking into account the time shift
+ */
 timestamp_t MtmGetCurrentTime(void)
 {
     return MtmGetSystemTime() + Mtm->timeShift;
@@ -610,13 +616,16 @@ MtmAdjustOldestXid(TransactionId xid)
     }
     return xid;
 }
+
 /*
  * -------------------------------------------
- * Transaction list manipulation
+ * Transaction list manipulation.
+ * All distributed transactions are linked in an L1 list ordered by transaction start time.
+ * This list is inspected by MtmAdjustOldestXid, and transactions which are not used in any snapshot at any node
+ * are removed from the list and from the hash.
  * -------------------------------------------
  */
 
-
 static void MtmTransactionListAppend(MtmTransState* ts)
 {
     if (!ts->isEnqueued) {
@@ -1293,6 +1302,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
         }
     }
 
+/*
+ * Send arbiter's message
+ */
 void MtmSendMessage(MtmArbiterMessage* msg)
 {
     SpinLockAcquire(&Mtm->queueSpinlock);
@@ -1315,6 +1327,11 @@ void MtmSendMessage(MtmArbiterMessage* msg)
     SpinLockRelease(&Mtm->queueSpinlock);
 }
 
+/*
+ * Send arbiter's 2PC message. Right now only responses to the coordinator are
+ * sent through the arbiter. Broadcasts from the coordinator to the nodes are done
+ * using logical decoding.
+ */
 void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
 {
     MtmArbiterMessage msg;
@@ -1347,6 +1364,11 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
     }
 }
 
+/*
+ * Broadcast poll state message to all nodes.
+ * This function is used to gather information about the state of a prepared transaction
+ * at node startup or after a crash of some node.
+ */
 static void MtmBroadcastPollMessage(MtmTransState* ts)
 {
     int i;
@@ -1370,7 +1392,9 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
 }
 
 /*
- * Restore state of recovered prepared transaction in memory
+ * Restore state of recovered prepared transactions in memory.
+ * This function is called at system startup to make it possible to
+ * handle these prepared transactions in the normal way.
  */
 static void MtmLoadPreparedTransactions(void)
 {
@@ -1426,6 +1450,10 @@ static void MtmStartRecovery()
     MtmUnlock();
 }
 
+
+/*
+ * Prepare context for applying transaction at replica
+ */
 void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
 {
     MtmTx.gtid = *gtid;
@@ -1479,6 +1507,13 @@ XidStatus MtmGetCurrentTransactionStatus(void)
     return MtmTx.status;
 }
 
+/*
+ * Perform atomic exchange of global transaction status.
+ * The problem is that, because transactions are applied at a replica concurrently by multiple
+ * threads, we can process an ABORT request before the PREPARE - when the transaction is not yet
+ * applied at this node and there is no MtmTransState associated with this transaction.
+ * We remember information about the status of this transaction in MtmTransMap.
+ */
 XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_status)
 {
     MtmTransMap* tm;
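The new comment explains why the exchange is needed: on a replica the ABORT for a global transaction may be processed before its PREPARE, so the verdict has to be parked in a map keyed by GID until the transaction state exists. A simplified, self-contained sketch of that exchange pattern (a flat table with linear search instead of the real MtmTransMap hash; all names hypothetical):

```c
#include <stdio.h>
#include <string.h>

typedef enum { XS_UNKNOWN, XS_INPROGRESS, XS_ABORTED, XS_COMMITTED } XidStatus;

typedef struct {
    char      gid[64];
    XidStatus status;
} GidStatusEntry;

#define MAX_ENTRIES 256

static GidStatusEntry statusMap[MAX_ENTRIES];
static int            statusMapSize;

/*
 * Record new_status for gid and return the status that was previously stored
 * for it (XS_UNKNOWN if the gid was never seen).  When an ABORT is applied
 * before the corresponding PREPARE, the abort verdict is parked here, so the
 * later PREPARE sees XS_ABORTED and can roll the transaction back at once.
 */
static XidStatus exchange_status(const char *gid, XidStatus new_status)
{
    int i;

    for (i = 0; i < statusMapSize; i++) {
        if (strcmp(statusMap[i].gid, gid) == 0) {
            XidStatus old = statusMap[i].status;
            statusMap[i].status = new_status;
            return old;
        }
    }
    if (statusMapSize < MAX_ENTRIES) {
        snprintf(statusMap[statusMapSize].gid, sizeof(statusMap[0].gid), "%s", gid);
        statusMap[statusMapSize].status = new_status;
        statusMapSize++;
    }
    return XS_UNKNOWN;
}

int main(void)
{
    /* The ABORT arrives first, then the PREPARE of the same global transaction. */
    exchange_status("MTM-1-12345-1", XS_ABORTED);
    XidStatus prev = exchange_status("MTM-1-12345-1", XS_INPROGRESS);
    printf("status seen by PREPARE: %s\n", prev == XS_ABORTED ? "aborted" : "other");
    return 0;
}
```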
@@ -1526,6 +1561,9 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
     return csn;
 }
 
+/*
+ * Wake up the coordinator's backend when voting is completed
+ */
 void MtmWakeUpBackend(MtmTransState* ts)
 {
     if (!ts->votingCompleted) {
@@ -1536,6 +1574,10 @@ void MtmWakeUpBackend(MtmTransState* ts)
     }
 }
 
+
+/*
+ * Abort the transaction if it is not yet aborted
+ */
 void MtmAbortTransaction(MtmTransState* ts)
 {
     Assert(MtmLockCount != 0); /* should be invoked with exclusive lock */
@@ -1561,6 +1603,11 @@ void MtmAbortTransaction(MtmTransState* ts)
  * -------------------------------------------
  */
 
+/*
+ * Handle critical errors while applying a transaction at a replica.
+ * Such errors should cause shutdown of this cluster node to allow other nodes to continue serving client requests.
+ * Other errors are just reported and ignored.
+ */
 void MtmHandleApplyError(void)
 {
     ErrorData *edata = CopyErrorData();
@@ -1570,13 +1617,15 @@ void MtmHandleApplyError(void)
       case ERRCODE_IO_ERROR:
       case ERRCODE_DATA_CORRUPTED:
       case ERRCODE_INDEX_CORRUPTED:
+        /* Should we really treat these errors as fatal?
       case ERRCODE_SYSTEM_ERROR:
       case ERRCODE_INTERNAL_ERROR:
       case ERRCODE_OUT_OF_MEMORY:
+        */
         elog(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
              edata->sqlerrcode, edata->message, getpid());
-        // MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
-        // kill(PostmasterPid, SIGQUIT);
+        MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
+        kill(PostmasterPid, SIGQUIT);
         break;
     }
     FreeErrorData(edata);
@@ -1643,6 +1692,9 @@ static void MtmEnableNode(int nodeId)
     elog(WARNING, "Enable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
 }
 
+/*
+ * Function called when recovery of a node is completed
+ */
 void MtmRecoveryCompleted(void)
 {
     int i;
@@ -1712,7 +1764,7 @@ static int64 MtmGetSlotLag(int nodeId)
 
 /*
  * This function is called by WAL sender when start sending new transaction.
- * It returns true if specified node is in recovery mode. In this case we should send all transactions from WAL,
+ * It returns true if specified node is in recovery mode. In this case we should send to it all transactions from WAL,
  * not only coordinated by self node as in normal mode.
  */
 bool MtmIsRecoveredNode(int nodeId)
@@ -1728,7 +1780,13 @@ bool MtmIsRecoveredNode(int nodeId)
     }
 }
 
-
+/*
+ * Check if the WAL sender has replayed all transactions from the WAL.
+ * This may never happen if there are many active transactions.
+ * In this case we wait until the gap between the sent and the current position in the
+ * WAL becomes smaller than the threshold value MtmMinRecoveryLag and
+ * then prohibit start of new transactions until the WAL is completely replayed.
+ */
 bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
 {
     bool caughtUp = false;
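The added comment describes a two-stage catch-up: stream WAL until the gap to the current insert position drops below MtmMinRecoveryLag, then stop admitting new transactions so the remaining tail can be drained completely. A condensed sketch of that decision logic (simplified; the real MtmRecoveryCaughtUp also manipulates node masks and the cluster lock):

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

typedef enum {
    RECOVERY_KEEP_STREAMING,   /* gap is still large: keep sending WAL            */
    RECOVERY_LOCK_CLUSTER,     /* gap below the threshold: stop new transactions  */
    RECOVERY_DONE              /* WAL completely replayed                         */
} RecoveryAction;

/*
 * Decide the next step of catching up a recovered node.  While many
 * transactions are active the sender may never reach currentLSN, so once the
 * remaining gap is below minRecoveryLag the cluster is locked against new
 * transactions and the last portion of WAL is drained to completion.
 */
static RecoveryAction recovery_step(XLogRecPtr slotLSN, XLogRecPtr currentLSN,
                                    uint64_t minRecoveryLag, bool clusterLocked)
{
    if (slotLSN >= currentLSN)
        return RECOVERY_DONE;
    if (!clusterLocked && currentLSN - slotLSN < minRecoveryLag)
        return RECOVERY_LOCK_CLUSTER;
    return RECOVERY_KEEP_STREAMING;
}
```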
@@ -1822,7 +1880,7 @@ MtmCheckClusterLock()
             MtmLock(LW_EXCLUSIVE);
             continue;
         } else {
-            /* All lockers are synchronized their logs */
+            /* All lockers have synchronized their logs */
             /* Remove lock and mark them as recovered */
             MTM_LOG1("Complete recovery of %d nodes (node mask %lx)", Mtm->nLockers, (long)Mtm->nodeLockerMask);
             Assert(Mtm->walSenderLockerMask == 0);
@@ -2186,7 +2244,8 @@ static void MtmInitialize()
         Mtm->nAllNodes = MtmNodes;
         Mtm->disabledNodeMask = 0;
         Mtm->connectivityMask = 0;
-        Mtm->pglogicalNodeMask = 0;
+        Mtm->pglogicalReceiverMask = 0;
+        Mtm->pglogicalSenderMask = 0;
         Mtm->walSenderLockerMask = 0;
         Mtm->nodeLockerMask = 0;
         Mtm->reconnectMask = 0;
@@ -2900,8 +2959,8 @@ _PG_fini(void)
 void MtmReceiverStarted(int nodeId)
 {
     MtmLock(LW_EXCLUSIVE);
-    if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
-        BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
+    if (!BIT_CHECK(Mtm->pglogicalReceiverMask, nodeId-1)) {
+        BIT_SET(Mtm->pglogicalReceiverMask, nodeId-1);
         if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
             MtmEnableNode(nodeId);
             MtmCheckQuorum();
@@ -3014,7 +3073,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
         Mtm->nReceivers = 0;
         Mtm->nSenders = 0;
         Mtm->recoveryCount += 1;
-        Mtm->pglogicalNodeMask = 0;
+        Mtm->pglogicalReceiverMask = 0;
+        Mtm->pglogicalSenderMask = 0;
         MtmUnlock();
         return REPLMODE_RECOVERY;
     }
@@ -3155,19 +3215,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
             MtmEnableNode(MtmReplicationNodeId);
             MtmCheckQuorum();
         } else {
-            /* Force arbiter to reestablish connection with this nodem send heartbeat to inform this node that it was disabled and should perform recovery */
+            /* Force arbiter to reestablish connection with this node, send heartbeat to inform this node that it was disabled and should perform recovery */
             BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1);
             MtmUnlock();
             elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
         }
     } else {
         MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
     }
-    elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
-    MtmSenderStarted = 1;
-    if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
-        /* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
-        MtmSwitchClusterMode(MTM_ONLINE);
+    if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
+        elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
+        BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
+        if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
+            /* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
+            MtmSwitchClusterMode(MTM_ONLINE);
+        }
     }
     BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestablish connection with this node */
     MtmUnlock();
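Guarding the counter with pglogicalSenderMask makes sender registration idempotent: nSenders is bumped only the first time a WAL sender for a given node starts, so a reconnecting sender can no longer inflate the counter that gates the switch to MTM_ONLINE. A condensed sketch of this register/unregister pairing (plain file-scope variables instead of the shared Mtm state; the BIT_* macros are written out here under the assumption that they are the usual single-bit mask operations):

```c
#include <stdint.h>
#include <stdio.h>

typedef uint64_t nodemask_t;

#define BIT_CHECK(mask, bit) (((mask) >> (bit)) & 1)
#define BIT_SET(mask, bit)   ((mask) |= ((nodemask_t)1 << (bit)))
#define BIT_CLEAR(mask, bit) ((mask) &= ~((nodemask_t)1 << (bit)))

static nodemask_t senderMask;
static int        nSenders;

/* Count a sender only on its first registration for this node. */
static void sender_started(int nodeId)
{
    if (!BIT_CHECK(senderMask, nodeId - 1)) {
        BIT_SET(senderMask, nodeId - 1);
        nSenders += 1;
    }
}

/* Undo the registration exactly once, matching the shutdown hook below. */
static void sender_stopped(int nodeId)
{
    if (BIT_CHECK(senderMask, nodeId - 1)) {
        BIT_CLEAR(senderMask, nodeId - 1);
        nSenders -= 1;
    }
}

int main(void)
{
    sender_started(2);
    sender_started(2);                     /* duplicate start is ignored */
    sender_stopped(2);
    printf("senders = %d\n", nSenders);    /* 0, not -1 */
    return 0;
}
```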
@@ -3227,14 +3289,15 @@ void MtmUpdateLsnMapping(int node_id, XLogRecPtr end_lsn)
 static void
 MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
 {
-    if (MtmReplicationNodeId >= 0) {
-        MtmLock(LW_EXCLUSIVE);
-        Mtm->nSenders -= MtmSenderStarted;
-        MtmUnlock();
+    MtmLock(LW_EXCLUSIVE);
+    if (MtmReplicationNodeId >= 0 && BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
+        BIT_CLEAR(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
+        Mtm->nSenders -= 1;
         MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
         /* MtmOnNodeDisconnect(MtmReplicationNodeId); */
-        MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
+        MtmReplicationNodeId = -1; /* defuse MtmOnProcExit hook */
     }
+    MtmUnlock();
 }
 
 /*
@@ -3949,6 +4012,10 @@ MtmGenerateGid(char* gid)
     sprintf(gid, "MTM-%d-%d-%d", MtmNodeId, MyProcPid, ++localCount);
 }
 
+/*
+ * Replace normal commit with two-phase commit.
+ * It is called either for commit of a standalone command or for commit of a transaction block.
+ */
 static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
 {
 //  if (MyXactAccessedTempRel)
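The comment above states that a normal commit is replaced by a two-phase commit, and MtmGenerateGid in the hunk context shows the format of the global transaction identifier used for it. The sketch below only illustrates that GID format and, in comments, the generic PostgreSQL 2PC statements such a commit corresponds to; it is not a claim about how multimaster drives 2PC internally.

```c
#include <stdio.h>

/* Build a global transaction id in the "MTM-<nodeId>-<pid>-<counter>" format
 * used by MtmGenerateGid in the hunk above. */
static void generate_gid(char *gid, size_t size, int nodeId, int pid, int count)
{
    snprintf(gid, size, "MTM-%d-%d-%d", nodeId, pid, count);
}

int main(void)
{
    char gid[64];

    generate_gid(gid, sizeof(gid), 1, 12345, 7);
    /* A two-phase commit of this transaction would correspond to the standard
     * PostgreSQL statements:
     *   PREPARE TRANSACTION 'MTM-1-12345-7';
     *   ... collect votes from the other nodes ...
     *   COMMIT PREPARED 'MTM-1-12345-7';     -- or ROLLBACK PREPARED on abort
     */
    printf("gid = %s\n", gid);
    return 0;
}
```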

multimaster.h

Lines changed: 2 additions & 1 deletion
@@ -263,7 +263,8 @@ typedef struct
     TransactionId oldestXid;             /* XID of oldest transaction visible by any active transaction (local or global) */
     nodemask_t    disabledNodeMask;      /* bitmask of disabled nodes */
     nodemask_t    connectivityMask;      /* bitmask of disconnected nodes */
-    nodemask_t    pglogicalNodeMask;     /* bitmask of started pglogic receivers */
+    nodemask_t    pglogicalReceiverMask; /* bitmask of started pglogic receivers */
+    nodemask_t    pglogicalSenderMask;   /* bitmask of started pglogic senders */
     nodemask_t    walSenderLockerMask;   /* Mask of WAL-senders IDs locking the cluster */
     nodemask_t    nodeLockerMask;        /* Mask of node IDs which WAL-senders are locking the cluster */
     nodemask_t    reconnectMask;         /* Mask of nodes connection to which has to be reestablished by sender */

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
@@ -211,7 +211,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
         return;
     }
     if (isRecovery) {
-        MTM_LOG1("PGLOGICAL_SEND recover transaction: event=%d, gid=%s, xid=%d, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",
+        MTM_LOG2("PGLOGICAL_SEND recover transaction: event=%d, gid=%s, xid=%d, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",
                  flags, txn->gid, txn->xid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());
     }
     if (flags == PGLOGICAL_ABORT_PREPARED) {

