@@ -1103,7 +1103,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11031103static void
11041104MtmEndTransaction (MtmCurrentTrans * x ,bool commit )
11051105{
1106- MTM_LOG1 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1106+ MTM_LOG2 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
11071107MyProcPid ,x -> xid ,x -> isPrepared ,x -> isReplicated ,x -> isDistributed ,x -> isTwoPhase ,x -> gid ,commit ?"commit" :"abort" );
11081108if (x -> status != TRANSACTION_STATUS_ABORTED && x -> isDistributed && (x -> isPrepared || x -> isReplicated )&& !x -> isTwoPhase ) {
11091109MtmTransState * ts = NULL ;
@@ -1121,7 +1121,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11211121}
11221122if (ts != NULL ) {
11231123if (* ts -> gid )
1124- MTM_LOG1 ("TRANSLOG: %s transaction %s status %d" , (commit ?"commit" :"rollback" ),ts -> gid ,ts -> status );
1124+ MTM_LOG2 ("TRANSLOG: %s transaction %s status %d" , (commit ?"commit" :"rollback" ),ts -> gid ,ts -> status );
11251125if (commit ) {
11261126if (!(ts -> status == TRANSACTION_STATUS_UNKNOWN
11271127|| (ts -> status == TRANSACTION_STATUS_IN_PROGRESS && Mtm -> status == MTM_RECOVERY )))
@@ -1176,6 +1176,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11761176Mtm -> nActiveTransactions -= 1 ;
11771177}
11781178MtmTransactionListAppend (ts );
1179+ if (* x -> gid ) {
1180+ LogLogicalMessage ("A" ,x -> gid ,strlen (x -> gid )+ 1 , false);
1181+ }
11791182}
11801183MtmSend2PCMessage (ts ,MSG_ABORTED );/* send notification to coordinator */
11811184}else if (x -> status == TRANSACTION_STATUS_ABORTED && x -> isReplicated && !x -> isPrepared ) {
@@ -1229,7 +1232,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12291232MtmSendMessage (& msg );
12301233}
12311234}
1232- }else {
1235+ }else if (! BIT_CHECK ( Mtm -> disabledNodeMask , ts -> gtid . node - 1 )) {
12331236msg .node = ts -> gtid .node ;
12341237msg .dxid = ts -> gtid .xid ;
12351238MtmSendMessage (& msg );
@@ -1435,7 +1438,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14351438Assert (ts -> gid [0 ]);
14361439if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
14371440elog (LOG ,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d" ,ts -> gid ,MtmNodeId );
1438- MtmFinishPreparedTransaction (disabledNodeId , ts , false);
1441+ MtmFinishPreparedTransaction (ts , false);
14391442}else {
14401443MTM_LOG1 ("Poll state of transaction %d (%s)" ,ts -> xid ,ts -> gid );
14411444MtmBroadcastPollMessage (ts );
@@ -1458,7 +1461,9 @@ static void MtmDisableNode(int nodeId)
14581461if (nodeId != MtmNodeId ) {
14591462Mtm -> nLiveNodes -= 1 ;
14601463}
1464+ MtmUnlock ();
14611465MtmPollStatusOfPreparedTransactions (nodeId );
1466+ MtmLock (LW_EXCLUSIVE );
14621467}
14631468
14641469static void MtmEnableNode (int nodeId )
@@ -2779,34 +2784,41 @@ void MtmReleaseRecoverySlot(int nodeId)
27792784}
27802785}
27812786
2782- void MtmFinishPreparedTransaction ( int nodeId , MtmTransState * ts , bool commit )
2787+ void MtmRollbackPreparedTransaction ( char const * gid )
27832788{
2789+ MTM_LOG1 ("Abort prepared transaction %s" ,gid );
2790+ if (MtmExchangeGlobalTransactionStatus (gid ,TRANSACTION_STATUS_ABORTED )== TRANSACTION_STATUS_UNKNOWN ) {
2791+ MTM_LOG1 ("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2" ,gid );
2792+ MtmResetTransaction ();
2793+ StartTransactionCommand ();
2794+ MtmBeginSession (MtmReplicationNodeId );
2795+ MtmSetCurrentTransactionGID (gid );
2796+ FinishPreparedTransaction (gid , false);
2797+ CommitTransactionCommand ();
2798+ MtmEndSession (MtmReplicationNodeId , true);
2799+ }
2800+ }
2801+
2802+
2803+ void MtmFinishPreparedTransaction (MtmTransState * ts ,bool commit )
2804+ {
2805+ if (Mtm -> nodes [MtmNodeId - 1 ].originId == InvalidRepOriginId ) {
2806+ /* This dummy origin is used for local commits/aborts which should not be replicated */
2807+ Mtm -> nodes [MtmNodeId - 1 ].originId = replorigin_create (psprintf (MULTIMASTER_SLOT_PATTERN ,MtmNodeId ));
2808+ }
27842809Assert (ts -> votingCompleted );
27852810Assert (!IsTransactionState ());
27862811MtmResetTransaction ();
27872812StartTransactionCommand ();
2788- MtmBeginSession (nodeId );
2813+ MtmBeginSession (MtmNodeId );
27892814MtmSetCurrentTransactionCSN (ts -> csn );
27902815MtmSetCurrentTransactionGID (ts -> gid );
27912816FinishPreparedTransaction (ts -> gid ,commit );
27922817CommitTransactionCommand ();
2793- MtmEndSession (nodeId , true);
2818+ MtmEndSession (MtmNodeId , true);
27942819Assert (ts -> status == commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED );
27952820}
27962821
2797- #if 0
2798- static void MtmFinishAllPreparedTransactions (void )
2799- {
2800- MtmTransState * ts ;
2801- for (ts = Mtm -> transListHead ;ts != NULL ;ts = ts -> next ) {
2802- if (ts -> status != TRANSACTION_STATUS_COMMITTED && ts -> status != TRANSACTION_STATUS_ABORTED ) {
2803- MtmFinishPreparedTransaction (MtmReplicationNodeId ,ts , false);
2804- }
2805- }
2806- }
2807- #endif
2808-
2809-
28102822/*
28112823 * Determine when and how we should open replication slot.
28122824 * Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -2840,11 +2852,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28402852Mtm -> nodes [i ].restartLsn = InvalidXLogRecPtr ;
28412853}
28422854MtmUnlock ();
2843- #if 0
2844- MtmBeginSession (MtmReplicationNodeId );
2845- FinishAllPreparedTransactions (false);
2846- MtmEndSession (MtmReplicationNodeId , true);
2847- #endif
28482855return REPLMODE_RECOVERY ;
28492856}
28502857}