@@ -70,6 +70,7 @@ typedef struct {
7070#define USEC 1000000
7171#define MIN_WAIT_TIMEOUT 1000
7272#define MAX_WAIT_TIMEOUT 100000
73+ #define STATUS_POLL_DELAY USEC
7374
7475void _PG_init (void );
7576void _PG_fini (void );
@@ -147,7 +148,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
147148void MtmLock (LWLockMode mode )
148149{
149150#ifdef USE_SPINLOCK
150- SpinLockAcquire (& dtm -> hashSpinlock );
151+ SpinLockAcquire (& dtm -> spinlock );
151152#else
152153LWLockAcquire (dtm -> hashLock ,mode );
153154#endif
@@ -156,7 +157,7 @@ void MtmLock(LWLockMode mode)
156157void MtmUnlock (void )
157158{
158159#ifdef USE_SPINLOCK
159- SpinLockRelease (& dtm -> hashSpinlock );
160+ SpinLockRelease (& dtm -> spinlock );
160161#else
161162LWLockRelease (dtm -> hashLock );
162163#endif
@@ -409,20 +410,22 @@ static void MtmInitialize()
409410dtm = (MtmState * )ShmemInitStruct (MULTIMASTER_NAME ,sizeof (MtmState ),& found );
410411if (!found )
411412{
413+ dtm -> status = MTM_INITIALIZATION ;
414+ dtm -> recoverySlot = 0 ;
412415dtm -> hashLock = (LWLock * )GetNamedLWLockTranche (MULTIMASTER_NAME );
413416dtm -> csn = MtmGetCurrentTime ();
414417dtm -> oldestXid = FirstNormalTransactionId ;
415418dtm -> nNodes = MtmNodes ;
416419dtm -> disabledNodeMask = 0 ;
420+ dtm -> pglogicalNodeMask = 0 ;
417421dtm -> votingTransactions = NULL ;
418422dtm -> transListHead = NULL ;
419- dtm -> transListTail = & dtm -> transListHead ;
420- pg_atomic_write_u32 ( & dtm -> nReceivers , 0 ) ;
423+ dtm -> transListTail = & dtm -> transListHead ;
424+ dtm -> nReceivers = 0 ;
421425dtm -> timeShift = 0 ;
422- dtm -> initialized = false;
423426PGSemaphoreCreate (& dtm -> votingSemaphore );
424427PGSemaphoreReset (& dtm -> votingSemaphore );
425- SpinLockInit (& dtm -> hashSpinlock );
428+ SpinLockInit (& dtm -> spinlock );
426429BgwPoolInit (& dtm -> pool ,MtmExecutor ,MtmDatabaseName ,MtmQueueSize );
427430RegisterXactCallback (MtmXactCallback ,NULL );
428431dtmTx .snapshot = INVALID_CSN ;
@@ -463,7 +466,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
463466MtmLock (LW_EXCLUSIVE );
464467x -> xid = GetCurrentTransactionIdIfAny ();
465468x -> isReplicated = false;
466- x -> isDistributed = IsNormalProcessingMode ()&& dtm -> initialized && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
469+ x -> isDistributed = IsNormalProcessingMode ()&& dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
467470x -> containsDML = false;
468471x -> snapshot = MtmAssignCSN ();
469472x -> gtid .xid = InvalidTransactionId ;
@@ -575,8 +578,6 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
575578XidStatus prevStatus = TRANSACTION_STATUS_UNKNOWN ;
576579bool found ;
577580
578- Assert (status == TRANSACTION_STATUS_ABORTED );
579-
580581MtmLock (LW_EXCLUSIVE );
581582ts = hash_search (xid2state ,& xid ,HASH_ENTER ,& found );
582583if (!found ) {
@@ -590,7 +591,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
590591ts -> status = status ;
591592MtmAdjustSubtransactions (ts );
592593
593- if (prevStatus != TRANSACTION_STATUS_ABORTED ) {
594+ if (dtm -> status != MTM_RECOVERY && prevStatus != TRANSACTION_STATUS_ABORTED ) {
594595ts -> cmd = MSG_ABORTED ;
595596MtmSendNotificationMessage (ts );
596597}
@@ -607,7 +608,7 @@ MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
607608MTM_TRACE ("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n" ,getpid (),xid ,dtmTx .xid ,status ,dtmTx .isDistributed );
608609if (xid == dtmTx .xid && dtmTx .isDistributed )
609610{
610- if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML )
611+ if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML || dtm -> status == MTM_RECOVERY )
611612{
612613MtmFinishTransaction (xid ,nsubxids ,subxids ,status );
613614MTM_TRACE ("Finish transaction %d, status=%d, DML=%d\n" ,xid ,status ,dtmTx .containsDML );
@@ -863,11 +864,17 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
863864dtmTx .containsDML = true;
864865}
865866
866- void MtmReceiverStarted ()
867+ void MtmReceiverStarted (int nodeId )
867868{
868- if (pg_atomic_fetch_add_u32 (& dtm -> nReceivers ,1 )== dtm -> nNodes - 2 ) {
869- dtm -> initialized = true;
869+ SpinLockAcquire (& dtm -> spinlock );
870+ if (!BIT_CHECK (dtm -> pglogicalNodeMask ,nodeId - 1 )) {
871+ dtm -> pglogicalNodeMask |= (int64 )1 << (nodeId - 1 );
872+ if (++ dtm -> nReceivers == dtm -> nNodes - 1 ) {
873+ Assert (dtm -> status == MTM_CONNECTED );
874+ dtm -> status = MTM_ONLINE ;
875+ }
870876 }
877+ SpinLockRelease (& dtm -> spinlock );
871878}
872879
873880csn_t MtmTransactionSnapshot (TransactionId xid )
@@ -885,10 +892,23 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
885892return snapshot ;
886893}
887894
888-
895+ MtmSlotMode MtmReceiverSlotMode (int nodeId )
896+ {
897+ while (dtm -> status != MTM_CONNECTED && dtm -> status != MTM_ONLINE ) {
898+ if (dtm -> status == MTM_RECOVERY ) {
899+ if (dtm -> recoverySlot == 0 || dtm -> recoverySlot == nodeId ) {
900+ dtm -> recoverySlot = nodeId ;
901+ return SLOT_OPEN_EXISTED ;
902+ }
903+ }
904+ MtmSleep (STATUS_POLL_DELAY );
905+ }
906+ return dtm -> recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS ;
907+ }
908+
889909void MtmDropNode (int nodeId ,bool dropSlot )
890910{
891- if (!BIT_SET (dtm -> disabledNodeMask ,nodeId - 1 ))
911+ if (!BIT_CHECK (dtm -> disabledNodeMask ,nodeId - 1 ))
892912{
893913if (nodeId <=0 || nodeId > dtm -> nNodes )
894914{
@@ -969,7 +989,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
969989p = conn_str_end ;
970990}
971991* p = '\0' ;
972- if (!BIT_SET (disabledNodeMask ,i ))
992+ if (!BIT_CHECK (disabledNodeMask ,i ))
973993{
974994conns [i ]= PQconnectdb (conn_str );
975995if (PQstatus (conns [i ])!= CONNECTION_OK )