1515#include "miscadmin.h"
1616
1717#include "libpq-fe.h"
18+ #include "lib/stringinfo.h"
19+ #include "libpq/pqformat.h"
1820#include "common/username.h"
1921
2022#include "postmaster/postmaster.h"
@@ -925,7 +927,9 @@ MtmVotingCompleted(MtmTransState* ts)
925927ts -> votingCompleted = true;
926928ts -> status = TRANSACTION_STATUS_UNKNOWN ;
927929return true;
928- }else {
930+ }else {
931+ MTM_LOG1 ("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)" ,
932+ ts -> gid ,ts -> status ,ts -> participantsMask ,Mtm -> disabledNodeMask ,ts -> votedMask );
929933ts -> isPrepared = true;
930934if (ts -> isTwoPhase ) {
931935ts -> votingCompleted = true;
@@ -979,9 +983,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
979983MtmResetTransaction ();
980984}else {
981985int result = 0 ;
982-
986+ int nConfigChanges = Mtm -> nConfigChanges ;
983987/* Wait votes from all nodes until: */
984- while (!MtmVotingCompleted (ts ))
988+ while (!MtmVotingCompleted (ts )
989+ && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
985990{
986991MtmUnlock ();
987992MTM_TXTRACE (x ,"PostPrepareTransaction WaitLatch Start" );
@@ -997,8 +1002,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9971002MtmLock (LW_EXCLUSIVE );
9981003}
9991004if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1005+ if (ts -> isPrepared ) {
1006+ elog (ERROR ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1007+ }
1008+ if (Mtm -> status != MTM_ONLINE ) {
1009+ elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1010+ }else {
1011+ elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1012+ }
10001013MtmAbortTransaction (ts );
1001- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
10021014}
10031015x -> status = ts -> status ;
10041016MTM_LOG3 ("%d: Result of vote: %d" ,MyProcPid ,ts -> status );
@@ -1031,6 +1043,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10311043elog (WARNING ,"Global transaciton ID '%s' is not found" ,x -> gid );
10321044}else {
10331045int result = 0 ;
1046+ int nConfigChanges = Mtm -> nConfigChanges ;
10341047
10351048Assert (tm -> state != NULL );
10361049MTM_LOG3 ("Commit prepared transaction %d with gid='%s'" ,x -> xid ,x -> gid );
@@ -1045,7 +1058,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10451058MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
10461059
10471060/* Wait votes from all nodes until: */
1048- while (!MtmVotingCompleted (ts ))
1061+ while (!MtmVotingCompleted (ts )
1062+ && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
10491063{
10501064MtmUnlock ();
10511065MTM_TXTRACE (x ,"CommitPreparedTransaction WaitLatch Start" );
@@ -1062,8 +1076,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10621076}
10631077}
10641078if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1079+ if (ts -> isPrepared ) {
1080+ elog (ERROR ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1081+ }
1082+ if (Mtm -> status != MTM_ONLINE ) {
1083+ elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1084+ }else {
1085+ elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1086+ }
10651087MtmAbortTransaction (ts );
1066- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
10671088}
10681089x -> status = ts -> status ;
10691090x -> xid = ts -> xid ;
@@ -1165,11 +1186,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11651186}
11661187ts -> status = TRANSACTION_STATUS_ABORTED ;
11671188ts -> isLocal = true;
1189+ ts -> isPrepared = false;
11681190ts -> snapshot = x -> snapshot ;
1191+ ts -> isTwoPhase = x -> isTwoPhase ;
11691192ts -> csn = MtmAssignCSN ();
11701193ts -> gtid = x -> gtid ;
11711194ts -> nSubxids = 0 ;
11721195ts -> votingCompleted = true;
1196+ strcpy (ts -> gid ,x -> gid );
11731197if (ts -> isActive ) {
11741198ts -> isActive = false;
11751199Assert (Mtm -> nActiveTransactions != 0 );
@@ -1225,8 +1249,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12251249int i ;
12261250for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
12271251{
1228- if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask ,i )&& TransactionIdIsValid ( ts -> xids [ i ]) )
1252+ if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask ,i ))
12291253{
1254+ Assert (TransactionIdIsValid (ts -> xids [i ]));
12301255msg .node = i + 1 ;
12311256msg .dxid = ts -> xids [i ];
12321257MtmSendMessage (& msg );
@@ -1654,7 +1679,7 @@ MtmCheckClusterLock()
16541679continue ;
16551680}else {
16561681/* All lockers are synchronized their logs */
1657- /* Remove lock and mark them asrceovered */
1682+ /* Remove lock and mark them asrecovered */
16581683MTM_LOG1 ("Complete recovery of %d nodes (node mask %lx)" ,Mtm -> nLockers , (long )Mtm -> nodeLockerMask );
16591684Assert (Mtm -> walSenderLockerMask == 0 );
16601685Assert ((Mtm -> nodeLockerMask & Mtm -> disabledNodeMask )== Mtm -> nodeLockerMask );
@@ -2081,6 +2106,8 @@ static void MtmInitialize()
20812106Mtm -> nodes [i ].timeline = 0 ;
20822107}
20832108Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
2109+ /* All transaction originated from the current node should be ignored during recovery */
2110+ Mtm -> nodes [MtmNodeId - 1 ].restartLsn = (XLogRecPtr )PG_UINT64_MAX ;
20842111PGSemaphoreCreate (& Mtm -> sendSemaphore );
20852112PGSemaphoreReset (& Mtm -> sendSemaphore );
20862113SpinLockInit (& Mtm -> spinlock );
@@ -2807,12 +2834,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28072834Assert (!IsTransactionState ());
28082835MtmResetTransaction ();
28092836StartTransactionCommand ();
2810- #if 0
2811- if (Mtm -> nodes [MtmNodeId - 1 ].originId == InvalidRepOriginId ) {
2812- /* This dummy origin is used for local commits/aborts which should not be replicated */
2813- Mtm -> nodes [MtmNodeId - 1 ].originId = replorigin_create (psprintf (MULTIMASTER_SLOT_PATTERN ,MtmNodeId ));
2814- }
2815- #endif
2837+
28162838MtmBeginSession (MtmNodeId );
28172839MtmSetCurrentTransactionCSN (ts -> csn );
28182840MtmSetCurrentTransactionGID (ts -> gid );
@@ -2829,7 +2851,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28292851 */
28302852MtmReplicationMode MtmGetReplicationMode (int nodeId ,sig_atomic_t volatile * shutdown )
28312853{
2832- int i ;
28332854bool recovery = false;
28342855
28352856while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE )
@@ -2851,9 +2872,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28512872Mtm -> nReceivers = 0 ;
28522873Mtm -> recoveryCount += 1 ;
28532874Mtm -> pglogicalNodeMask = 0 ;
2854- for (i = 0 ;i < Mtm -> nAllNodes ;i ++ ) {
2855- Mtm -> nodes [i ].restartLsn = InvalidXLogRecPtr ;
2856- }
28572875MtmUnlock ();
28582876return REPLMODE_RECOVERY ;
28592877}
@@ -3070,6 +3088,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30703088return isDistributed ;
30713089}
30723090
3091+ bool MtmFilterTransaction (char * record ,int size )
3092+ {
3093+ StringInfoData s ;
3094+ uint8 flags ;
3095+ XLogRecPtr origin_lsn ;
3096+ XLogRecPtr end_lsn ;
3097+ int replication_node ;
3098+ int origin_node ;
3099+ char const * gid = "" ;
3100+ bool duplicate ;
3101+
3102+ s .data = record ;
3103+ s .len = size ;
3104+ s .maxlen = -1 ;
3105+ s .cursor = 0 ;
3106+
3107+ Assert (pq_getmsgbyte (& s )== 'C' );
3108+ flags = pq_getmsgbyte (& s );/* flags */
3109+ replication_node = pq_getmsgbyte (& s );
3110+
3111+ /* read fields */
3112+ pq_getmsgint64 (& s );/* commit_lsn */
3113+ end_lsn = pq_getmsgint64 (& s );/* end_lsn */
3114+ pq_getmsgint64 (& s );/* commit_time */
3115+
3116+ origin_node = pq_getmsgbyte (& s );
3117+ origin_lsn = pq_getmsgint64 (& s );
3118+
3119+ Assert (replication_node == MtmReplicationNodeId &&
3120+ origin_node != 0 &&
3121+ (Mtm -> status == MTM_RECOVERY || origin_node == replication_node ));
3122+
3123+ switch (PGLOGICAL_XACT_EVENT (flags ))
3124+ {
3125+ case PGLOGICAL_PREPARE :
3126+ case PGLOGICAL_ABORT_PREPARED :
3127+ gid = pq_getmsgstring (& s );
3128+ break ;
3129+ case PGLOGICAL_COMMIT_PREPARED :
3130+ pq_getmsgint64 (& s );/* CSN */
3131+ gid = pq_getmsgstring (& s );
3132+ break ;
3133+ default :
3134+ break ;
3135+ }
3136+ duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <=Mtm -> nodes [origin_node - 1 ].restartLsn ;
3137+
3138+ MTM_LOG1 ("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx" ,
3139+ duplicate ?"Ignore" :"Apply" ,gid ,replication_node ,end_lsn ,origin_node ,origin_lsn ,Mtm -> nodes [origin_node - 1 ].restartLsn );
3140+ if (Mtm -> status == MTM_RECOVERY ) {
3141+ if (Mtm -> nodes [origin_node - 1 ].restartLsn < origin_lsn ) {
3142+ Mtm -> nodes [origin_node - 1 ].restartLsn = origin_lsn ;
3143+ }
3144+ }else {
3145+ if (Mtm -> nodes [replication_node - 1 ].restartLsn < end_lsn ) {
3146+ Mtm -> nodes [replication_node - 1 ].restartLsn = end_lsn ;
3147+ }
3148+ }
3149+ return duplicate ;
3150+ }
3151+
30733152void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
30743153{
30753154hooks -> startup_hook = MtmReplicationStartupHook ;