Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc2989af

Browse files
committed
recovery in progress
1 parente6faa29 commitc2989af

File tree

5 files changed

+96
-40
lines changed

5 files changed

+96
-40
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
696696
ts->votingCompleted= false;
697697
ts->cmd=MSG_INVALID;
698698
ts->nSubxids=xactGetCommittedChildren(&subxids);
699+
Mtm->nActiveTransactions+=1;
699700

700701
x->isPrepared= true;
701702
x->csn=ts->csn;
@@ -795,6 +796,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
795796
ts->status=TRANSACTION_STATUS_ABORTED;
796797
}
797798
MtmAdjustSubtransactions(ts);
799+
Assert(Mtm->nActiveTransactions!=0);
800+
Mtm->nActiveTransactions-=1;
798801
}
799802
if (!commit&&x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
800803
/*
@@ -836,6 +839,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
836839
}
837840
}
838841

842+
voidMtmRecoveryCompleted(void)
843+
{
844+
elog(WARNING,"Recevoery of node %d is completed",MtmNodeId);
845+
Mtm->recoverySlot=0;
846+
MtmSwitchClusterMode(MTM_ONLINE);
847+
}
848+
839849
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
840850
{
841851
MtmLock(LW_EXCLUSIVE);
@@ -847,8 +857,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
847857
Assert(Mtm->status==MTM_RECOVERY);
848858
}elseif (Mtm->status==MTM_RECOVERY) {
849859
/* When recovery is completed we get normal transaction ID and switch to normal mode */
850-
Mtm->recoverySlot=0;
851-
MtmSwitchClusterMode(MTM_ONLINE);
860+
MtmRecoveryCompleted();
852861
}
853862
MtmTx.gtid=*gtid;
854863
MtmTx.xid=GetCurrentTransactionId();
@@ -973,35 +982,52 @@ static int64 MtmGetSlotLag(int nodeId)
973982
*/
974983
boolMtmIsRecoveredNode(intnodeId)
975984
{
976-
returnBIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
985+
returnBIT_CHECK(Mtm->disabledNodeMask,nodeId-1);
977986
}
978987

979988

