@@ -159,6 +159,9 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
159159static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError );
160160static void MtmProcessDDLCommand (char const * queryString ,bool transactional );
161161
162+ static void MtmSuspendNode (void );
163+ static void MtmResumeNode (void );
164+
162165MtmState * Mtm ;
163166
164167VacuumStmt * MtmVacuumStmt ;
@@ -251,6 +254,7 @@ static bool MtmIgnoreTablesWithoutPk;
251254static int MtmLockCount ;
252255static bool MtmMajorNode ;
253256static bool MtmBreakConnection ;
257+ static bool MtmSuspended ;
254258
255259static ExecutorStart_hook_type PreviousExecutorStartHook ;
256260static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -272,8 +276,11 @@ static bool MtmAtExitHookRegistered = false;
272276 * This function is called when backend is terminated because of critical error or when error is catched
273277 * by FINALLY block
274278 */
275- void MtmReleaseLock (void )
279+ void MtmReleaseLocks (void )
276280{
281+ if (MtmSuspended ) {
282+ MtmResumeNode ();
283+ }
277284if (MtmLockCount != 0 ) {
278285Assert (Mtm -> lastLockHolder == MyProcPid );
279286MtmLockCount = 0 ;
@@ -296,7 +303,7 @@ void MtmLock(LWLockMode mode)
296303{
297304timestamp_t start ,stop ;
298305if (!MtmAtExitHookRegistered ) {
299- atexit (MtmReleaseLock );
306+ atexit (MtmReleaseLocks );
300307MtmAtExitHookRegistered = true;
301308}
302309if (mode == LW_EXCLUSIVE || MtmLockCount != 0 ) {
@@ -848,7 +855,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
848855MTM_ELOG (MtmBreakConnection ?FATAL :ERROR ,"Multimaster node is not online: current status %s" ,MtmNodeStatusMnem [Mtm -> status ]);
849856}
850857x -> containsDML = false;
851- x -> snapshot = MtmAssignCSN ();
852858x -> gtid .xid = InvalidTransactionId ;
853859x -> gid [0 ]= '\0' ;
854860x -> status = TRANSACTION_STATUS_IN_PROGRESS ;
@@ -858,9 +864,14 @@ MtmBeginTransaction(MtmCurrentTrans* x)
858864 * Allow applying of replicated transactions to avoid deadlock (to caught-up we need active transaction counter to become zero).
859865 * Also allow user to complete explicit 2PC transactions.
860866 */
861- if (x -> isDistributed && !x -> isReplicated && !x -> isTwoPhase && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 ) {
867+ if (x -> isDistributed
868+ && (Mtm -> exclusiveLock || (!x -> isReplicated && !x -> isTwoPhase ))
869+ && !MtmSuspended
870+ && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 )
871+ {
862872MtmCheckClusterLock ();
863873}
874+ x -> snapshot = MtmAssignCSN ();
864875
865876MtmUnlock ();
866877
@@ -1319,6 +1330,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
13191330{
13201331MTM_LOG2 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
13211332MyProcPid ,x -> xid ,x -> isPrepared ,x -> isReplicated ,x -> isDistributed ,x -> isTwoPhase ,x -> gid ,commit ?"commit" :"abort" );
1333+ if (MtmSuspended ) {
1334+ MtmResumeNode ();
1335+ }
13221336commit &= (x -> status != TRANSACTION_STATUS_ABORTED );
13231337if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )&& !x -> isTwoPhase ) {
13241338MtmTransState * ts = NULL ;
@@ -2038,7 +2052,44 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
20382052/* ??? Something else to do here? */
20392053}
20402054
2055+ /*
2056+ * Prevent start of any new transactions at this node
2057+ */
2058+ static void
2059+ MtmSuspendNode (void )
2060+ {
2061+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
2062+ bool insideTransaction = MtmTx .isActive ;
2063+ Assert (!MtmSuspended );
2064+ MtmLock (LW_EXCLUSIVE );
2065+ if (Mtm -> exclusiveLock ) {
2066+ elog (ERROR ,"There is already pending exclusive lock" );
2067+ }
2068+ Mtm -> exclusiveLock = true;
2069+ MtmSuspended = true;
2070+ while (Mtm -> nActiveTransactions != insideTransaction ) {
2071+ MtmUnlock ();
2072+ MtmSleep (delay );
2073+ if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
2074+ delay *=2 ;
2075+ }
2076+ MtmLock (LW_EXCLUSIVE );
2077+ }
2078+ MtmUnlock ();
2079+ }
20412080
2081+ /*
2082+ * Resume transaction processing at node (blocked by MtmSuspendNode)
2083+ */
2084+ static void
2085+ MtmResumeNode (void )
2086+ {
2087+ MtmLock (LW_EXCLUSIVE );
2088+ Mtm -> exclusiveLock = false;
2089+ MtmSuspended = false;
2090+ MtmUnlock ();
2091+ }
2092+
20422093/*
20432094 * If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
20442095 * WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
@@ -2050,7 +2101,7 @@ MtmCheckClusterLock()
20502101timestamp_t delay = MIN_WAIT_TIMEOUT ;
20512102while (true)
20522103{
2053- if (Mtm -> globalLockerMask |Mtm -> walSenderLockerMask ) {
2104+ if (Mtm -> exclusiveLock || ( Mtm -> globalLockerMask |Mtm -> walSenderLockerMask ) ) {
20542105/* some "almost cautch-up" wal-senders are still working. */
20552106/* Do not start new transactions until them are completed. */
20562107MtmUnlock ();
@@ -2474,6 +2525,7 @@ static void MtmInitialize()
24742525Mtm -> reconnectMask = 0 ;
24752526Mtm -> recoveredLSN = INVALID_LSN ;
24762527Mtm -> nLockers = 0 ;
2528+ Mtm -> exclusiveLock = false;
24772529Mtm -> nActiveTransactions = 0 ;
24782530Mtm -> votingTransactions = NULL ;
24792531Mtm -> transListHead = NULL ;
@@ -4944,8 +4996,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
49444996}
49454997break ;
49464998
4947- case T_DropStmt :
49484999case T_TruncateStmt :
5000+ skipCommand = false;
5001+ MtmSuspendNode ();
5002+ break ;
5003+
5004+ case T_DropStmt :
49495005{
49505006DropStmt * stmt = (DropStmt * )parsetree ;
49515007if (stmt -> removeType == OBJECT_INDEX && stmt -> concurrent )