Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0f23a54

Browse files
knizhnik, kelvich
authored and committed
recovery in progress
1 parent cb8d490, commit 0f23a54

File tree

5 files changed

+97
-41
lines changed

5 files changed

+97
-41
lines changed

‎multimaster.c

Lines changed: 77 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -695,6 +695,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
695695
ts->votingCompleted= false;
696696
ts->cmd=MSG_INVALID;
697697
ts->nSubxids=xactGetCommittedChildren(&subxids);
698+
Mtm->nActiveTransactions+=1;
698699

699700
x->isPrepared= true;
700701
x->csn=ts->csn;
@@ -794,6 +795,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
794795
ts->status=TRANSACTION_STATUS_ABORTED;
795796
}
796797
MtmAdjustSubtransactions(ts);
798+
Assert(Mtm->nActiveTransactions!=0);
799+
Mtm->nActiveTransactions-=1;
797800
}
798801
if (!commit&&x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
799802
/*
@@ -835,6 +838,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
835838
}
836839
}
837840

841+
voidMtmRecoveryCompleted(void)
842+
{
843+
elog(WARNING,"Recevoery of node %d is completed",MtmNodeId);
844+
Mtm->recoverySlot=0;
845+
MtmSwitchClusterMode(MTM_ONLINE);
846+
}
847+
838848
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
839849
{
840850
MtmLock(LW_EXCLUSIVE);
@@ -846,8 +856,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
846856
Assert(Mtm->status==MTM_RECOVERY);
847857
}elseif (Mtm->status==MTM_RECOVERY) {
848858
/* When recovery is completed we get normal transaction ID and switch to normal mode */
849-
Mtm->recoverySlot=0;
850-
MtmSwitchClusterMode(MTM_ONLINE);
859+
MtmRecoveryCompleted();
851860
}
852861
MtmTx.gtid=*gtid;
853862
MtmTx.xid=GetCurrentTransactionId();
@@ -972,35 +981,52 @@ static int64 MtmGetSlotLag(int nodeId)
972981
*/
973982
boolMtmIsRecoveredNode(intnodeId)
974983
{
975-
returnBIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
984+
returnBIT_CHECK(Mtm->disabledNodeMask,nodeId-1);
976985
}
977986

978987