980-
voidMtmRecoveryPorgress(XLogRecPtrlsn)
989+
boolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN)
981990
{
982-
983-
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
984-
if (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
985-
&&MyWalSnd->sentPtr+MtmMinRecoveryLag>GetXLogInsertRecPtr())
991+
boolcaughtUp= false;
992+
if (MtmIsRecoveredNode(nodeId)) {
993+
XLogRecPtrwalLSN=GetXLogInsertRecPtr();
994+
MtmLock(LW_EXCLUSIVE);
995+
if (slotLSN==walLSN) {
996+
if (BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)) {
997+
elog(WARNING,"Node %d is caught-up",nodeId);
998+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
999+
BIT_CLEAR(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
1000+
BIT_CLEAR(Mtm->nodeLockerMask,nodeId-1);
1001+
Mtm->nLockers-=1;
1002+
}else {
1003+
elog(WARNING,"Node %d is caugth-up without locking cluster",nodeId);
1004+
/* We are lucky: caught up without locking the cluster! */
1005+
Mtm->nNodes+=1;
1006+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1007+
}
1008+
caughtUp= true;
1009+
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
1010+
&&slotLSN+MtmMinRecoveryLag>walLSN)
9861011
{
9871012
/*
9881013
* WAL sender has almost caught up.
9891014
* Lock the cluster, preventing new transactions from starting until the WAL is completely replayed.
9901015
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
9911016
* Is there a better way to establish the mapping between nodes and WAL senders?
9921017
*/
993-
elog(WARNING,"Node %d iscatching up",nodeId);
994-
MtmLock(LW_EXCLUSIVE);
1018+
elog(WARNING,"Node %d isalmost caught-up: lock cluster",nodeId);
1019+
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
9951020
BIT_SET(Mtm->nodeLockerMask,nodeId-1);
9961021
BIT_SET(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
9971022
Mtm->nLockers+=1;
998-
MtmUnlock();
9991023
}else {
1000-
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n",nodeId,MyWalSnd->sentPtr,GetXLogInsertRecPtr(),Mtm->nLockers);
1024+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx,WAL sender position %lx,lockers %d, active transactions %d\n",nodeId,slotLSN,walLSN,MyWalSnd->sentPtr,Mtm->nLockers,Mtm->nActiveTransactions);
10011025
}
1002-
return true;
1026+
MtmUnlock();
1027+
}else {
1028+
MTM_INFO("Node %d is not in recovery mode\n",nodeId);
10031029
}
1004-
returnfalse;
1030+
returncaughtUp;
10051031
}
10061032

10071033
voidMtmSwitchClusterMode(MtmNodeStatusmode)
@@ -1020,22 +1046,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10201046
staticvoid
10211047
MtmCheckClusterLock()
10221048
{
1049+
timestamp_tdelay=MIN_WAIT_TIMEOUT;
10231050
while (true)
10241051
{
10251052
nodemask_tmask=Mtm->walSenderLockerMask;
10261053
if (mask!=0) {
1027-
XLogRecPtrcurrLogPos=GetXLogInsertRecPtr();
1028-
inti;
1029-
timestamp_tdelay=MIN_WAIT_TIMEOUT;
1030-
for (i=0;mask!=0;i++,mask >>=1) {
1031-
if (mask&1) {
1032-
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
1033-
/* recovery is in progress */
1034-
break;
1035-
}else {
1036-
/* recovered replica caught up with master */
1037-
elog(WARNING,"WAL-sender %d complete recovery",i);
1038-
BIT_CLEAR(Mtm->walSenderLockerMask,i);
1054+
if (Mtm->nActiveTransactions==0) {
1055+
XLogRecPtrcurrLogPos=GetXLogInsertRecPtr();
1056+
inti;
1057+
for (i=0;mask!=0;i++,mask >>=1) {
1058+
if (mask&1) {
1059+
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
1060+
/* recovery is in progress */
1061+
break;
1062+
}else {
1063+
/* recovered replica caught up with master */
1064+
elog(WARNING,"WAL-sender %d complete recovery",i);
1065+
BIT_CLEAR(Mtm->walSenderLockerMask,i);
1066+
}
10391067
}
10401068
}
10411069
}
@@ -1265,6 +1293,7 @@ static void MtmInitialize()
12651293
Mtm->walSenderLockerMask=0;
12661294
Mtm->nodeLockerMask=0;
12671295
Mtm->nLockers=0;
1296+
Mtm->nActiveTransactions=0;
12681297
Mtm->votingTransactions=NULL;
12691298
Mtm->transListHead=NULL;
12701299
Mtm->transListTail=&Mtm->transListHead;
@@ -1705,12 +1734,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
17051734
staticvoid
17061735
MtmReplicationStartupHook(structPGLogicalStartupHookArgs*args)
17071736
{
1737+
ListCell*param;
1738+
boolisRecoverySession= false;
1739+
foreach(param,args->in_params)
1740+
{
1741+
DefElem*elem=lfirst(param);
1742+
if (strcmp("mtm_replication_mode",elem->defname)==0) {
1743+
isRecoverySession=elem->arg!=NULL&&strVal(elem->arg)!=NULL&&strcmp(strVal(elem->arg),"recovery")==0;
1744+
break;
1745+
}
1746+
}
17081747
MtmLock(LW_EXCLUSIVE);
1709-
if (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1710-
elog(WARNING,"Recovery of node %d is completed: start normal replication",MtmReplicationNodeId);
1748+
if (isRecoverySession) {
1749+
elog(WARNING,"Node %d start recovery of node %d",MtmNodeId,MtmReplicationNodeId);
1750+
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1751+
BIT_SET(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
1752+
Mtm->nNodes-=1;
1753+
MtmCheckQuorum();
1754+
}
1755+
}elseif (BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
1756+
elog(WARNING,"Node %d consider that recovery of node %d is completed: start normal replication",MtmNodeId,MtmReplicationNodeId);
17111757
BIT_CLEAR(Mtm->disabledNodeMask,MtmReplicationNodeId-1);
17121758
Mtm->nNodes+=1;
17131759
MtmCheckQuorum();
1760+
}else {
1761+
elog(NOTICE,"Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
17141762
}
17151763
MtmUnlock();
17161764
}
@@ -1728,7 +1776,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
17281776
boolres=Mtm->status!=MTM_RECOVERY
17291777
&& (args->origin_id==InvalidRepOriginId
17301778
||MtmIsRecoveredNode(MtmReplicationNodeId));
1731-
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
1779+
MTM_INFO("%d: MtmReplicationTxnFilterHook->%d\n",MyProcPid,res);
17321780
returnres;
17331781
}
17341782

‎contrib/mmts/multimaster.h‎

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,18 +130,19 @@ typedef struct
130130
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
131131
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
132132
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
133-
intnNodes;/* number of active nodes */
134-
intnReceivers;/* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135-
intnLockers;/* number of lockers */
136-
longtimeShift;/* local time correction */
137-
csn_tcsn;/* last obtained CSN: used to provide unique acending CSNs based on system time */
133+
intnNodes;/* Number of active nodes */
134+
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135+
intnLockers;/* Number of lockers */
136+
intnActiveTransactions;/* Nunmber of active 2PC transactions */
137+
longtimeShift;/* Local time correction */
138+
csn_tcsn;/* Last obtained CSN: used to provide unique acending CSNs based on system time */
138139
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
139140
This list is used to pass information to mtm-sender BGW */
140141
MtmTransState*transListHead;/* L1 list of all finished transactions present in xid2state hash.
141142
It is cleaned up by MtmGetOldestXmin */
142143
MtmTransState**transListTail;/* Tail of L1 list of all finished transactionds, used to append new elements.
143144
This list is expected to be in CSN ascending order, but strict order may be violated */
144-
uint64transCount;/* Counter of transactions perfromed by this node */
145+
uint64transCount;/* Counter of transactions perfromed by this node */
145146
time_tnodeTransDelay[MAX_NODES];/* Time of waiting transaction acknowledgment from node */
146147
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
147148
MtmNodeInfonodes[1];/* [MtmNodes]: per-node data */
@@ -200,5 +201,6 @@ extern void MtmSwitchClusterMode(MtmNodeStatus mode);
200201
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
201202
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
202203
externvoidMtmCheckQuorum(void);
203-
204+
externboolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN);
205+
externvoidMtmRecoveryCompleted(void);
204206
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,10 +499,12 @@ process_remote_commit(StringInfo in)
499499
uint8flags;
500500
csn_tcsn;
501501
constchar*gid=NULL;
502+
boolcaughtUp;
502503

503504
/* read flags */
504505
flags=pq_getmsgbyte(in);
505506
MtmReplicationNode=pq_getmsgbyte(in);
507+
caughtUp=pq_getmsgbyte(in)!=0;
506508

507509
/* read fields */
508510
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
@@ -571,6 +573,9 @@ process_remote_commit(StringInfo in)
571573
Assert(false);
572574
}
573575
MtmEndSession(true);
576+
if (caughtUp) {
577+
MtmRecoveryCompleted();
578+
}
574579
}
575580

576581
staticvoid

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150

151151
pq_sendbyte(out,'C');/* sending COMMIT */
152152

153-
MTM_TRACE("PGLOGICAL_SEND commit: event=%d, gid=%s\n",flags,txn->gid);
153+
MTM_INFO("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx\n",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
154154

155155
/* send the flags field */
156156
pq_sendbyte(out,flags);
157157
pq_sendbyte(out,MtmNodeId);
158+
pq_sendbyte(out,MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn));
158159

159160
/* send fixed fields */
160161
pq_sendint64(out,commit_lsn);
@@ -167,7 +168,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
167168
if (flags!=PGLOGICAL_COMMIT) {
168169
pq_sendstring(out,txn->gid);
169170
}
170-
171171
}
172172

173173
/*

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,12 +292,13 @@ pglogical_receiver_main(Datum main_arg)
292292
}
293293
CommitTransactionCommand();
294294

295-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
295+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"mtm_replication_mode\" '%s')",
296296
args->receiver_slot,
297297
(uint32) (originStartPos >>32),
298298
(uint32)originStartPos,
299299
MULTIMASTER_MAX_PROTO_VERSION,
300-
MULTIMASTER_MIN_PROTO_VERSION
300+
MULTIMASTER_MIN_PROTO_VERSION,
301+
mode==SLOT_OPEN_EXISTED ?"recovery" :"normal"
301302
);
302303
res=PQexec(conn,query->data);
303304
if (PQresultStatus(res)!=PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp