@@ -154,6 +154,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
154154ProcessUtilityContext context ,ParamListInfo params ,
155155DestReceiver * dest ,char * completionTag );
156156
157+ /*
158+ * Using LWLock seems to be more efficient (at our benchmarks)
159+ */
157160void MtmLock (LWLockMode mode )
158161{
159162#ifdef USE_SPINLOCK
@@ -197,6 +200,9 @@ void MtmSleep(timestamp_t interval)
197200 }
198201}
199202
203+ /**
204+ * Return ascending unique timestamp which is used as CSN
205+ */
200206csn_t MtmAssignCSN ()
201207{
202208csn_t csn = MtmGetCurrentTime ();
@@ -208,6 +214,9 @@ csn_t MtmAssignCSN()
208214return csn ;
209215}
210216
217+ /**
218+ * "Adjust" system clock if we receive message from future
219+ */
211220csn_t MtmSyncClock (csn_t global_csn )
212221{
213222csn_t local_csn ;
@@ -471,14 +480,23 @@ MtmXactCallback(XactEvent event, void *arg)
471480}
472481}
473482
483+ /*
484+ * Check if this is "normal" user trnsaction which shoudl be distributed to other nodes
485+ */
486+ static bool
487+ MtmIsUserTransaction ()
488+ {
489+ return IsNormalProcessingMode ()&& dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
490+ }
491+
474492static void
475493MtmBeginTransaction (MtmCurrentTrans * x )
476494{
477495if (x -> snapshot == INVALID_CSN ) {
478496MtmLock (LW_EXCLUSIVE );
479497x -> xid = GetCurrentTransactionIdIfAny ();
480498x -> isReplicated = false;
481- x -> isDistributed = IsNormalProcessingMode () && dtm -> status == MTM_ONLINE && MtmDoReplication && ! am_walsender && ! IsBackgroundWorker && ! IsAutoVacuumWorkerProcess ();
499+ x -> isDistributed = MtmIsUserTransaction ();
482500x -> containsDML = false;
483501x -> snapshot = MtmAssignCSN ();
484502x -> gtid .xid = InvalidTransactionId ;
@@ -489,7 +507,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
489507}
490508
491509
492- /* This function is called at transaction start with multimaster ock set */
510+ /*
511+ * If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
512+ * WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
513+ * This function is called at transaction start with multimaster lock set
514+ */
493515static void
494516MtmCheckClusterLock ()
495517{
@@ -507,6 +529,7 @@ MtmCheckClusterLock()
507529break ;
508530}else {
509531/* recovered replica catched up with master */
532+ elog (WARNING ,"WAL-sender %d complete receovery" ,i );
510533dtm -> walSenderLockerMask &= ~((nodemask_t )1 <<i );
511534}
512535}
@@ -524,6 +547,7 @@ MtmCheckClusterLock()
524547}else {
525548/* All lockers are synchronized their logs */
526549/* Remove lock and mark them as receovered */
550+ elog (WARNING ,"Complete recovery of %d nodes (node mask %lx)" ,dtm -> nLockers ,dtm -> nodeLockerMask );
527551Assert (dtm -> walSenderLockerMask == 0 );
528552Assert ((dtm -> nodeLockerMask & dtm -> disabledNodeMask )== dtm -> nodeLockerMask );
529553dtm -> disabledNodeMask &= ~dtm -> nodeLockerMask ;
@@ -552,6 +576,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
552576x -> xid = GetCurrentTransactionId ();
553577
554578MtmLock (LW_EXCLUSIVE );
579+
580+ /*
581+ * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to cache-up
582+ */
555583MtmCheckClusterLock ();
556584
557585ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
@@ -580,6 +608,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
580608MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" ,getpid (),x -> xid ,ts -> csn );
581609}
582610
611+ /**
612+ * Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
613+ * WAL overflow
614+ */
583615static void MtmCheckSlots ()
584616{
585617if (MtmMaxRecoveryLag != 0 && dtm -> disabledNodeMask != 0 )
@@ -636,17 +668,23 @@ void MtmSendNotificationMessage(MtmTransState* ts)
636668}
637669
638670/*
639- * This function is called by WAL sender when start sending new transaction
671+ * This function is called by WAL sender when start sending new transaction.
672+ * It returns true if specified node is in recovery mode. In this case we should send all transactions from WAL,
673+ * not only coordinated by self node as in normal mode.
640674 */
641675bool MtmIsRecoveredNode (int nodeId )
642676{
643677if (BIT_CHECK (dtm -> disabledNodeMask ,nodeId - 1 )) {
644- Assert (MyWalSnd != NULL );
678+ Assert (MyWalSnd != NULL );/* This function is called by WAL-sender, so it should not be NULL */
645679if (!BIT_CHECK (dtm -> nodeLockerMask ,nodeId - 1 )
646680&& MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
647681{
648- /* Wal sender almost catched up */
649- /* Lock cluster preventing new transaction to start until wal is completely replayed */
682+ /*
683+ * Wal sender almost catched up.
684+ * Lock cluster preventing new transaction to start until wal is completely replayed.
685+ * We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
686+ * Is there some better way to establish mapping between nodes ad WAL-seconder?
687+ */
650688MtmLock (LW_EXCLUSIVE );
651689dtm -> nodeLockerMask |= (nodemask_t )1 << (nodeId - 1 );
652690dtm -> walSenderLockerMask |= (nodemask_t )1 << (MyWalSnd - WalSndCtl -> walsnds );
@@ -793,8 +831,8 @@ _PG_init(void)
793831DefineCustomIntVariable (
794832"multimaster.max_recovery_lag" ,
795833"Maximal lag of replication slot of failed node after which this slot is dropped to avoid transaction log overflow" ,
796- "Dropping slog makes it not possible to recover node using logical replication mechanism, it willeb ncessary to completely copy content of some other nodes "
797- "usimg basebackup or similar tool" ,
834+ "Dropping slog makes it not possible to recover node using logical replication mechanism, it willbe ncessary to completely copy content of some other nodes "
835+ "usimg basebackup or similar tool. Zero value of parameter disable droipping slot. " ,
798836& MtmMaxRecoveryLag ,
799837100000000 ,
8008380 ,
@@ -990,6 +1028,7 @@ _PG_fini(void)
9901028static void MtmSwitchFromRecoveryToNormalMode ()
9911029{
9921030dtm -> status = MTM_ONLINE ;
1031+ elog (WARNING ,"Switch to normal mode" );
9931032/* ??? Something else to do here? */
9941033}
9951034
@@ -1008,8 +1047,10 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
10081047}
10091048
10101049if (!TransactionIdIsValid (gtid -> xid )) {
1050+ /* In case of recovery InvalidTransactionId is passed */
10111051Assert (dtm -> status == MTM_RECOVERY );
10121052}else if (dtm -> status == MTM_RECOVERY ) {
1053+ /* When recovery is completed we get normal transaction ID and switch to normal mode */
10131054MtmSwitchFromRecoveryToNormalMode ();
10141055}
10151056dtmTx .gtid = * gtid ;
@@ -1026,6 +1067,7 @@ void MtmReceiverStarted(int nodeId)
10261067if (!BIT_CHECK (dtm -> pglogicalNodeMask ,nodeId - 1 )) {
10271068dtm -> pglogicalNodeMask |= (int64 )1 << (nodeId - 1 );
10281069if (++ dtm -> nReceivers == dtm -> nNodes - 1 ) {
1070+ elog (WARNING ,"All receivers are started, switch to normal mode" );
10291071Assert (dtm -> status == MTM_CONNECTED );
10301072dtm -> status = MTM_ONLINE ;
10311073}
@@ -1048,17 +1090,25 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
10481090return snapshot ;
10491091}
10501092
1093+ /*
1094+ * Determine when and how we should open replication slot.
1095+ * Druing recovery we need to open only one replication slot from which node should receive all transactions.
1096+ * Slots at other nodes should be removed
1097+ */
10511098MtmSlotMode MtmReceiverSlotMode (int nodeId )
10521099{
10531100while (dtm -> status != MTM_CONNECTED && dtm -> status != MTM_ONLINE ) {
10541101if (dtm -> status == MTM_RECOVERY ) {
10551102if (dtm -> recoverySlot == 0 || dtm -> recoverySlot == nodeId ) {
1103+ /* Choose for recovery first available slot */
10561104dtm -> recoverySlot = nodeId ;
10571105return SLOT_OPEN_EXISTED ;
10581106}
10591107}
1108+ /* delay opening of other slots until recovery is completed */
10601109MtmSleep (STATUS_POLL_DELAY );
10611110}
1111+ /* After recovery completion we need to drop all other slots to avoid receive of redundant data */
10621112return dtm -> recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS ;
10631113}
10641114