@@ -83,6 +83,8 @@ typedef struct {
8383bool isReplicated ;/* transaction on replica */
8484bool isDistributed ;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
8585bool isPrepared ;/* transaction is perpared at first stage of 2PC */
86+ bool isSuspended ;/* prepared transaction is suspended because coordinator node is switch to offline */
87+ bool isTransactionBlock ;/* is transaction block */
8688bool containsDML ;/* transaction contains DML statements */
8789XidStatus status ;/* transaction status */
8890csn_t snapshot ;/* transaction snaphsot */
@@ -711,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
711713}
712714
713715/*
714- * Check if this is "normal" usertrnsaction which should be distributed to other nodes
716+ * Check if this is "normal" user transaction which should be distributed to other nodes
715717 */
716718static bool
717719MtmIsUserTransaction ()
@@ -733,6 +735,7 @@ MtmResetTransaction()
733735x -> gtid .xid = InvalidTransactionId ;
734736x -> isDistributed = false;
735737x -> isPrepared = false;
738+ x -> isSuspended = false;
736739x -> isTwoPhase = false;
737740x -> csn =
738741x -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -762,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
762765x -> isReplicated = MtmIsLogicalReceiver ;
763766x -> isDistributed = MtmIsUserTransaction ();
764767x -> isPrepared = false;
768+ x -> isSuspended = false;
765769x -> isTwoPhase = false;
766770x -> isTransactionBlock = IsTransactionBlock ();
767771/* Application name can be changed using PGAPPNAME environment variable */
@@ -1003,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10031007}
10041008if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
10051009if (ts -> isPrepared ) {
1006- elog (ERROR ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1007- }
1008- if (Mtm -> status != MTM_ONLINE ) {
1009- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1010- }else {
1011- elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1010+ // GetNewTransactionId(false); /* force increment of transaction counter */
1011+ // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012+ elog (WARNING ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1013+ x -> isSuspended = true;
1014+ }else {
1015+ if (Mtm -> status != MTM_ONLINE ) {
1016+ elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1017+ }else {
1018+ elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1019+ }
1020+ MtmAbortTransaction (ts );
10121021}
1013- MtmAbortTransaction (ts );
10141022}
10151023x -> status = ts -> status ;
10161024MTM_LOG3 ("%d: Result of vote: %d" ,MyProcPid ,ts -> status );
@@ -1077,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10771085}
10781086if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
10791087if (ts -> isPrepared ) {
1080- elog (ERROR ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1081- }
1082- if (Mtm -> status != MTM_ONLINE ) {
1083- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1084- }else {
1085- elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1088+ // GetNewTransactionId(false); /* force increment of transaction counter */
1089+ // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090+ elog (WARNING ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1091+ x -> isSuspended = true;
1092+ }else {
1093+ if (Mtm -> status != MTM_ONLINE ) {
1094+ elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1095+ }else {
1096+ elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1097+ }
1098+ MtmAbortTransaction (ts );
10861099}
1087- MtmAbortTransaction (ts );
10881100}
10891101x -> status = ts -> status ;
10901102x -> xid = ts -> xid ;
@@ -1292,6 +1304,7 @@ static void MtmStartRecovery()
12921304MtmLock (LW_EXCLUSIVE );
12931305BIT_SET (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
12941306MtmSwitchClusterMode (MTM_RECOVERY );
1307+ Mtm -> recoveredLSN = InvalidXLogRecPtr ;
12951308MtmUnlock ();
12961309}
12971310
@@ -1603,6 +1616,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
16031616MTM_LOG1 ("%d: node %d is caugth-up without locking cluster" ,MyProcPid ,nodeId );
16041617/* We are lucky: caught up without locking cluster! */
16051618}
1619+ Mtm -> recoveredLSN = walLSN ;
16061620MtmEnableNode (nodeId );
16071621Mtm -> nConfigChanges += 1 ;
16081622caughtUp = true;
@@ -2074,6 +2088,7 @@ static void MtmInitialize()
20742088Mtm -> walSenderLockerMask = 0 ;
20752089Mtm -> nodeLockerMask = 0 ;
20762090Mtm -> reconnectMask = 0 ;
2091+ Mtm -> recoveredLSN = InvalidXLogRecPtr ;
20772092Mtm -> nLockers = 0 ;
20782093Mtm -> nActiveTransactions = 0 ;
20792094Mtm -> votingTransactions = NULL ;
@@ -2101,13 +2116,14 @@ static void MtmInitialize()
21012116Mtm -> nodes [i ].con = MtmConnections [i ];
21022117Mtm -> nodes [i ].flushPos = 0 ;
21032118Mtm -> nodes [i ].lastHeartbeat = 0 ;
2104- Mtm -> nodes [i ].restartLsn = 0 ;
2119+ Mtm -> nodes [i ].restartLSN = InvalidXLogRecPtr ;
21052120Mtm -> nodes [i ].originId = InvalidRepOriginId ;
21062121Mtm -> nodes [i ].timeline = 0 ;
2122+ Mtm -> nodes [i ].recoveredLSN = InvalidXLogRecPtr ;
21072123}
21082124Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
21092125/* All transactions originating from the current node should be ignored during recovery */
2110- Mtm -> nodes [MtmNodeId - 1 ].restartLsn = (XLogRecPtr )PG_UINT64_MAX ;
2126+ Mtm -> nodes [MtmNodeId - 1 ].restartLSN = (XLogRecPtr )PG_UINT64_MAX ;
21112127PGSemaphoreCreate (& Mtm -> sendSemaphore );
21122128PGSemaphoreReset (& Mtm -> sendSemaphore );
21132129SpinLockInit (& Mtm -> spinlock );
@@ -2851,18 +2867,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28512867 */
28522868MtmReplicationMode MtmGetReplicationMode (int nodeId ,sig_atomic_t volatile * shutdown )
28532869{
2854- bool recovery = false ;
2870+ MtmReplicationMode mode = REPLMODE_OPEN_EXISTED ;
28552871
2856- while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE )
2872+ while (( Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) || BIT_CHECK ( Mtm -> disabledNodeMask , nodeId - 1 ) )
28572873{
28582874if (* shutdown )
28592875{
28602876return REPLMODE_EXIT ;
28612877}
2862- MTM_LOG2 ("%d: receiver slot mode %s" ,MyProcPid ,MtmNodeStatusMnem [Mtm -> status ]);
28632878MtmLock (LW_EXCLUSIVE );
2879+ if (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) {
2880+ mode = REPLMODE_CREATE_NEW ;
2881+ }
2882+ MTM_LOG2 ("%d: receiver slot mode %s" ,MyProcPid ,MtmNodeStatusMnem [Mtm -> status ]);
28642883if (Mtm -> status == MTM_RECOVERY ) {
2865- recovery = true ;
2884+ mode = REPLMODE_RECOVERED ;
28662885if ((Mtm -> recoverySlot == 0 && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId ))
28672886|| Mtm -> recoverySlot == nodeId )
28682887{
@@ -2880,13 +2899,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28802899/* delay opening of other slots until recovery is completed */
28812900MtmSleep (STATUS_POLL_DELAY );
28822901}
2883- if (recovery ) {
2902+ if (mode == REPLMODE_RECOVERED ) {
28842903MTM_LOG1 ("%d: Restart replication from node %d after end of recovery" ,MyProcPid ,nodeId );
2904+ }else if (mode == REPLMODE_CREATE_NEW ) {
2905+ MTM_LOG1 ("%d: Start replication from recovered node %d" ,MyProcPid ,nodeId );
28852906}else {
28862907MTM_LOG1 ("%d: Continue replication from node %d" ,MyProcPid ,nodeId );
28872908}
2888- /* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2889- return recovery ?REPLMODE_RECOVERED :REPLMODE_NORMAL ;
2909+ return mode ;
28902910}
28912911
28922912static bool MtmIsBroadcast ()
@@ -2965,7 +2985,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29652985MtmIsRecoverySession = true;
29662986}else if (strcmp (strVal (elem -> arg ),"recovered" )== 0 ) {
29672987recoveryCompleted = true;
2968- }else if (strcmp (strVal (elem -> arg ),"normal " )!= 0 ) {
2988+ }else if (strcmp (strVal (elem -> arg ),"open_existed" ) != 0 && strcmp ( strVal ( elem -> arg ), "create_new " )!= 0 ) {
29692989elog (ERROR ,"Illegal recovery mode %s" ,strVal (elem -> arg ));
29702990}
29712991}else {
@@ -2977,14 +2997,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29772997}else {
29782998elog (ERROR ,"Restart position is not specified" );
29792999}
3000+ }else if (strcmp ("mtm_recovered_pos" ,elem -> defname )== 0 ) {
3001+ if (elem -> arg != NULL && strVal (elem -> arg )!= NULL ) {
3002+ sscanf (strVal (elem -> arg ),"%lx" ,& Mtm -> nodes [MtmReplicationNodeId - 1 ].recoveredLSN );
3003+ }else {
3004+ elog (ERROR ,"Recovered position is not specified" );
3005+ }
29803006}
29813007}
29823008MtmLock (LW_EXCLUSIVE );
29833009if (MtmIsRecoverySession ) {
29843010MTM_LOG1 ("%d: Node %d start recovery of node %d at position %lx" ,MyProcPid ,MtmNodeId ,MtmReplicationNodeId ,recoveryStartPos );
29853011Assert (MyReplicationSlot != NULL );
29863012if (recoveryStartPos < MyReplicationSlot -> data .restart_lsn ) {
2987- elog (ERROR ,"Specified recovery start position %lx is beyond restart lsn %lx" ,recoveryStartPos ,MyReplicationSlot -> data .restart_lsn );
3013+ elog (WARNING ,"Specified recovery start position %lx is beyond restart lsn %lx" ,recoveryStartPos ,MyReplicationSlot -> data .restart_lsn );
29883014}
29893015if (!BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) {
29903016MtmDisableNode (MtmReplicationNodeId );
@@ -3133,17 +3159,17 @@ bool MtmFilterTransaction(char* record, int size)
31333159default :
31343160break ;
31353161}
3136- duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <=Mtm -> nodes [origin_node - 1 ].restartLsn ;
3162+ duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <=Mtm -> nodes [origin_node - 1 ].restartLSN ;
31373163
31383164MTM_LOG1 ("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx" ,
3139- duplicate ?"Ignore" :"Apply" ,gid ,replication_node ,end_lsn ,origin_node ,origin_lsn ,Mtm -> nodes [origin_node - 1 ].restartLsn );
3165+ duplicate ?"Ignore" :"Apply" ,gid ,replication_node ,end_lsn ,origin_node ,origin_lsn ,Mtm -> nodes [origin_node - 1 ].restartLSN );
31403166if (Mtm -> status == MTM_RECOVERY ) {
3141- if (Mtm -> nodes [origin_node - 1 ].restartLsn < origin_lsn ) {
3142- Mtm -> nodes [origin_node - 1 ].restartLsn = origin_lsn ;
3167+ if (Mtm -> nodes [origin_node - 1 ].restartLSN < origin_lsn ) {
3168+ Mtm -> nodes [origin_node - 1 ].restartLSN = origin_lsn ;
31433169}
31443170}else {
3145- if (Mtm -> nodes [replication_node - 1 ].restartLsn < end_lsn ) {
3146- Mtm -> nodes [replication_node - 1 ].restartLsn = end_lsn ;
3171+ if (Mtm -> nodes [replication_node - 1 ].restartLSN < end_lsn ) {
3172+ Mtm -> nodes [replication_node - 1 ].restartLSN = end_lsn ;
31473173}
31483174}
31493175return duplicate ;
@@ -3758,12 +3784,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
37583784/* ??? Should we do explicit rollback */
37593785}else {
37603786CommitTransactionCommand ();
3761- StartTransactionCommand ();
3762- if (MtmGetCurrentTransactionStatus ()== TRANSACTION_STATUS_ABORTED ) {
3763- FinishPreparedTransaction (x -> gid , false);
3764- elog (ERROR ,"Transaction %s is aborted by DTM" ,x -> gid );
3765- }else {
3766- FinishPreparedTransaction (x -> gid , true);
3787+ if (x -> isSuspended ) {
3788+ elog (WARNING ,"Transaction %s is left in prepared state because coordinator onde is not online" ,x -> gid );
3789+ }else {
3790+ StartTransactionCommand ();
3791+ if (x -> status == TRANSACTION_STATUS_ABORTED ) {
3792+ FinishPreparedTransaction (x -> gid , false);
3793+ elog (ERROR ,"Transaction %s is aborted by DTM" ,x -> gid );
3794+ }else {
3795+ FinishPreparedTransaction (x -> gid , true);
3796+ }
37673797}
37683798}
37693799return true;