@@ -77,7 +77,6 @@ typedef enum
7777
7878#define MTM_SHMEM_SIZE (64*1024*1024)
7979#define MTM_HASH_SIZE 100003
80- #define USEC 1000000
8180#define MIN_WAIT_TIMEOUT 1000
8281#define MAX_WAIT_TIMEOUT 100000
8382#define STATUS_POLL_DELAY USEC
@@ -132,6 +131,15 @@ static TransactionManager MtmTM = {
132131MtmGetName
133132};
134133
134+ static char const * const MtmNodeStatusMnem []=
135+ {
136+ "Intialization" ,
137+ "Offline" ,
138+ "Connected" ,
139+ "Online" ,
140+ "Recovery"
141+ };
142+
135143bool MtmDoReplication ;
136144char * MtmDatabaseName ;
137145
@@ -509,8 +517,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
509517x -> isReplicated = false;
510518x -> isDistributed = MtmIsUserTransaction ();
511519if (x -> isDistributed && dtm -> status != MTM_ONLINE ) {
512- MtmUnlock ();
513- elog (ERROR ,"Multimaster node is offline" );
520+ /* Reject all user transactions while this node is not online */
521+ MtmUnlock ();
522+ elog (ERROR ,"Multimaster node is not online" );
514523}
515524x -> containsDML = false;
516525x -> isPrepared = false;
@@ -594,6 +603,9 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
594603
595604if (dtm -> disabledNodeMask != 0 ) {
596605MtmUpdateClusterStatus ();
606+ if (dtm -> status != MTM_ONLINE ) {
607+ elog (ERROR ,"Abort current transaction because this cluster node is not online" );
608+ }
597609}
598610
599611MtmLock (LW_EXCLUSIVE );
@@ -1084,21 +1096,13 @@ _PG_fini(void)
10841096 */
10851097
10861098
1087- void MtmSwitchToNormalMode ()
1088- {
1089- dtm -> status = MTM_ONLINE ;
1090- elog (WARNING ,"Switch to normal mode" );
1091- /* ??? Something else to do here? */
1092- }
1093-
1094- void MtmSwitchToRecoveryMode ()
1099+ void MtmClusterSwitchMode (MtmNodeStatus mode )
10951100{
1096- dtm -> status = MTM_RECOVERY ;
1101+ dtm -> status = mode ;
1102+ elog (WARNING ,"Switch to %s mode" ,MtmNodeStatusMnem [mode ]);
10971103/* ??? Something else to do here? */
1098- elog (ERROR ,"Switch to normal mode" );
10991104}
11001105
1101-
11021106void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
11031107{
11041108csn_t localSnapshot ;
@@ -1117,7 +1121,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
11171121Assert (dtm -> status == MTM_RECOVERY );
11181122}else if (dtm -> status == MTM_RECOVERY ) {
11191123/* When recovery is completed we get normal transaction ID and switch to normal mode */
1120- MtmSwitchToNormalMode ( );
1124+ MtmClusterSwitchMode ( MTM_ONLINE );
11211125}
11221126dtmTx .gtid = * gtid ;
11231127dtmTx .xid = GetCurrentTransactionId ();
@@ -1670,7 +1674,10 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16701674}
16711675}
16721676
1673-
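1677+ /**
1678+ * Build the connectivity graph, find the maximal clique in it and, when the clique has a quorum, disable nodes outside the clique and re-enable previously disabled nodes inside it.
1679+ * Returns nothing: if the current node ends up excluded while online it is switched to offline mode, and if it is offline but no longer excluded it is switched to recovery mode.
1680+ */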
16741681void MtmUpdateClusterStatus (void )
16751682{
16761683nodemask_t mask ,clique ;
@@ -1683,17 +1690,29 @@ void MtmUpdateClusterStatus(void)
16831690clique = MtmFindMaxClique (matrix ,MtmNodes ,& clique_size );
16841691if (clique_size >=MtmNodes /2 + 1 ) {/* have quorum */
16851692MtmLock (LW_EXCLUSIVE );
1686- mask = ~clique & (((nodemask_t )1 <<MtmNodes )- 1 )& ~dtm -> disabledNodeMask ;
1693+ mask = ~clique & (((nodemask_t )1 <<MtmNodes )- 1 )& ~dtm -> disabledNodeMask ;/* new disabled nodes mask */
16871694for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
16881695if (mask & 1 ) {
16891696dtm -> nNodes -= 1 ;
16901697BIT_SET (dtm -> disabledNodeMask ,i );
16911698}
16921699}
1700+ mask = clique & dtm -> disabledNodeMask ;/* new enabled nodes mask */
1701+ for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
1702+ if (mask & 1 ) {
1703+ dtm -> nNodes += 1 ;
1704+ BIT_CLEAR (dtm -> disabledNodeMask ,i );
1705+ }
1706+ }
16931707MtmUnlock ();
16941708if (BIT_CHECK (dtm -> disabledNodeMask ,MtmNodeId - 1 )) {
1695- /* I was excluded from cluster:( */
1696- MtmSwitchToRecoveryMode ();
1709+ if (dtm -> status == MTM_ONLINE ) {
1710+ /* I was excluded from cluster:( */
1711+ MtmClusterSwitchMode (MTM_OFFLINE );
1712+ }
1713+ }else if (dtm -> status == MTM_OFFLINE ) {
1714+ /* Should we somehow restart logical receivers? */
1715+ MtmClusterSwitchMode (MTM_RECOVERY );
16971716}
16981717}else {
16991718elog (WARNING ,"Clique %lx has no quorum" ,clique );