@@ -583,7 +583,9 @@ MtmAdjustOldestXid(TransactionId xid)
583583
584584for (ts = Mtm -> transListHead ;
585585ts != NULL
586+ && (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_COMMITTED )
586587&& ts -> csn < oldestSnapshot
588+ && !ts -> isPinned
587589&& TransactionIdPrecedes (ts -> xid ,xid );
588590prev = ts ,ts = ts -> next )
589591{
@@ -653,6 +655,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
653655sts = (MtmTransState * )hash_search (MtmXid2State ,& subxids [i ],HASH_ENTER ,& found );
654656Assert (!found );
655657sts -> isActive = false;
658+ sts -> isPinned = false;
656659sts -> status = ts -> status ;
657660sts -> csn = ts -> csn ;
658661sts -> votingCompleted = true;
@@ -814,6 +817,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
814817ts -> isLocal = true;
815818ts -> isPrepared = false;
816819ts -> isTwoPhase = x -> isTwoPhase ;
820+ ts -> isPinned = false;
817821ts -> votingCompleted = false;
818822if (!found ) {
819823ts -> isEnqueued = false;
@@ -963,8 +967,13 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
963967{
964968int result = 0 ;
965969int nConfigChanges = Mtm -> nConfigChanges ;
966- timestamp_t elapsed ,start = MtmGetSystemTime ();
967- timestamp_t deadline = 0 ;
970+ timestamp_t prepareTime = ts -> csn - ts -> snapshot ;
971+ timestamp_t timeout = Max (prepareTime + MSEC_TO_USEC (MtmMin2PCTimeout ),prepareTime * MtmMax2PCRatio /100 );
972+ timestamp_t deadline = MtmGetSystemTime ()+ timeout ;
973+ timestamp_t now ;
974+
975+ Assert (ts -> csn > ts -> snapshot );
976+
968977/* Wait votes from all nodes until: */
969978while (!MtmVotingCompleted (ts )
970979&& (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
@@ -980,19 +989,16 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
980989if (result & WL_LATCH_SET ) {
981990ResetLatch (& MyProc -> procLatch );
982991}
983- elapsed = MtmGetSystemTime ()- start ;
992+ now = MtmGetSystemTime ();
984993MtmLock (LW_EXCLUSIVE );
985- if (deadline == 0 && ts -> votedMask != 0 ) {
986- deadline = Max (MSEC_TO_USEC (MtmMin2PCTimeout ),elapsed * MtmMax2PCRatio /100 );
987- }else {
994+ if (now > deadline ) {
988995if (ts -> isPrepared ) {
989996/* resend precommit message */
990997MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
991998}else {
992- if (elapsed > deadline ) {
993- elog (WARNING ,"Commit of distributed transaction is canceled because of %ld msec timeout expiration" ,USEC_TO_MSEC (elapsed ));
994- MtmAbortTransaction (ts );
995- }
999+ elog (WARNING ,"Commit of distributed transaction is canceled because of %ld msec timeout expiration" ,USEC_TO_MSEC (timeout ));
1000+ MtmAbortTransaction (ts );
1001+ break ;
9961002}
9971003}
9981004}
@@ -1005,7 +1011,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10051011}else {
10061012if (Mtm -> status != MTM_ONLINE ) {
10071013elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1008- }else if ( nConfigChanges != Mtm -> nConfigChanges ) {
1014+ }else {
10091015elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
10101016}
10111017MtmAbortTransaction (ts );
@@ -1202,6 +1208,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021208ts -> status = TRANSACTION_STATUS_ABORTED ;
12031209ts -> isLocal = true;
12041210ts -> isPrepared = false;
1211+ ts -> isPinned = false;
12051212ts -> snapshot = x -> snapshot ;
12061213ts -> isTwoPhase = x -> isTwoPhase ;
12071214ts -> csn = MtmAssignCSN ();
@@ -1280,7 +1287,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12801287}
12811288}
12821289
1283- void MtmBroadcastPollMessage (MtmTransState * ts )
1290+ static void MtmBroadcastPollMessage (MtmTransState * ts )
12841291{
12851292int i ;
12861293MtmArbiterMessage msg ;
@@ -1293,7 +1300,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
12931300
12941301for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
12951302{
1296- if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ ts -> votedMask ,i ))
1303+ if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask ,i ))
12971304{
12981305msg .node = i + 1 ;
12991306MTM_LOG3 ("Send request for transaction %s to node %d" ,msg .gid ,msg .node );
@@ -1480,15 +1487,17 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14801487Assert (ts -> gid [0 ]);
14811488if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
14821489elog (LOG ,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d" ,ts -> gid ,MtmNodeId );
1483- //MtmUnlock();
1490+ ts -> isPinned = true;
1491+ MtmUnlock ();
14841492MtmFinishPreparedTransaction (ts , false);
1485- //MtmLock(LW_EXCLUSIVE);
1493+ MtmLock (LW_EXCLUSIVE );
1494+ ts -> isPinned = false;
14861495}else {
14871496MTM_LOG1 ("Poll state of transaction %d (%s)" ,ts -> xid ,ts -> gid );
14881497MtmBroadcastPollMessage (ts );
14891498}
14901499}else {
1491- MTM_LOG2 ("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx" ,
1500+ MTM_LOG1 ("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx" ,
14921501ts -> xid ,ts -> gid ,ts -> status ,ts -> gtid .node ,ts -> gtid .xid ,ts -> votedMask );
14931502}
14941503}
@@ -3216,8 +3225,13 @@ bool MtmFilterTransaction(char* record, int size)
32163225duplicate = true;
32173226}
32183227
3219- MTM_LOG2 ("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3220- duplicate ?"Ignore" :"Apply" ,gid ,replication_node ,end_lsn ,flags ,origin_node ,origin_lsn ,restart_lsn );
3228+ if (duplicate ) {
3229+ MTM_LOG1 ("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3230+ gid ,replication_node ,end_lsn ,flags ,origin_node ,origin_lsn ,restart_lsn );
3231+ }else {
3232+ MTM_LOG2 ("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3233+ gid ,replication_node ,end_lsn ,flags ,origin_node ,origin_lsn ,restart_lsn );
3234+ }
32213235return duplicate ;
32223236}
32233237
@@ -3831,7 +3845,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38313845}else {
38323846CommitTransactionCommand ();
38333847if (x -> isSuspended ) {
3834- elog (WARNING ,"Transaction %s is left in prepared state because coordinatoronde is not online" ,x -> gid );
3848+ elog (WARNING ,"Transaction %s is left in prepared state because coordinatornode is not online" ,x -> gid );
38353849}else {
38363850StartTransactionCommand ();
38373851if (x -> status == TRANSACTION_STATUS_ABORTED ) {