@@ -571,7 +571,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
571571MtmCheckClusterLock ();
572572
573573ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
574- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
574+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
575+ /*
576+ * Invalid CSN prevent replication of transaction by logical replication
577+ */
575578ts -> snapshot = x -> isReplicated || !x -> containsDML ?INVALID_CSN :x -> snapshot ;
576579ts -> csn = MtmAssignCSN ();
577580ts -> gtid = x -> gtid ;
@@ -603,7 +606,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
603606MtmTransState * ts ;
604607
605608MtmLock (LW_EXCLUSIVE );
606- ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
609+ ts = hash_search (xid2state ,& x -> xid ,HASH_FIND ,NULL );
607610Assert (ts != NULL );
608611if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
609612ts -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -619,7 +622,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
619622}else {
620623/* wait N commits or just one ABORT */
621624ts -> nVotes += 1 ;/* I vote myself */
622- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
625+ while (ts -> nVotes != dtm -> nNodes && ts -> status != TRANSACTION_STATUS_ABORTED ) {
623626MtmUnlock ();
624627WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
625628ResetLatch (& MyProc -> procLatch );
@@ -628,6 +631,8 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
628631MtmUnlock ();
629632if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
630633elog (ERROR ,"Distributed transaction %d is rejected by DTM" ,x -> xid );
634+ }else {
635+ Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
631636}
632637}
633638}
@@ -637,7 +642,7 @@ static void
637642MtmEndTransaction (MtmCurrentTrans * x ,bool commit )
638643{
639644MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" ,MyProcPid ,x -> xid ,x -> isPrepared ,x -> isDistributed ,commit ?"commit" :"abort" );
640- if (x -> isDistributed && (TransactionIdIsValid ( x -> xid ) || x -> isReplicated )) {
645+ if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
641646MtmTransState * ts ;
642647MtmLock (LW_EXCLUSIVE );
643648if (x -> isPrepared ) {
@@ -656,7 +661,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
656661}
657662}else {
658663ts -> status = TRANSACTION_STATUS_ABORTED ;
659- if (x -> isReplicated ) {
664+ if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
665+ /*
666+ * Send notification only of ABORT happens during transaction processing at replicas,
667+ * do not send notification if ABORT is receiver from master
668+ */
660669MtmSendNotificationMessage (ts );/* send notification to coordinator */
661670}
662671}
@@ -683,8 +692,6 @@ void MtmSendNotificationMessage(MtmTransState* ts)
683692}
684693}
685694
686-
687-
688695void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
689696{
690697csn_t localSnapshot ;