979-
voidMtmRecoveryPorgress(XLogRecPtrlsn)
988+
boolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN)
980989
{
981-
982-
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
983-
if (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
984-
&&MyWalSnd->sentPtr+MtmMinRecoveryLag>GetXLogInsertRecPtr())
990+
boolcaughtUp= false;
991+
if (MtmIsRecoveredNode(nodeId)) {
992+
XLogRecPtrwalLSN=GetXLogInsertRecPtr();
993+
MtmLock(LW_EXCLUSIVE);
994+
if (slotLSN==walLSN) {
995+
if (BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)) {
996+
elog(WARNING,"Node %d is caught-up",nodeId);
997+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
998+
BIT_CLEAR(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
999+
BIT_CLEAR(Mtm->nodeLockerMask,nodeId-1);
1000+
Mtm->nLockers-=1;
1001+
}else {
1002+
elog(WARNING,"Node %d is caugth-up without locking cluster",nodeId);
1003+
/* We are lucky: caugth-up without locking cluster! */
1004+
Mtm->nNodes+=1;
1005+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1006+
}
1007+
caughtUp= true;
1008+
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
1009+
&&slotLSN+MtmMinRecoveryLag>walLSN)
9851010
{
9861011
/*
9871012
* Wal sender almost catched up.
9881013
* Lock cluster preventing new transaction to start until wal is completely replayed.
9891014
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
9901015
* Is there some better way to establish mapping between nodes ad WAL-seconder?
9911016
*/
992-
elog(WARNING,"Node %d iscatching up",nodeId);
993-
MtmLock(LW_EXCLUSIVE);
1017+
elog(WARNING,"Node %d isalmost caught-up: lock cluster",nodeId);
1018+
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
9941019
BIT_SET(Mtm->nodeLockerMask,nodeId-1);
9951020
BIT_SET(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
9961021
Mtm->nLockers+=1;
997-
MtmUnlock();
9981022
}else {
999-
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n",nodeId,MyWalSnd->sentPtr,GetXLogInsertRecPtr(),Mtm->nLockers);
1023+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx,WAL sender position %lx,lockers %d, active transactions %d\n",nodeId,slotLSN,walLSN,MyWalSnd->sentPtr,Mtm->nLockers,Mtm->nActiveTransactions);
10001024
}
1001-
return true;
1025+
MtmUnlock();
1026+
}else {
1027+
MTM_INFO("Node %d is not in recovery mode\n",nodeId);
10021028
}
1003-
returnfalse;
1029+
returncaughtUp;
10041030
}
10051031

10061032
voidMtmSwitchClusterMode(MtmNodeStatusmode)
@@ -1019,22 +1045,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10191045
staticvoid
10201046
MtmCheckClusterLock()
10211047
{
1048+
timestamp_tdelay=MIN_WAIT_TIMEOUT;
10221049
while (true)
10231050
{
10241051
nodemask_tmask=Mtm->walSenderLockerMask;
10251052
if (mask!=0) {
1026-
XLogRecPtrcurrLogPos=GetXLogInsertRecPtr();
1027-
inti;
1028-
timestamp_tdelay=MIN_WAIT_TIMEOUT;
1029-
for (i=0;mask!=0;i++,mask >>=1) {
1030-
if (mask&1) {
1031-
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
1032-
/* recovery is in progress */
1033-
break;
1034-
}else {
1035-
/* recovered replica catched up with master */
1036-
elog(WARNING,"WAL-sender %d complete recovery",i);
1037-
BIT_CLEAR(Mtm->walSenderLockerMask,i);
1053+
if (Mtm->nActiveTransactions==0) {
1054+
XLogRecPtrcurrLogPos=GetXLogInsertRecPtr();
1055+
inti;
1056+
for (i=0;mask!=0;i++,mask >>=1) {
1057+
if (mask&1) {
1058+
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
1059+
/* recovery is in progress */
1060+
break;
1061+
}else {
1062+
/* recovered replica catched up with master */
1063+
elog(WARNING,"WAL-sender %d complete recovery",i);
1064+
BIT_CLEAR(Mtm->walSenderLockerMask,i);
1065+
}
10381066
}
10391067
}
10401068
}
@@ -1294,6 +1322,7 @@ static void MtmInitialize()
12941322
Mtm->walSenderLockerMask=0;
12951323
Mtm->nodeLockerMask=0;
12961324
Mtm->nLockers=0;
1325+
Mtm->nActiveTransactions=0;
12971326
Mtm->votingTransactions=NULL;
12981327
Mtm->transListHead=NULL;
12991328
Mtm->transListTail=&Mtm->transListHead;
@@ -1734,12 +1763,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
17341763
staticvoid
17351764
MtmReplicationStartupHook(structPGLogicalStartupHookArgs*args)
17361765
{
1766+
ListCell*param;
1767+
boolisRecoverySession= false;
1768+
foreach(param,args->in_params)
1769+
{
1770+
DefElem*elem=lfirst(param);
1771+
if (strcmp("mtm_replication_mode",elem->defname)==0) {
1772+
isRecoverySession=elem->arg!=NULL&&strVal(elem->arg)!=NULL&&strcmp(strVal(elem->arg),"recovery")==0;
1773+
break;
1774+
}
1775+
}
17371776
MtmLock(LW_EXCLUSIVE);
1738-
if (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1739-
elog(WARNING,"Recovery of node %d is completed: start normal replication",MtmReplicationNodeId);
1777+
if (isRecoverySession) {
1778+
elog(WARNING,"Node %d start recovery of node %d",MtmNodeId,MtmReplicationNodeId);
1779+
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1780+
BIT_SET(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
1781+
Mtm->nNodes-=1;
1782+
MtmCheckQuorum();
1783+
}
1784+
}elseif (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1785+
elog(WARNING,"Node %d consider that recovery of node %d is completed: start normal replication",MtmNodeId,MtmReplicationNodeId);
17401786
BIT_CLEAR(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
17411787
Mtm->nNodes+=1;
17421788
MtmCheckQuorum();
1789+
}else {
1790+
elog(NOTICE,"Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
17431791
}
17441792
MtmUnlock();
17451793
}
@@ -1757,7 +1805,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
17571805
boolres=Mtm->status!=MTM_RECOVERY
17581806
&& (args->origin_id==InvalidRepOriginId
17591807
||MtmIsRecoveredNode(MtmReplicationNodeId));
1760-
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
1808+
MTM_INFO("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
17611809
returnres;
17621810
}
17631811

‎multimaster.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,18 +130,19 @@ typedef struct
130130
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
131131
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
132132
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
133-
intnNodes;/* number of active nodes */
134-
intnReceivers;/* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135-
intnLockers;/* number of lockers */
136-
longtimeShift;/* local time correction */
137-
csn_tcsn;/* last obtained CSN: used to provide unique acending CSNs based on system time */
133+
intnNodes;/* Number of active nodes */
134+
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135+
intnLockers;/* Number of lockers */
136+
intnActiveTransactions;/* Nunmber of active 2PC transactions */
137+
longtimeShift;/* Local time correction */
138+
csn_tcsn;/* Last obtained CSN: used to provide unique acending CSNs based on system time */
138139
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
139140
This list is used to pass information to mtm-sender BGW */
140141
MtmTransState*transListHead;/* L1 list of all finished transactions present in xid2state hash.
141142
It is cleanup by MtmGetOldestXmin */
142143
MtmTransState**transListTail;/* Tail of L1 list of all finished transactionds, used to append new elements.
143144
This list is expected to be in CSN ascending order, by strict order may be violated */
144-
uint64transCount;/* Counter of transactions perfromed by this node */
145+
uint64transCount;/* Counter of transactions perfromed by this node */
145146
time_tnodeTransDelay[MAX_NODES];/* Time of waiting transaction acknowledgment from node */
146147
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
147148
MtmNodeInfonodes[1];/* [MtmNodes]: per-node data */
@@ -200,5 +201,6 @@ extern void MtmSwitchClusterMode(MtmNodeStatus mode);
200201
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
201202
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
202203
externvoidMtmCheckQuorum(void);
203-
204+
externboolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN);
205+
externvoidMtmRecoveryCompleted(void);
204206
#endif

‎pglogical_apply.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,12 @@ process_remote_commit(StringInfo in)
497497
uint8flags;
498498
csn_tcsn;
499499
constchar*gid=NULL;
500+
boolcaughtUp;
500501

501502
/* read flags */
502503
flags=pq_getmsgbyte(in);
503504
MtmReplicationNode=pq_getmsgbyte(in);
505+
caughtUp=pq_getmsgbyte(in)!=0;
504506

505507
/* read fields */
506508
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
@@ -568,7 +570,10 @@ process_remote_commit(StringInfo in)
568570
default:
569571
Assert(false);
570572
}
571-
MtmEndSession();
573+
MtmEndSession(true);
574+
if (caughtUp) {
575+
MtmRecoveryCompleted();
576+
}
572577
}
573578

574579
staticvoid

‎pglogical_proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150

151151
pq_sendbyte(out,'C');/* sending COMMIT */
152152

153-
MTM_TRACE("PGLOGICAL_SEND commit: event=%d, gid=%s\n",flags,txn->gid);
153+
MTM_INFO("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx\n",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
154154

155155
/* send the flags field */
156156
pq_sendbyte(out,flags);
157157
pq_sendbyte(out,MtmNodeId);
158+
pq_sendbyte(out,MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn));
158159

159160
/* send fixed fields */
160161
pq_sendint64(out,commit_lsn);
@@ -167,7 +168,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
167168
if (flags!=PGLOGICAL_COMMIT) {
168169
pq_sendstring(out,txn->gid);
169170
}
170-
171171
}
172172

173173
/*

‎pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -292,12 +292,13 @@ pglogical_receiver_main(Datum main_arg)
292292
}
293293
CommitTransactionCommand();
294294

295-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
295+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"mtm_replication_mode\" '%s')",
296296
args->receiver_slot,
297297
(uint32) (originStartPos >>32),
298298
(uint32)originStartPos,
299299
MULTIMASTER_MAX_PROTO_VERSION,
300-
MULTIMASTER_MIN_PROTO_VERSION
300+
MULTIMASTER_MIN_PROTO_VERSION,
301+
mode==SLOT_OPEN_EXISTED ?"recovery" :"normal"
301302
);
302303
res=PQexec(conn,query->data);
303304
if (PQresultStatus(res)!=PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp