@@ -667,6 +667,29 @@ MtmBeginTransaction(MtmCurrentTrans* x)
667667 }
668668}
669669
670+
671+ static MtmTransState *
672+ MtmCreateTransState (MtmCurrentTrans * x )
673+ {
674+ bool found ;
675+ MtmTransState * ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
676+ if (!found ) {
677+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
678+ ts -> snapshot = x -> snapshot ;
679+ if (TransactionIdIsValid (x -> gtid .xid )) {
680+ Assert (x -> gtid .node != MtmNodeId );
681+ ts -> gtid = x -> gtid ;
682+ }else {
683+ /* I am coordinator of transaction */
684+ ts -> gtid .xid = x -> xid ;
685+ ts -> gtid .node = MtmNodeId ;
686+ }
687+ }
688+ return ts ;
689+ }
690+
691+
692+
670693/*
671694 * Prepare transaction for two-phase commit.
672695 * This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
@@ -675,7 +698,7 @@ static void
675698MtmPrePrepareTransaction (MtmCurrentTrans * x )
676699{
677700MtmTransState * ts ;
678- TransactionId * subxids ;
701+ TransactionId * subxids ;
679702
680703if (!x -> isDistributed ) {
681704return ;
@@ -703,14 +726,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
703726MtmCheckClusterLock ();
704727}
705728
706- ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,NULL );
707- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
729+ ts = MtmCreateTransState (x );
708730/*
709731 * Invalid CSN prevent replication of transaction by logical replication
710732 */
711733ts -> snapshot = x -> isReplicated || !x -> containsDML ?INVALID_CSN :x -> snapshot ;
712734ts -> csn = MtmAssignCSN ();
713- ts -> gtid = x -> gtid ;
714735ts -> procno = MyProc -> pgprocno ;
715736ts -> nVotes = 1 ;/* I am voted myself */
716737ts -> votingCompleted = false;
@@ -722,15 +743,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
722743x -> csn = ts -> csn ;
723744
724745Mtm -> transCount += 1 ;
725-
726- if (TransactionIdIsValid (x -> gtid .xid )) {
727- Assert (x -> gtid .node != MtmNodeId );
728- ts -> gtid = x -> gtid ;
729- }else {
730- /* I am coordinator of transaction */
731- ts -> gtid .xid = x -> xid ;
732- ts -> gtid .node = MtmNodeId ;
733- }
734746MtmTransactionListAppend (ts );
735747MtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
736748MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n" ,
@@ -844,7 +856,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
844856MtmTransactionListAppend (ts );
845857}
846858MtmSendNotificationMessage (ts ,MSG_ABORTED );/* send notification to coordinator */
847- }
859+ }else if (x -> status == TRANSACTION_STATUS_ABORTED && x -> isReplicated && !x -> isPrepared ) {
860+ hash_search (MtmXid2State ,& x -> xid ,HASH_REMOVE ,NULL );
861+ }
848862MtmUnlock ();
849863}
850864MtmResetTransaction (x );
@@ -868,28 +882,32 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
868882
869883void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
870884{
885+ MtmTx .gtid = * gtid ;
886+ MtmTx .xid = GetCurrentTransactionId ();
887+ MtmTx .isReplicated = true;
888+ MtmTx .isDistributed = true;
889+ MtmTx .containsDML = true;
890+
871891if (globalSnapshot != INVALID_CSN ) {
872892MtmLock (LW_EXCLUSIVE );
873893MtmSyncClock (globalSnapshot );
894+ MtmTx .snapshot = globalSnapshot ;
895+ if (Mtm -> status != MTM_RECOVERY ) {
896+ MtmCreateTransState (& MtmTx );/* we need local->remote xid mapping for deadlock detection */
897+ }
874898MtmUnlock ();
875899}else {
876900globalSnapshot = MtmTx .snapshot ;
877901}
878902if (!TransactionIdIsValid (gtid -> xid )) {
879903/* In case of recovery InvalidTransactionId is passed */
880904if (Mtm -> status != MTM_RECOVERY ) {
881- elog (PANIC ,"Node %d tries to recover node %d which is in %s mode" ,MtmReplicationNodeId ,MtmNodeId ,MtmNodeStatusMnem [Mtm -> status ]);
905+ elog (PANIC ,"Node %d tries to recover node %d which is in %s mode" ,gtid -> node ,MtmNodeId ,MtmNodeStatusMnem [Mtm -> status ]);
882906}
883907}else if (Mtm -> status == MTM_RECOVERY ) {
884908/* When recovery is completed we get normal transaction ID and switch to normal mode */
885909MtmRecoveryCompleted ();
886910}
887- MtmTx .gtid = * gtid ;
888- MtmTx .xid = GetCurrentTransactionId ();
889- MtmTx .snapshot = globalSnapshot ;
890- MtmTx .isReplicated = true;
891- MtmTx .isDistributed = true;
892- MtmTx .containsDML = true;
893911}
894912
895913void MtmSetCurrentTransactionGID (char const * gid )