4545#include "storage/proc.h"
4646#include "utils/syscache.h"
4747#include "replication/walsender.h"
48+ #include "replication/walsender_private.h"
4849#include "replication/slot.h"
4950#include "port/atomics.h"
5051#include "tcop/utility.h"
@@ -60,16 +61,14 @@ typedef struct {
6061bool isReplicated ;/* transaction on replica */
6162bool isDistributed ;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6263bool containsDML ;/* transaction contains DML statements */
63- bool isPrepared ;/* transaction was prepared for commit */
6464csn_t snapshot ;/* transaction snaphsot */
6565}MtmCurrentTrans ;
6666
6767/* #define USE_SPINLOCK 1 */
6868
6969typedef enum
7070{
71- HASH_LOCK_ID ,
72- COMMIT_LOCK_ID ,
71+ MTM_STATE_LOCK_ID ,/* index into dtm->locks of the lock acquired by MtmLock() and released by MtmUnlock() */
7372N_LOCKS /* not a lock id: placed last, so its value equals the number of lock ids declared above */
7473}MtmLockIds ;
7574
@@ -142,6 +141,7 @@ int MtmReconnectAttempts;
142141static int MtmQueueSize ;
143142static int MtmWorkers ;
144143static int MtmVacuumDelay ;
144+ static int MtmMinRecoveryLag ;
145145
146146static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
147147static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -158,7 +158,7 @@ void MtmLock(LWLockMode mode)
158158#ifdef USE_SPINLOCK
159159SpinLockAcquire (& dtm -> spinlock );
160160#else
161- LWLockAcquire (dtm -> locks [HASH_LOCK_ID ],mode );
161+ LWLockAcquire (( LWLockId ) & dtm -> locks [MTM_STATE_LOCK_ID ],mode );
162162#endif
163163}
164164
@@ -167,7 +167,7 @@ void MtmUnlock(void)
167167#ifdef USE_SPINLOCK
168168SpinLockRelease (& dtm -> spinlock );
169169#else
170- LWLockRelease (dtm -> locks [HASH_LOCK_ID ]);
170+ LWLockRelease (( LWLockId ) & dtm -> locks [MTM_STATE_LOCK_ID ]);
171171#endif
172172}
173173
@@ -426,12 +426,14 @@ static void MtmInitialize()
426426dtm -> nNodes = MtmNodes ;
427427dtm -> disabledNodeMask = 0 ;
428428dtm -> pglogicalNodeMask = 0 ;
429+ dtm -> walSenderLockerMask = 0 ;
430+ dtm -> nodeLockerMask = 0 ;
431+ dtm -> nLockers = 0 ;
429432dtm -> votingTransactions = NULL ;
430433dtm -> transListHead = NULL ;
431434dtm -> transListTail = & dtm -> transListHead ;
432435dtm -> nReceivers = 0 ;
433436dtm -> timeShift = 0 ;
434- pg_atomic_write_u32 (& dtm -> nCommittingTrans ,0 );
435437PGSemaphoreCreate (& dtm -> votingSemaphore );
436438PGSemaphoreReset (& dtm -> votingSemaphore );
437439SpinLockInit (& dtm -> spinlock );
@@ -476,7 +478,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476478x -> xid = GetCurrentTransactionIdIfAny ();
477479x -> isReplicated = false;
478480x -> isDistributed = IsNormalProcessingMode ()&& dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
479- x -> isPrepared = false;
480481x -> containsDML = false;
481482x -> snapshot = MtmAssignCSN ();
482483x -> gtid .xid = InvalidTransactionId ;
@@ -487,6 +488,53 @@ MtmBeginTransaction(MtmCurrentTrans* x)
487488}
488489
489490
491+ /* This function is called at transaction start with multimaster ock set */
492+ static void
493+ MtmCheckClusterLock ()
494+ {
495+ while (true)
496+ {
497+ nodemask_t mask = dtm -> walSenderLockerMask ;
498+ if (mask != 0 ) {
499+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
500+ int i ;
501+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
502+ for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
503+ if (mask & 1 ) {
504+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
505+ /* recovery is in progress */
506+ break ;
507+ }else {
508+ /* recovered replica catched up with master */
509+ dtm -> walSenderLockerMask &= ~((nodemask_t )1 <<i );
510+ }
511+ }
512+ }
513+ if (mask != 0 ) {
514+ /* some "almost catch-up" wal-senders are still working */
515+ /* Do not start new transactions until them complete */
516+ MtmUnlock ();
517+ MtmSleep (delay );
518+ if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
519+ delay *=2 ;
520+ }
521+ MtmLock (LW_EXCLUSIVE );
522+ continue ;
523+ }else {
524+ /* All lockers are synchronized their logs */
525+ /* Remove lock and mark them as receovered */
526+ Assert (dtm -> walSenderLockerMask == 0 );
527+ Assert ((dtm -> nodeLockerMask & dtm -> disabledNodeMask )== dtm -> nodeLockerMask );
528+ dtm -> disabledNodeMask &= ~dtm -> nodeLockerMask ;
529+ dtm -> nNodes += dtm -> nLockers ;
530+ dtm -> nLockers = 0 ;
531+ dtm -> nodeLockerMask = 0 ;
532+ }
533+ }
534+ break ;
535+ }
536+ }
537+
490538/*
491539 * We need to pass snapshot to WAL-sender, so create record in transaction status hash table
492540 * before commit
@@ -499,15 +547,12 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
499547if (!x -> isDistributed ) {
500548return ;
501549}
502- /* Check that commits are not disabled */
503- LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ],LW_SHARED );
504- LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]);
505550
506- pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions ,1 );
507- x -> isPrepared = true;
508551x -> xid = GetCurrentTransactionId ();
509552
510553MtmLock (LW_EXCLUSIVE );
554+ MtmCheckClusterLock ();
555+
511556ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
512557ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
513558ts -> snapshot = x -> isReplicated || !x -> containsDML ?INVALID_CSN :x -> snapshot ;
@@ -546,9 +591,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
546591MtmAdjustSubtransactions (ts );
547592MtmUnlock ();
548593}
549- if (x -> isPrepared ) {
550- pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions ,-1 );
551- }
552594x -> snapshot = INVALID_CSN ;
553595x -> xid = InvalidTransactionId ;
554596x -> gtid .xid = InvalidTransactionId ;
@@ -568,39 +610,29 @@ void MtmSendNotificationMessage(MtmTransState* ts)
568610}
569611}
570612
571- void MtmUpdateStatus (bool recovered )
572- {
573- if (dtm -> status == MTM_RECOVERY ) {
574- MtmLock (LW_EXCLUSIVE );
575- dtm -> status = MTM_ONLINE ;/* Is it all we shoudl do t switch to nortmal state */
576- MtmUnlock ();
577- }
578- }
579-
580- void MtmRecoveryCompleted (int nodeId )
613+ /*
614+ * This function is called by WAL sender when start sending new transaction
615+ */
616+ bool MtmIsRecoveredNode (int nodeId )
581617{
582- if (BIT_CHECK (dtm -> pglogicalNodeMask ,nodeId - 1 )) {
583- if (MyWalSnd -> sentPtr == GetXLogInsertRecPtr ()) {
584- /* Ok, now we done with recovery of node */
618+ if (BIT_CHECK (dtm -> disabledNodeMask ,nodeId - 1 )) {
619+ Assert (MyWalSnd != NULL );
620+ if (!BIT_CHECK (dtm -> nodeLockerMask ,nodeId - 1 )
621+ && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
622+ {
623+ /* Wal sender almost catched up */
624+ /* Lock cluster preventing new transaction to start until wal is completely replayed */
585625MtmLock (LW_EXCLUSIVE );
586- dtm -> pglogicalNodeMask &= (int64 )1 << (nodeId - 1 );/* now node is assumed as recovered */
587- dtm -> nNodes += 1 ;
626+ dtm -> nodeLockerMask |= (nodemask_t )1 << (nodeId - 1 );
627+ dtm -> walSenderLockerMask |= (nodemask_t )1 << (MyWalSnd - WalSndCtl -> walsnds );
628+ dtm -> nLockers += 1 ;
588629MtmUnlock ();
589-
590- LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]);/* enable commits */
591-
592- return true;
593- }else if (MyWalSnd -> sentPtr + MtmSlotDelayThreashold > GetXLogInsertRecPtr ()) {
594- /* we almost done with recovery of node.. */
595- LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ],LW_EXCLUSIVE );/* disable new commits */
596630}
597- return false;
598- }else {
599631return true;
600632}
633+ return false;
601634}
602635
603-
604636static bool
605637MtmCommitTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids )
606638{
@@ -717,6 +749,21 @@ _PG_init(void)
717749if (!process_shared_preload_libraries_in_progress )
718750return ;
719751
752+ DefineCustomIntVariable (
753+ "multimaster.min_recovery_lag" ,
754+ "Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed" ,
755+ NULL ,
756+ & MtmMinRecoveryLag ,
757+ 100000 ,
758+ 1 ,
759+ INT_MAX ,
760+ PGC_BACKEND ,
761+ 0 ,
762+ NULL ,
763+ NULL ,
764+ NULL
765+ );
766+
720767DefineCustomIntVariable (
721768"multimaster.vacuum_delay" ,
722769"Minimal age of records which can be vacuumed (seconds)" ,
@@ -897,6 +944,12 @@ _PG_fini(void)
897944 * ***************************************************************************
898945 */
899946
947+ static void MtmSwitchFromRecoveryToNormalMode ()
948+ {
949+ dtm -> status = MTM_ONLINE ;
950+ /* ??? Something else to do here? */
951+ }
952+
900953void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
901954{
902955csn_t localSnapshot ;
@@ -910,6 +963,11 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
910963elog (ERROR ,"Too old snapshot: requested %ld, current %ld" ,globalSnapshot ,localSnapshot );
911964}
912965
966+ if (!TransactionIdIsValid (gtid -> xid )) {
967+ Assert (dtm -> status == MTM_RECOVERY );
968+ }else if (dtm -> status == MTM_RECOVERY ) {
969+ MtmSwitchFromRecoveryToNormalMode ();
970+ }
913971dtmTx .gtid = * gtid ;
914972dtmTx .xid = GetCurrentTransactionId ();
915973dtmTx .snapshot = globalSnapshot ;