@@ -538,7 +538,8 @@ MtmBeginTransaction(MtmCurrentTrans* x)
538538x -> gtid .xid = InvalidTransactionId ;
539539MtmUnlock ();
540540
541- MTM_TRACE ("MtmLocalTransaction: %s transaction %u uses local snapshot %lu\n" ,x -> isDistributed ?"distributed" :"local" ,x -> xid ,x -> snapshot );
541+ MTM_TRACE ("%d: MtmLocalTransaction: %s transaction %u uses local snapshot %lu\n" ,
542+ MyProcPid ,x -> isDistributed ?"distributed" :"local" ,x -> xid ,x -> snapshot );
542543 }
543544}
544545
@@ -583,7 +584,7 @@ MtmCheckClusterLock()
583584}else {
584585/* All lockers are synchronized their logs */
585586/* Remove lock and mark them as receovered */
586- elog (WARNING ,"Complete recovery of %d nodes (node mask %llx )" ,dtm -> nLockers ,dtm -> nodeLockerMask );
587+ elog (WARNING ,"Complete recovery of %d nodes (node mask %lx )" ,dtm -> nLockers ,dtm -> nodeLockerMask );
587588Assert (dtm -> walSenderLockerMask == 0 );
588589Assert ((dtm -> nodeLockerMask & dtm -> disabledNodeMask )== dtm -> nodeLockerMask );
589590dtm -> disabledNodeMask &= ~dtm -> nodeLockerMask ;
@@ -606,7 +607,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
606607MtmTransState * ts ;
607608int i ;
608609
609- if (!x -> isDistributed ) {
610+ if (!x -> isDistributed || x -> isPrepared ) {
610611return ;
611612}
612613
@@ -657,6 +658,7 @@ static void
657658MtmPrepareTransaction (MtmCurrentTrans * x )
658659{
659660MtmPrecommitTransaction (x );
661+ MTM_TRACE ("Prepare transaction %d" ,x -> xid );
660662x -> isPrepared = true;
661663}
662664
@@ -666,6 +668,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
666668TransactionId * subxids ;
667669int nSubxids ;
668670nSubxids = xactGetCommittedChildren (& subxids );
671+ MTM_TRACE ("%d: Commit prepared transaction %d\n" ,MyProcPid ,x -> xid );
669672if (!MtmCommitTransaction (x -> xid ,nSubxids ,subxids ))
670673{
671674elog (ERROR ,"Commit of transaction %d is rejected by DTM" ,x -> xid );
@@ -718,8 +721,12 @@ static int64 MtmGetSlotLag(int nodeId)
718721static void
719722MtmEndTransaction (MtmCurrentTrans * x ,bool commit )
720723{
721- if (x -> isDistributed && commit ) {
724+ MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" ,MyProcPid ,x -> xid ,x -> isPrepared ,x -> isDistributed ,commit ?"commit" :"abort" );
725+ if (x -> isDistributed && commit ) {
722726MtmTransState * ts ;
727+ if (x -> isPrepared ) {
728+ return ;
729+ }
723730MtmLock (LW_EXCLUSIVE );
724731ts = hash_search (xid2state ,& x -> xid ,HASH_FIND ,NULL );
725732Assert (ts != NULL );
@@ -1541,8 +1548,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
15411548switch (stmt -> kind )
15421549{
15431550case TRANS_STMT_COMMIT :
1544- if (MtmUse2PC ) {
1551+ if (MtmUse2PC && dtmTx . isDistributed && dtmTx . containsDML ) {
15451552char * gid = MtmGenerateGid ();
1553+ if (!IsTransactionBlock ()) {
1554+ elog (WARNING ,"Start transaction block for %d" ,dtmTx .xid );
1555+ CommitTransactionCommand ();
1556+ StartTransactionCommand ();
1557+ }
15461558if (!PrepareTransactionBlock (gid ))
15471559{
15481560elog (WARNING ,"Failed to prepare transaction %s" ,gid );
@@ -1826,7 +1838,7 @@ void MtmRefreshClusterStatus(bool nowait)
18261838
18271839clique = MtmFindMaxClique (matrix ,MtmNodes ,& clique_size );
18281840if (clique_size >=MtmNodes /2 + 1 ) {/* have quorum */
1829- elog (WARNING ,"Find clique %llx , disabledNodeMask %llx " ,clique ,dtm -> disabledNodeMask );
1841+ elog (WARNING ,"Find clique %lx , disabledNodeMask %lx " ,clique ,dtm -> disabledNodeMask );
18301842MtmLock (LW_EXCLUSIVE );
18311843mask = ~clique & (((nodemask_t )1 <<MtmNodes )- 1 )& ~dtm -> disabledNodeMask ;/* new disabled nodes mask */
18321844for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
@@ -1853,7 +1865,7 @@ void MtmRefreshClusterStatus(bool nowait)
18531865MtmSwitchClusterMode (MTM_RECOVERY );
18541866}
18551867}else {
1856- elog (WARNING ,"Clique %llx has no quorum" ,clique );
1868+ elog (WARNING ,"Clique %lx has no quorum" ,clique );
18571869}
18581870}
18591871