@@ -159,8 +159,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
159159static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError );
160160static void MtmProcessDDLCommand (char const * queryString ,bool transactional );
161161
162- static void MtmSuspendNode (void );
163- static void MtmResumeNode (void );
162+ static void MtmLockCluster (void );
163+ static void MtmUnlockCluster (void );
164164
165165MtmState * Mtm ;
166166
@@ -254,7 +254,7 @@ static bool MtmIgnoreTablesWithoutPk;
254254static int MtmLockCount ;
255255static bool MtmMajorNode ;
256256static bool MtmBreakConnection ;
257- static bool MtmSuspended ;
257+ static bool MtmClusterLocked ;
258258static bool MtmInsideTransaction ;
259259
260260static ExecutorStart_hook_type PreviousExecutorStartHook ;
@@ -288,8 +288,8 @@ void MtmReleaseLocks(void)
288288MtmInsideTransaction = false;
289289MtmUnlock ();
290290}
291- if (MtmSuspended ) {
292- MtmResumeNode ();
291+ if (MtmClusterLocked ) {
292+ MtmUnlockCluster ();
293293}
294294if (MtmLockCount != 0 ) {
295295Assert (Mtm -> lastLockHolder == MyProcPid );
@@ -876,8 +876,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
876876 * Also allow user to complete explicit 2PC transactions.
877877 */
878878if (x -> isDistributed
879- && (Mtm -> exclusiveLock || (!x -> isReplicated && !x -> isTwoPhase ))
880- && !MtmSuspended
879+ && !MtmClusterLocked /* do not lock myself */
881880&& strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 )
882881{
883882MtmCheckClusterLock ();
@@ -886,7 +885,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
886885Mtm -> nRunningTransactions += 1 ;
887886
888887x -> snapshot = MtmAssignCSN ();
889- MTM_LOG1 ("Start transaction %lld with snapshot %lld" , (long64 )x -> xid ,x -> snapshot );
888+ MTM_LOG2 ("Start transaction %lld with snapshot %lld" , (long64 )x -> xid ,x -> snapshot );
890889
891890MtmUnlock ();
892891
@@ -1448,11 +1447,25 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
14481447if (!MyReplicationSlot ) {
14491448MtmCheckSlots ();
14501449}
1451- if (MtmSuspended ) {
1452- MtmResumeNode ();
1450+ if (MtmClusterLocked ) {
1451+ MtmUnlockCluster ();
14531452}
14541453}
14551454
1455+ /*
1456+ * Initialize message
1457+ */
1458+ void MtmInitMessage (MtmArbiterMessage * msg ,MtmMessageCode code )
1459+ {
1460+ msg -> code = code ;
1461+ msg -> disabledNodeMask = Mtm -> disabledNodeMask ;
1462+ msg -> connectivityMask = SELF_CONNECTIVITY_MASK ;
1463+ msg -> oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
1464+ msg -> lockReq = Mtm -> originLockNodeMask != 0 ;
1465+ msg -> locked = (Mtm -> originLockNodeMask |Mtm -> inducedLockNodeMask )!= 0 ;
1466+ }
1467+
1468+
14561469/*
14571470 * Send arbiter's message
14581471 */
@@ -1489,13 +1502,9 @@ void MtmSendMessage(MtmArbiterMessage* msg)
14891502void MtmSend2PCMessage (MtmTransState * ts ,MtmMessageCode cmd )
14901503{
14911504MtmArbiterMessage msg ;
1492- msg . code = cmd ;
1505+ MtmInitMessage ( & msg , cmd ) ;
14931506msg .sxid = ts -> xid ;
14941507msg .csn = ts -> csn ;
1495- msg .disabledNodeMask = Mtm -> disabledNodeMask ;
1496- msg .connectivityMask = SELF_CONNECTIVITY_MASK ;
1497- msg .oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
1498- msg .lockReq = Mtm -> nodeLockerMask != 0 ;
14991508memcpy (msg .gid ,ts -> gid ,MULTIMASTER_MAX_GID_SIZE );
15001509
15011510Assert (!MtmIsCoordinator (ts ));/* All broadcasts are now done through logical decoding */
@@ -1516,11 +1525,7 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
15161525{
15171526int i ;
15181527MtmArbiterMessage msg ;
1519- msg .code = MSG_POLL_REQUEST ;
1520- msg .disabledNodeMask = Mtm -> disabledNodeMask ;
1521- msg .connectivityMask = SELF_CONNECTIVITY_MASK ;
1522- msg .oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
1523- msg .lockReq = Mtm -> nodeLockerMask != 0 ;
1528+ MtmInitMessage (& msg ,MSG_POLL_REQUEST );
15241529memcpy (msg .gid ,ts -> gid ,MULTIMASTER_MAX_GID_SIZE );
15251530ts -> votedMask = 0 ;
15261531
@@ -1928,7 +1933,7 @@ void MtmRecoveryCompleted(void)
19281933 * logical replication connections with this node.
19291934 * Under the intensive workload start of logical replication can be delayed for unpredictable amount of time
19301935 */
1931- BIT_SET (Mtm -> nodeLockerMask ,MtmNodeId - 1 );/* it is trick: this mask was originally used by WAL senders performing recovery, but here we are in opposite (recovered) side:
1936+ BIT_SET (Mtm -> originLockNodeMask ,MtmNodeId - 1 );/* it is trick: this mask was originally used by WAL senders performing recovery, but here we are in opposite (recovered) side:
19321937 * if this mask is not zero loadReq will be broadcasted to all other nodes by heartbeat, suspending their activity
19331938 */
19341939MtmSwitchClusterMode (MTM_RECOVERED );
@@ -2017,7 +2022,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20172022MtmLock (LW_EXCLUSIVE );
20182023if (MtmIsRecoveredNode (nodeId )) {
20192024lsn_t walLSN = GetXLogInsertRecPtr ();
2020- if (!BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )
2025+ if (!BIT_CHECK (Mtm -> originLockNodeMask ,nodeId - 1 )
20212026&& slotLSN + MtmMinRecoveryLag > walLSN )
20222027{
20232028/*
@@ -2028,14 +2033,11 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20282033 */
20292034MTM_LOG1 ("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d" ,
20302035nodeId ,slotLSN ,walLSN ,Mtm -> nActiveTransactions );
2031- Assert (MyWalSnd != NULL );/* This function is called by WAL-sender, so it should not be NULL */
2032- BIT_SET (Mtm -> nodeLockerMask ,nodeId - 1 );
2033- BIT_SET (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
2034- Mtm -> nLockers += 1 ;
2036+ BIT_SET (Mtm -> originLockNodeMask ,nodeId - 1 );
20352037}else {
20362038MTM_LOG2 ("Continue recovery of node %d, slot position %llx, WAL position %llx,"
2037- " WAL sender position %llx, lockers %d , active transactions %d" ,nodeId ,slotLSN ,
2038- walLSN ,MyWalSnd -> sentPtr ,Mtm -> nLockers ,Mtm -> nActiveTransactions );
2039+ " WAL sender position %llx, lockers %llx , active transactions %d" ,nodeId ,slotLSN ,
2040+ walLSN ,MyWalSnd -> sentPtr ,Mtm -> orinLockNodeMask ,Mtm -> nActiveTransactions );
20392041}
20402042}
20412043MtmUnlock ();
@@ -2051,11 +2053,13 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20512053bool caughtUp = false;
20522054MtmLock (LW_EXCLUSIVE );
20532055if (MtmIsRecoveredNode (nodeId )&& Mtm -> nActiveTransactions == 0 ) {
2054- if (BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )) {
2056+ if (BIT_CHECK (Mtm -> originLockNodeMask ,nodeId - 1 )) {
20552057MTM_LOG1 ("Node %d is caught-up at WAL position %llx" ,nodeId ,walEndPtr );
2056- BIT_CLEAR (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
2057- BIT_CLEAR (Mtm -> nodeLockerMask ,nodeId - 1 );
2058- Mtm -> nLockers -= 1 ;
2058+ Assert (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 ));
2059+ BIT_CLEAR (Mtm -> originLockNodeMask ,nodeId - 1 );
2060+ BIT_CLEAR (Mtm -> disabledNodeMask ,nodeId - 1 );
2061+ Mtm -> nLiveNodes += 1 ;
2062+ MtmCheckQuorum ();
20592063}else {
20602064MTM_LOG1 ("Node %d is caught-up at WAL position %llx without locking cluster" ,nodeId ,walEndPtr );
20612065/* We are lucky: caught-up without locking cluster! */
@@ -2082,40 +2086,44 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
20822086 * Prevent start of any new transactions at this node
20832087 */
20842088static void
2085- MtmSuspendNode (void )
2089+ MtmLockCluster (void )
20862090{
20872091timestamp_t delay = MIN_WAIT_TIMEOUT ;
2088- Assert (!MtmSuspended );
2092+ Assert (!MtmClusterLocked );
20892093MtmLock (LW_EXCLUSIVE );
2090- if (Mtm -> exclusiveLock ) {
2094+ if (BIT_CHECK ( Mtm -> originLockNodeMask , MtmNodeId - 1 ) ) {
20912095elog (ERROR ,"There is already pending exclusive lock" );
20922096}
2093- Mtm -> exclusiveLock = true;
2094- MtmSuspended = true;
2095- MTM_LOG2 ("Transaction %lld tries to suspend node at %lld insideTransaction=%d, active transactions=%lld" ,
2096- (long64 )MtmTx .xid ,MtmGetCurrentTime (),insideTransaction , (long64 )Mtm -> nRunningTransactions );
2097- while (Mtm -> nRunningTransactions != 1 ) {/* I am one */
2097+ BIT_SET (Mtm -> originLockNodeMask ,MtmNodeId - 1 );
2098+ MtmClusterLocked = true;
2099+ MTM_LOG1 ("Transaction %lld tries to lock cluster at %lld, running transactions=%lld" ,
2100+ (long64 )MtmTx .xid ,MtmGetCurrentTime (), (long64 )Mtm -> nRunningTransactions );
2101+ /* Wait until everything is locked */
2102+ while (Mtm -> nRunningTransactions != 1 /* I am one */
2103+ || ((((nodemask_t )1 <<Mtm -> nAllNodes )- 1 )& ~(Mtm -> currentLockNodeMask |Mtm -> originLockNodeMask )& ~Mtm -> disabledNodeMask )!= 0 )
2104+ {
20982105MtmUnlock ();
20992106MtmSleep (delay );
21002107if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
21012108delay *=2 ;
21022109}
21032110MtmLock (LW_EXCLUSIVE );
21042111}
2105- MTM_LOG2 ("Transaction %lld suspended node at %lld, LSN %lld, active transactions=%lld" , (long64 )MtmTx .xid ,MtmGetCurrentTime (), (long64 )GetXLogInsertRecPtr (), (long64 )Mtm -> nRunningTransactions );
2112+ MTM_LOG1 ("Transaction %lld locked cluster at %lld, LSN %lld, active transactions=%lld" ,
2113+ (long64 )MtmTx .xid ,MtmGetCurrentTime (), (long64 )GetXLogInsertRecPtr (), (long64 )Mtm -> nRunningTransactions );
21062114MtmUnlock ();
21072115}
21082116
21092117/*
2110- *Resume transaction processing at node (blocked byMtmSuspendNode)
2118+ *Remove global cluster lock set byMtmLockCluster
21112119 */
21122120static void
2113- MtmResumeNode (void )
2121+ MtmUnlockCluster (void )
21142122{
21152123MtmLock (LW_EXCLUSIVE );
2116- MTM_LOG2 ("Transaction %lldresume node at %lld status %s LSN %lld" , (long64 )MtmTx .xid ,MtmGetCurrentTime (),MtmTxnStatusMnem [MtmTx .status ], (long64 )GetXLogInsertRecPtr ());
2117- Mtm -> exclusiveLock = false ;
2118- MtmSuspended = false;
2124+ MTM_LOG1 ("Transaction %lldunlock cluster at %lld status %s LSN %lld" , (long64 )MtmTx .xid ,MtmGetCurrentTime (),MtmTxnStatusMnem [MtmTx .status ], (long64 )GetXLogInsertRecPtr ());
2125+ BIT_CLEAR ( Mtm -> originLockNodeMask , MtmNodeId - 1 ) ;
2126+ MtmClusterLocked = false;
21192127MtmUnlock ();
21202128}
21212129
@@ -2128,33 +2136,15 @@ static void
21282136MtmCheckClusterLock ()
21292137{
21302138timestamp_t delay = MIN_WAIT_TIMEOUT ;
2131- while (true)
2132- {
2133- if (Mtm -> exclusiveLock || (Mtm -> globalLockerMask |Mtm -> walSenderLockerMask )) {
2134- /* some "almost cautch-up" wal-senders are still working. */
2135- /* Do not start new transactions until them are completed. */
2136- MtmUnlock ();
2137- MtmSleep (delay );
2138- if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
2139- delay *=2 ;
2140- }
2141- MtmLock (LW_EXCLUSIVE );
2142- }else {
2143- if (Mtm -> nodeLockerMask != 0 ) {
2144- /* All lockers have synchronized their logs */
2145- /* Remove lock and mark them as recovered */
2146- MTM_LOG1 ("Complete recovery of %d nodes (node mask %llx)" ,Mtm -> nLockers ,Mtm -> nodeLockerMask );
2147- Assert (Mtm -> walSenderLockerMask == 0 );
2148- Assert ((Mtm -> nodeLockerMask & Mtm -> disabledNodeMask )== Mtm -> nodeLockerMask );
2149- Mtm -> disabledNodeMask &= ~Mtm -> nodeLockerMask ;
2150- Mtm -> nConfigChanges += 1 ;
2151- Mtm -> nLiveNodes += Mtm -> nLockers ;
2152- Mtm -> nLockers = 0 ;
2153- Mtm -> nodeLockerMask = 0 ;
2154- MtmCheckQuorum ();
2155- }
2156- break ;
2139+ while (Mtm -> originLockNodeMask |Mtm -> inducedLockNodeMask ) {
2140+ /* some "almost cautch-up" wal-senders are still working. */
2141+ /* Do not start new transactions until them are completed. */
2142+ MtmUnlock ();
2143+ MtmSleep (delay );
2144+ if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
2145+ delay *=2 ;
21572146}
2147+ MtmLock (LW_EXCLUSIVE );
21582148}
21592149}
21602150
@@ -2548,13 +2538,11 @@ static void MtmInitialize()
25482538Mtm -> stoppedNodeMask = 0 ;
25492539Mtm -> pglogicalReceiverMask = 0 ;
25502540Mtm -> pglogicalSenderMask = 0 ;
2551- Mtm -> walSenderLockerMask = 0 ;
2552- Mtm -> globalLockerMask = 0 ;
2553- Mtm -> nodeLockerMask = 0 ;
2541+ Mtm -> inducedLockNodeMask = 0 ;
2542+ Mtm -> currentLockNodeMask = 0 ;
2543+ Mtm -> originLockNodeMask = 0 ;
25542544Mtm -> reconnectMask = 0 ;
25552545Mtm -> recoveredLSN = INVALID_LSN ;
2556- Mtm -> nLockers = 0 ;
2557- Mtm -> exclusiveLock = false;
25582546Mtm -> nActiveTransactions = 0 ;
25592547Mtm -> nRunningTransactions = 0 ;
25602548Mtm -> votingTransactions = NULL ;
@@ -3326,7 +3314,7 @@ void MtmReceiverStarted(int nodeId)
33263314if (++ Mtm -> nReceivers == Mtm -> nLiveNodes - 1 && Mtm -> nSenders == Mtm -> nLiveNodes - 1
33273315&& (Mtm -> status == MTM_RECOVERED || Mtm -> status == MTM_CONNECTED ))
33283316{
3329- BIT_CLEAR (Mtm -> nodeLockerMask ,MtmNodeId - 1 );/* recovery is completed: release cluster lock */
3317+ BIT_CLEAR (Mtm -> originLockNodeMask ,MtmNodeId - 1 );/* recovery is completed: release cluster lock */
33303318MtmSwitchClusterMode (MTM_ONLINE );
33313319}
33323320}
@@ -3656,7 +3644,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36563644&& (Mtm -> status == MTM_RECOVERED || Mtm -> status == MTM_CONNECTED ))
36573645{
36583646/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
3659- BIT_CLEAR (Mtm -> nodeLockerMask ,MtmNodeId - 1 );/* recovery is completed: release cluster lock */
3647+ BIT_CLEAR (Mtm -> originLockNodeMask ,MtmNodeId - 1 );/* recovery is completed: release cluster lock */
36603648MtmSwitchClusterMode (MTM_ONLINE );
36613649}
36623650}
@@ -4070,7 +4058,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
40704058usrfctx -> values [3 ]= BoolGetDatum (BIT_CHECK (Mtm -> stalledNodeMask ,usrfctx -> nodeId - 1 ));
40714059usrfctx -> values [4 ]= BoolGetDatum (BIT_CHECK (Mtm -> stoppedNodeMask ,usrfctx -> nodeId - 1 ));
40724060
4073- usrfctx -> values [5 ]= BoolGetDatum (BIT_CHECK (Mtm -> nodeLockerMask ,usrfctx -> nodeId - 1 ));
4061+ usrfctx -> values [5 ]= BoolGetDatum (BIT_CHECK (Mtm -> originLockNodeMask ,usrfctx -> nodeId - 1 ));
40744062lag = MtmGetSlotLag (usrfctx -> nodeId );
40754063usrfctx -> values [6 ]= Int64GetDatum (lag );
40764064usrfctx -> nulls [6 ]= lag < 0 ;
@@ -4196,7 +4184,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
41964184values [1 ]= CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
41974185values [2 ]= Int64GetDatum (Mtm -> disabledNodeMask );
41984186values [3 ]= Int64GetDatum (SELF_CONNECTIVITY_MASK );
4199- values [4 ]= Int64GetDatum (Mtm -> nodeLockerMask );
4187+ values [4 ]= Int64GetDatum (Mtm -> originLockNodeMask );
42004188values [5 ]= Int32GetDatum (Mtm -> nLiveNodes );
42014189values [6 ]= Int32GetDatum (Mtm -> nAllNodes );
42024190values [7 ]= Int32GetDatum ((int )Mtm -> pool .active );
@@ -5032,7 +5020,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50325020
50335021case T_TruncateStmt :
50345022skipCommand = false;
5035- MtmSuspendNode ();
5023+ MtmLockCluster ();
50365024break ;
50375025
50385026case T_DropStmt :