@@ -695,6 +695,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
695695ts -> votingCompleted = false;
696696ts -> cmd = MSG_INVALID ;
697697ts -> nSubxids = xactGetCommittedChildren (& subxids );
698+ Mtm -> nActiveTransactions += 1 ;
698699
699700x -> isPrepared = true;
700701x -> csn = ts -> csn ;
@@ -794,6 +795,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
794795ts -> status = TRANSACTION_STATUS_ABORTED ;
795796}
796797MtmAdjustSubtransactions (ts );
798+ Assert (Mtm -> nActiveTransactions != 0 );
799+ Mtm -> nActiveTransactions -= 1 ;
797800}
798801if (!commit && x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
799802/*
@@ -835,6 +838,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
835838}
836839}
837840
841+ void MtmRecoveryCompleted (void ) /* Finish recovery of this node: log completion, reset Mtm->recoverySlot and switch the node to MTM_ONLINE. NOTE(review): takes no lock itself; the caller visible in this patch invokes it with MtmLock(LW_EXCLUSIVE) held - confirm for any other caller. */
842+ {
843+ elog (WARNING ,"Recovery of node %d is completed" ,MtmNodeId );
844+ Mtm -> recoverySlot = 0 ; /* clear the recovery slot before going online */
845+ MtmSwitchClusterMode (MTM_ONLINE );
846+ }
847+
838848void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
839849{
840850MtmLock (LW_EXCLUSIVE );
@@ -846,8 +856,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
846856Assert (Mtm -> status == MTM_RECOVERY );
847857}else if (Mtm -> status == MTM_RECOVERY ) {
848858/* When recovery is completed we get normal transaction ID and switch to normal mode */
849- Mtm -> recoverySlot = 0 ;
850- MtmSwitchClusterMode (MTM_ONLINE );
859+ MtmRecoveryCompleted ();
851860}
852861MtmTx .gtid = * gtid ;
853862MtmTx .xid = GetCurrentTransactionId ();
@@ -972,35 +981,52 @@ static int64 MtmGetSlotLag(int nodeId)
972981 */
973982bool MtmIsRecoveredNode (int nodeId )
974983{
975- return BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) ;
984+ return BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 ); /* true when bit (nodeId - 1) is set in Mtm->disabledNodeMask; the mask is read here without taking MtmLock */
976985}
977986
978987
979- void MtmRecoveryPorgress ( XLogRecPtr lsn )
988+ bool MtmRecoveryCaughtUp ( int nodeId , XLogRecPtr slotLSN ) /* Compare slotLSN of a node under recovery with the current WAL insert position; returns true only when they are equal (node re-enabled), false otherwise. May lock the cluster when the node is within MtmMinRecoveryLag. */
980989{
981-
982- Assert (MyWalSnd != NULL );/* This function is called by WAL-sender, so it should not be NULL */
983- if (!BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )
984- && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
990+ bool caughtUp = false;
991+ if (MtmIsRecoveredNode (nodeId )) { /* only nodes whose bit is set in disabledNodeMask are processed; this check happens before MtmLock is taken */
992+ XLogRecPtr walLSN = GetXLogInsertRecPtr (); /* WAL insert position sampled once, before acquiring the lock */
993+ MtmLock (LW_EXCLUSIVE );
994+ if (slotLSN == walLSN ) { /* exact match: nothing left to replay */
995+ if (BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )) { /* this node had locked the cluster: release that lock */
996+ elog (WARNING ,"Node %d is caught-up" ,nodeId );
997+ BIT_CLEAR (Mtm -> disabledNodeMask ,nodeId - 1 );
998+ BIT_CLEAR (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds ); /* bit index is this WAL sender's position in the walsnds array */
999+ BIT_CLEAR (Mtm -> nodeLockerMask ,nodeId - 1 );
1000+ Mtm -> nLockers -= 1 ;
1001+ }else {
1002+ elog (WARNING ,"Node %d is caugth-up without locking cluster" ,nodeId );
1003+ /* We are lucky: caught up without locking the cluster! */
1004+ Mtm -> nNodes += 1 ; /* NOTE(review): nNodes is incremented only in this branch, not in the locker branch above, although both clear the disabledNodeMask bit - confirm this asymmetry is intended */
1005+ BIT_CLEAR (Mtm -> disabledNodeMask ,nodeId - 1 );
1006+ }
1007+ caughtUp = true;
1008+ }else if (!BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )
1009+ && slotLSN + MtmMinRecoveryLag > walLSN ) /* not yet a locker and the slot is less than MtmMinRecoveryLag behind the WAL insert position */
9851010{
9861011/*
9871012 * Wal sender almost catched up.
9881013 * Lock cluster preventing new transaction to start until wal is completely replayed.
9891014 * We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
9901015 * Is there some better way to establish mapping between nodes ad WAL-seconder?
9911016 */
992- elog (WARNING ,"Node %d iscatching up " ,nodeId );
993- MtmLock ( LW_EXCLUSIVE );
1017+ elog (WARNING ,"Node %d isalmost caught-up: lock cluster " ,nodeId );
1018+ Assert ( MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL. NOTE(review): MyWalSnd is also dereferenced in the caught-up branch and in the "Continue recovery" message, which this assert does not cover - confirm. */
9941019BIT_SET (Mtm -> nodeLockerMask ,nodeId - 1 );
9951020BIT_SET (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
9961021Mtm -> nLockers += 1 ;
997- MtmUnlock ();
9981022}else {
999- MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" ,nodeId ,MyWalSnd -> sentPtr ,GetXLogInsertRecPtr () ,Mtm -> nLockers );
1023+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx,WAL sender position %lx, lockers %d, active transactions %d \n" ,nodeId ,slotLSN , walLSN , MyWalSnd -> sentPtr ,Mtm -> nLockers ,Mtm -> nActiveTransactions );
10001024}
1001- return true;
1025+ MtmUnlock (); /* single unlock shared by all three branches taken under the lock */
1026+ }else {
1027+ MTM_INFO ("Node %d is not in recovery mode\n" ,nodeId );
10021028}
1003- return false ;
1029+ return caughtUp ;
10041030}
10051031
10061032void MtmSwitchClusterMode (MtmNodeStatus mode )
@@ -1019,22 +1045,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10191045static void
10201046MtmCheckClusterLock ()
10211047{
1048+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
10221049while (true)
10231050{
10241051nodemask_t mask = Mtm -> walSenderLockerMask ;
10251052if (mask != 0 ) {
1026- XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1027- int i ;
1028- timestamp_t delay = MIN_WAIT_TIMEOUT ;
1029- for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
1030- if (mask & 1 ) {
1031- if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1032- /* recovery is in progress */
1033- break ;
1034- }else {
1035- /* recovered replica catched up with master */
1036- elog (WARNING ,"WAL-sender %d complete recovery" ,i );
1037- BIT_CLEAR (Mtm -> walSenderLockerMask ,i );
1053+ if (Mtm -> nActiveTransactions == 0 ) {
1054+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1055+ int i ;
1056+ for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
1057+ if (mask & 1 ) {
1058+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1059+ /* recovery is in progress */
1060+ break ;
1061+ }else {
1062+ /* recovered replica catched up with master */
1063+ elog (WARNING ,"WAL-sender %d complete recovery" ,i );
1064+ BIT_CLEAR (Mtm -> walSenderLockerMask ,i );
1065+ }
10381066}
10391067}
10401068}
@@ -1294,6 +1322,7 @@ static void MtmInitialize()
12941322Mtm -> walSenderLockerMask = 0 ;
12951323Mtm -> nodeLockerMask = 0 ;
12961324Mtm -> nLockers = 0 ;
1325+ Mtm -> nActiveTransactions = 0 ;
12971326Mtm -> votingTransactions = NULL ;
12981327Mtm -> transListHead = NULL ;
12991328Mtm -> transListTail = & Mtm -> transListHead ;
@@ -1734,12 +1763,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
17341763static void
17351764MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
17361765{
1766+ ListCell * param ;
1767+ bool isRecoverySession = false; /* stays false when no "mtm_replication_mode" parameter is present */
1768+ foreach (param ,args -> in_params ) /* scan the startup parameters for the replication mode */
1769+ {
1770+ DefElem * elem = lfirst (param );
1771+ if (strcmp ("mtm_replication_mode" ,elem -> defname )== 0 ) {
1772+ isRecoverySession = elem -> arg != NULL && strVal (elem -> arg )!= NULL && strcmp (strVal (elem -> arg ),"recovery" )== 0 ; /* recovery session only when the value is exactly "recovery" */
1773+ break ; /* only the first matching parameter is considered */
1774+ }
1775+ }
17371776MtmLock (LW_EXCLUSIVE );
1738- if (BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) {
1739- elog (WARNING ,"Recovery of node %d is completed: start normal replication" ,MtmReplicationNodeId );
1777+ if (isRecoverySession ) { /* peer asked for recovery: make sure it is marked disabled */
1778+ elog (WARNING ,"Node %d start recovery of node %d" ,MtmNodeId ,MtmReplicationNodeId );
1779+ if (!BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) { /* guard so nNodes is decremented only on the enabled -> disabled transition */
1780+ BIT_SET (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 );
1781+ Mtm -> nNodes -= 1 ;
1782+ MtmCheckQuorum ();
1783+ }
1784+ }else if (BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) { /* non-recovery session from a node still marked disabled: re-enable it */
1785+ elog (WARNING ,"Node %d consider that recovery of node %d is completed: start normal replication" ,MtmNodeId ,MtmReplicationNodeId );
17401786BIT_CLEAR (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 );
17411787Mtm -> nNodes += 1 ;
17421788MtmCheckQuorum ();
1789+ }else { /* node already enabled: no state change, only a notice */
1790+ elog (NOTICE ,"Node %d start logical replication to node %d in normal mode" ,MtmNodeId ,MtmReplicationNodeId );
17431791}
17441792MtmUnlock ();
17451793}
@@ -1757,7 +1805,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
17571805bool res = Mtm -> status != MTM_RECOVERY
17581806&& (args -> origin_id == InvalidRepOriginId
17591807|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1760- MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
1808+ MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
17611809return res ;
17621810}
17631811