Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7755cd2

Browse files
knizhnikkelvich
authored andcommitted
Use timer for cluster status refresh
1 parente5f7e91 commit7755cd2

File tree

3 files changed

+123
-21
lines changed

3 files changed

+123
-21
lines changed

‎arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ static void MtmMonitor(Datum arg)
817817
if (rc&WL_POSTMASTER_DEATH) {
818818
break;
819819
}
820-
MtmRefreshClusterStatus(true);
820+
MtmRefreshClusterStatus();
821821
}
822822
}
823823

@@ -1160,7 +1160,7 @@ static void MtmReceiver(Datum arg)
11601160
}
11611161
if (Mtm->disabledNodeMask!=0) {
11621162
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1163-
MtmRefreshClusterStatus(false);
1163+
MtmRefreshClusterStatus();
11641164
}
11651165
}else {
11661166
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {

‎multimaster.c

Lines changed: 120 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ static bool MtmIsRecoverySession;
173173
staticMtmConnectionInfo*MtmConnections;
174174

175175
staticMtmCurrentTransMtmTx;
176-
176+
staticTimeoutIdMtmRefreshClusterStatusTimer;
177177
staticdlist_headMtmLsnMapping=DLIST_STATIC_INIT(MtmLsnMapping);
178+
staticboolMtmRefreshClusterStatusTimerCocked;
178179

179180
staticTransactionManagerMtmTM= {
180181
PgTransactionIdGetStatus,
@@ -1656,7 +1657,7 @@ void MtmHandleApplyError(void)
16561657
* The reason is that we want to avoid extra polling to obtain maximum CSN from all nodes to assign it to committed transaction.
16571658
* Called only from MtmDisableNode in critical section.
16581659
*/
1659-
staticvoidMtmPollStatusOfPreparedTransactions(intdisabledNodeId)
1660+
staticvoidMtmPollStatusOfPreparedTransactionsForDisabledNode(intdisabledNodeId)
16601661
{
16611662
MtmTransState*ts;
16621663
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
@@ -1680,6 +1681,34 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
16801681
}
16811682
}
16821683

1684+
/*
1685+
* Poll status of all active prepared transaction.
1686+
* This function is called before start of recovery to prevent blocking of recovery process by some
1687+
* prepared transaction which is not recovered
1688+
*/
1689+
staticvoidMtmPollStatusOfPreparedTransactions()
1690+
{
1691+
MtmTransState*ts;
1692+
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
1693+
if (TransactionIdIsValid(ts->gtid.xid)
1694+
&&ts->votingCompleted/* If voting is not yet completed, then there is some backend coordinating this transaction */
1695+
&& (ts->status==TRANSACTION_STATUS_UNKNOWN||ts->status==TRANSACTION_STATUS_IN_PROGRESS))
1696+
{
1697+
Assert(ts->gid[0]);
1698+
MTM_LOG1("Poll state of transaction %d (%s) from node %d",ts->xid,ts->gid,ts->gtid.node);
1699+
MtmBroadcastPollMessage(ts);
1700+
}else {
1701+
MTM_LOG2("Skip prepared transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
1702+
ts->xid,ts->gid,MtmTxnStatusMnem[ts->status],ts->gtid.node,ts->gtid.xid,ts->votedMask);
1703+
}
1704+
}
1705+
}
1706+
1707+
/*
1708+
* Node is disabled if it is not part of clique built using connectivity masks of all nodes.
1709+
* There is no warranty that all noeds will make the same decision about clique, btu as far as we want to avoid
1710+
* some global coordinator (which will be SPOF), we have to rely on Bron–Kerbosch algorithm locating maximum clique in graph
1711+
*/
16831712
staticvoidMtmDisableNode(intnodeId)
16841713
{
16851714
timestamp_tnow=MtmGetSystemTime();
@@ -1694,10 +1723,14 @@ static void MtmDisableNode(int nodeId)
16941723
}
16951724
if (Mtm->nLiveNodes >=Mtm->nAllNodes/2+1) {
16961725
/* Make decision about prepared transaction status only in quorum */
1697-
MtmPollStatusOfPreparedTransactions(nodeId);
1726+
MtmPollStatusOfPreparedTransactionsForDisabledNode(nodeId);
16981727
}
16991728
}
1700-
1729+
1730+
/*
1731+
* Node is anabled when it's recovery is completed.
1732+
* This why node is mostly marked as recovered when logical sender/receiver to this node is (re)started.
1733+
*/
17011734
staticvoidMtmEnableNode(intnodeId)
17021735
{
17031736
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
@@ -1763,6 +1796,9 @@ MtmCheckSlots()
17631796
}
17641797
}
17651798

1799+
/*
1800+
* Get lag between replication slot position (dsata proceeded by WAL sender) and current position in WAL
1801+
*/
17661802
staticint64MtmGetSlotLag(intnodeId)
17671803
{
17681804
inti;
@@ -1849,6 +1885,9 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
18491885
returncaughtUp;
18501886
}
18511887

1888+
/*
1889+
* This function is called inside critical section
1890+
*/
18521891
voidMtmSwitchClusterMode(MtmNodeStatusmode)
18531892
{
18541893
Mtm->status=mode;
@@ -1918,7 +1957,7 @@ MtmCheckClusterLock()
19181957
* Build internode connectivity mask. 1 - means that node is disconnected.
19191958
*/
19201959
staticbool
1921-
MtmBuildConnectivityMatrix(nodemask_t*matrix,boolnowait)
1960+
MtmBuildConnectivityMatrix(nodemask_t*matrix)
19221961
{
19231962
inti,j,n=Mtm->nAllNodes;
19241963
boolchanged= false;
@@ -1969,24 +2008,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
19692008

19702009
/**
19712010
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
1972-
* This function returns false if current node is excluded from cluster, true otherwise
19732011
*/
1974-
boolMtmRefreshClusterStatus(boolnowait)
2012+
voidMtmRefreshClusterStatus()
19752013
{
19762014
nodemask_tmask,clique,disabled;
19772015
nodemask_tmatrix[MAX_NODES];
19782016
intclique_size;
19792017
inti;
19802018

1981-
if (!MtmBuildConnectivityMatrix(matrix,nowait)) {
1982-
return false;
2019+
MtmRefreshClusterStatusTimerCocked= false;
2020+
2021+
if (!MtmBuildConnectivityMatrix(matrix)) {
2022+
return;
19832023
}
19842024

19852025
clique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&clique_size);
19862026

19872027
if (clique== (~Mtm->disabledNodeMask& (((nodemask_t)1 <<Mtm->nAllNodes)-1)) )
19882028
/* Nothing is changed */
1989-
return false;
2029+
return;
19902030

19912031
if (clique_size >=Mtm->nAllNodes/2+1) {/* have quorum */
19922032
fprintf(stderr,"Old mask: ");
@@ -2043,9 +2083,11 @@ bool MtmRefreshClusterStatus(bool nowait)
20432083
MTM_LOG1("Clique %lx has no quorum", (long)clique);
20442084
MtmSwitchClusterMode(MTM_IN_MINORITY);
20452085
}
2046-
return true;
20472086
}
20482087

2088+
/*
2089+
* Check if there is quorum: current node see more than half of all nodes
2090+
*/
20492091
voidMtmCheckQuorum(void)
20502092
{
20512093
Mtm->nConfigChanges+=1;
@@ -2062,6 +2104,13 @@ void MtmCheckQuorum(void)
20622104
}
20632105
}
20642106

2107+
/*
2108+
* This function is called in case of non-recoverable connection failure with this node.
2109+
* Non-recoverable means that connections can not be reestablish using specified number of attempts.
2110+
* It sets bit in connectivity mask and register delayed refresh of cluster status which build connectivity matrix
2111+
* and determine clique of connected nodes. Timeout here is needed to allow all nodes to exchanges their connectivity masks (them
2112+
* are sent together with any arbiter message, including heartbeats.
2113+
*/
20652114
voidMtmOnNodeDisconnect(intnodeId)
20662115
{
20672116
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
@@ -2078,12 +2127,16 @@ void MtmOnNodeDisconnect(int nodeId)
20782127
BIT_SET(Mtm->connectivityMask,nodeId-1);
20792128
BIT_SET(Mtm->reconnectMask,nodeId-1);
20802129
elog(LOG,"Disconnect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
2130+
if (!MtmRefreshClusterStatusTimerCocked) {
2131+
MtmRefreshClusterStatusTimerCocked= true;
2132+
enable_timeout_after(MtmRefreshClusterStatusTimer,MtmHeartbeatSendTimeout);
2133+
}
20812134
MtmUnlock();
2082-
2083-
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
2084-
MtmRefreshClusterStatus(false);
20852135
}
20862136

2137+
/*
2138+
* This method is called when connection with node is reestablished
2139+
*/
20872140
voidMtmOnNodeConnect(intnodeId)
20882141
{
20892142
MtmLock(LW_EXCLUSIVE);
@@ -2093,6 +2146,9 @@ void MtmOnNodeConnect(int nodeId)
20932146
MtmUnlock();
20942147
}
20952148

2149+
/*
2150+
* Set reconnect mask to force reconnection attempt to the node
2151+
*/
20962152
voidMtmReconnectNode(intnodeId)
20972153
{
20982154
MtmLock(LW_EXCLUSIVE);
@@ -2108,7 +2164,11 @@ void MtmReconnectNode(int nodeId)
21082164
* -------------------------------------------
21092165
*/
21102166

2111-
2167+
/*
2168+
* Initialize Xid2State hash table to obtain status of transaction by its local XID.
2169+
* Size of this hash table should be limited by MtmAdjustOldestXid function which performs cleanup
2170+
* of transaction list and from the list and from the hash table transactions which XIDs are not used in any snapshot at any node
2171+
*/
21122172
staticHTAB*
21132173
MtmCreateXidMap(void)
21142174
{
@@ -2127,6 +2187,11 @@ MtmCreateXidMap(void)
21272187
returnhtab;
21282188
}
21292189

2190+
/*
2191+
* Initialize Gid2State hash table to obtain status of transaction by GID.
2192+
* Size of this hash table should be limited by MtmAdjustOldestXid function which performs cleanup
2193+
* of transaction list and from the list and from the hash table transactions which XIDs are not used in any snapshot at any node
2194+
*/
21302195
staticHTAB*
21312196
MtmCreateGidMap(void)
21322197
{
@@ -2144,6 +2209,9 @@ MtmCreateGidMap(void)
21442209
returnhtab;
21452210
}
21462211

2212+
/*
2213+
* Initialize hash table used to mark local (not distributed) tables
2214+
*/
21472215
staticHTAB*
21482216
MtmCreateLocalTableMap(void)
21492217
{
@@ -2208,6 +2276,13 @@ static void MtmLoadLocalTables(void)
22082276
}
22092277
}
22102278

2279+
/*
2280+
* Multimaster control file is used to prevent erroneous inclusion of node in the cluster.
2281+
* It contains cluster name (any user defined identifier) and node id.
2282+
* In case of creating new cluster node using pg_basebackup this file is copied together will
2283+
* all other PostgreSQL files and so new node will know ID of the cluster node from which it
2284+
* is cloned. It is necessary to complete synchronization of new node with the rest of the cluster.
2285+
*/
22112286
staticvoidMtmCheckControlFile(void)
22122287
{
22132288
charcontrolFilePath[MAXPGPATH];
@@ -2242,7 +2317,10 @@ static void MtmCheckControlFile(void)
22422317
}
22432318
}
22442319

2245-
2320+
/*
2321+
* Perform initialization of multimaster state.
2322+
* This function is called from shared memory startup hook (after completion of initialization of shared memory)
2323+
*/
22462324
staticvoidMtmInitialize()
22472325
{
22482326
boolfound;
@@ -2311,6 +2389,7 @@ static void MtmInitialize()
23112389
RegisterXactCallback(MtmXactCallback,NULL);
23122390
MtmTx.snapshot=INVALID_CSN;
23132391
MtmTx.xid=InvalidTransactionId;
2392+
MtmRefreshClusterStatusTimer=RegisterTimeout(USER_TIMEOUT,MtmRefreshClusterStatus);
23142393
}
23152394
MtmXid2State=MtmCreateXidMap();
23162395
MtmGid2State=MtmCreateGidMap();
@@ -2331,6 +2410,10 @@ MtmShmemStartup(void)
23312410
MtmInitialize();
23322411
}
23332412

2413+
/*
2414+
* Parse node connection string.
2415+
* This function is called at cluster startup and while adding new cluster node
2416+
*/
23342417
voidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr)
23352418
{
23362419
charconst*host;
@@ -2371,6 +2454,10 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
23712454
elog(WARNING,"Using arbiter port: %d",conn->arbiterPort);
23722455
}
23732456

2457+
/*
2458+
* Parse "multimaster.conn_strings" configuration parameter and
2459+
* set connection string for each node using MtmUpdateNodeConnectionInfo
2460+
*/
23742461
staticvoidMtmSplitConnStrs(void)
23752462
{
23762463
inti;
@@ -2494,6 +2581,9 @@ static void MtmSplitConnStrs(void)
24942581
MemoryContextSwitchTo(old_context);
24952582
}
24962583

2584+
/*
2585+
* Check correctness of multimaster configuration
2586+
*/
24972587
staticboolConfigIsSane(void)
24982588
{
24992589
boolok= true;
@@ -2991,13 +3081,22 @@ void MtmReceiverStarted(int nodeId)
29913081
MtmUnlock();
29923082
}
29933083

3084+
/*
3085+
* Recovery slot is node ID from which new or crash node is performing recovery.
3086+
* This function is called in case of logical receiver error to make it possible to try to perform
3087+
* recovery from some other node
3088+
*/
29943089
voidMtmReleaseRecoverySlot(intnodeId)
29953090
{
29963091
if (Mtm->recoverySlot==nodeId) {
29973092
Mtm->recoverySlot=0;
29983093
}
29993094
}
30003095

3096+
/*
3097+
* Rollback transaction originated from the specified node.
3098+
* This function is called either for commit logical message with AbortPrepared flag either for abort prepared logical message.
3099+
*/
30013100
voidMtmRollbackPreparedTransaction(intnodeId,charconst*gid)
30023101
{
30033102
XidStatusstatus=MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED);
@@ -3021,8 +3120,10 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
30213120
}
30223121

30233122
/*
3024-
* This function is call with MTM mutex locked.
3025-
* It shoudl unlock mutex before calling FinishPreparedTransaction to avoid deadlocks.
3123+
* Wrapper arround FinishPreparedTransaction function.
3124+
* This function shoudl proper context for invocation of this function.
3125+
* This function is called with MTM mutex locked.
3126+
* It should unlock mutex before calling FinishPreparedTransaction to avoid deadlocks.
30263127
* ts object is pinned to prevent deallocation while lock is released.
30273128
*/
30283129
voidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit)
@@ -3095,6 +3196,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
30953196
Mtm->recoveryCount+=1;
30963197
Mtm->pglogicalReceiverMask=0;
30973198
Mtm->pglogicalSenderMask=0;
3199+
MtmPollStatusOfPreparedTransactions();
30983200
MtmUnlock();
30993201
returnREPLMODE_RECOVERY;
31003202
}

‎multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ extern TransactionId MtmGetCurrentTransactionId(void);
374374
externXidStatusMtmGetCurrentTransactionStatus(void);
375375
externXidStatusMtmExchangeGlobalTransactionStatus(charconst*gid,XidStatusstatus);
376376
externboolMtmIsRecoveredNode(intnodeId);
377-
externboolMtmRefreshClusterStatus(boolnowait);
377+
externvoidMtmRefreshClusterStatus(void);
378378
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
379379
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
380380
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp