@@ -60,11 +60,19 @@ typedef struct {
6060bool isReplicated ;/* transaction on replica */
6161bool isDistributed ;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6262bool containsDML ;/* transaction contains DML statements */
63+ bool isPrepared ;/* transaction was prepared for commit */
6364csn_t snapshot ;/* transaction snaphsot */
6465}MtmCurrentTrans ;
6566
6667/* #define USE_SPINLOCK 1 */
6768
69+ typedef enum
70+ {
71+ HASH_LOCK_ID ,
72+ COMMIT_LOCK_ID ,
73+ N_LOCKS
74+ }MtmLockIds ;
75+
6876#define MTM_SHMEM_SIZE (64*1024*1024)
6977#define MTM_HASH_SIZE 100003
7078#define USEC 1000000
@@ -150,7 +158,7 @@ void MtmLock(LWLockMode mode)
150158#ifdef USE_SPINLOCK
151159SpinLockAcquire (& dtm -> spinlock );
152160#else
153- LWLockAcquire (dtm -> hashLock ,mode );
161+ LWLockAcquire (dtm -> locks [ HASH_LOCK_ID ] ,mode );
154162#endif
155163}
156164
@@ -159,7 +167,7 @@ void MtmUnlock(void)
159167#ifdef USE_SPINLOCK
160168SpinLockRelease (& dtm -> spinlock );
161169#else
162- LWLockRelease (dtm -> hashLock );
170+ LWLockRelease (dtm -> locks [ HASH_LOCK_ID ] );
163171#endif
164172}
165173
@@ -412,7 +420,7 @@ static void MtmInitialize()
412420{
413421dtm -> status = MTM_INITIALIZATION ;
414422dtm -> recoverySlot = 0 ;
415- dtm -> hashLock = ( LWLock * ) GetNamedLWLockTranche (MULTIMASTER_NAME );
423+ dtm -> locks = GetNamedLWLockTranche (MULTIMASTER_NAME );
416424dtm -> csn = MtmGetCurrentTime ();
417425dtm -> oldestXid = FirstNormalTransactionId ;
418426dtm -> nNodes = MtmNodes ;
@@ -423,6 +431,7 @@ static void MtmInitialize()
423431dtm -> transListTail = & dtm -> transListHead ;
424432dtm -> nReceivers = 0 ;
425433dtm -> timeShift = 0 ;
434+ pg_atomic_write_u32 (& dtm -> nCommittingTrans ,0 );
426435PGSemaphoreCreate (& dtm -> votingSemaphore );
427436PGSemaphoreReset (& dtm -> votingSemaphore );
428437SpinLockInit (& dtm -> spinlock );
@@ -467,6 +476,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
467476x -> xid = GetCurrentTransactionIdIfAny ();
468477x -> isReplicated = false;
469478x -> isDistributed = IsNormalProcessingMode ()&& dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
479+ x -> isPrepared = false;
470480x -> containsDML = false;
471481x -> snapshot = MtmAssignCSN ();
472482x -> gtid .xid = InvalidTransactionId ;
@@ -476,6 +486,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476486 }
477487}
478488
489+
479490/*
480491 * We need to pass snapshot to WAL-sender, so create record in transaction status hash table
481492 * before commit
@@ -488,8 +499,14 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
488499if (!x -> isDistributed ) {
489500return ;
490501}
491- x -> xid = GetCurrentTransactionId ();
502+ /* Check that commits are not disabled */
503+ LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ],LW_SHARED );
504+ LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]);
492505
506+ pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions ,1 );
507+ x -> isPrepared = true;
508+ x -> xid = GetCurrentTransactionId ();
509+
493510MtmLock (LW_EXCLUSIVE );
494511ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
495512ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
@@ -500,6 +517,7 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
500517ts -> procno = MyProc -> pgprocno ;
501518ts -> nVotes = 0 ;
502519ts -> done = false;
520+
503521if (TransactionIdIsValid (x -> gtid .xid )) {
504522ts -> gtid = x -> gtid ;
505523}else {
@@ -528,6 +546,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
528546MtmAdjustSubtransactions (ts );
529547MtmUnlock ();
530548}
549+ if (x -> isPrepared ) {
550+ pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions ,-1 );
551+ }
531552x -> snapshot = INVALID_CSN ;
532553x -> xid = InvalidTransactionId ;
533554x -> gtid .xid = InvalidTransactionId ;
@@ -547,6 +568,39 @@ void MtmSendNotificationMessage(MtmTransState* ts)
547568}
548569}
549570
571+ void MtmUpdateStatus (bool recovered )
572+ {
573+ if (dtm -> status == MTM_RECOVERY ) {
574+ MtmLock (LW_EXCLUSIVE );
575+ dtm -> status = MTM_ONLINE ;/* Is it all we shoudl do t switch to nortmal state */
576+ MtmUnlock ();
577+ }
578+ }
579+
580+ void MtmRecoveryCompleted (int nodeId )
581+ {
582+ if (BIT_CHECK (dtm -> pglogicalNodeMask ,nodeId - 1 )) {
583+ if (MyWalSnd -> sentPtr == GetXLogInsertRecPtr ()) {
584+ /* Ok, now we done with recovery of node */
585+ MtmLock (LW_EXCLUSIVE );
586+ dtm -> pglogicalNodeMask &= (int64 )1 << (nodeId - 1 );/* now node is assumed as recovered */
587+ dtm -> nNodes += 1 ;
588+ MtmUnlock ();
589+
590+ LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]);/* enable commits */
591+
592+ return true;
593+ }else if (MyWalSnd -> sentPtr + MtmSlotDelayThreashold > GetXLogInsertRecPtr ()) {
594+ /* we almost done with recovery of node.. */
595+ LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ],LW_EXCLUSIVE );/* disable new commits */
596+ }
597+ return false;
598+ }else {
599+ return true;
600+ }
601+ }
602+
603+
550604static bool
551605MtmCommitTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids )
552606{
@@ -803,7 +857,7 @@ _PG_init(void)
803857 * resources in mtm_shmem_startup().
804858 */
805859RequestAddinShmemSpace (MTM_SHMEM_SIZE + MtmQueueSize );
806- RequestNamedLWLockTranche (MULTIMASTER_NAME ,1 );
860+ RequestNamedLWLockTranche (MULTIMASTER_NAME ,N_LOCKS );
807861
808862MtmNodes = MtmStartReceivers (MtmConnStrs ,MtmNodeId );
809863if (MtmNodes < 2 ) {