@@ -116,14 +116,15 @@ static TransactionId MtmAdjustOldestXid(TransactionId xid);
116116static bool MtmDetectGlobalDeadLock (PGPROC * proc );
117117static void MtmAddSubtransactions (MtmTransState * ts ,TransactionId * subxids ,int nSubxids );
118118static char const * MtmGetName (void );
119- static void MtmCheckClusterLock ()
119+ static void MtmCheckClusterLock (void );
120+ static void MtmCheckSlots (void );
121+ static void MtmAddSubtransactions (MtmTransState * ts ,TransactionId * subxids ,int nSubxids );
120122
121123static void MtmShmemStartup (void );
122124
123125static BgwPool * MtmPoolConstructor (void );
124126static bool MtmRunUtilityStmt (PGconn * conn ,char const * sql );
125127static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError );
126- static void MtmVoteForTransaction (MtmTransState * ts );
127128
128129static HTAB * xid2state ;
129130static HTAB * gid2xid ;
@@ -543,10 +544,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
543544 * Prepare transaction for two-phase commit.
544545 * This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
545546 */
547+ static void
546548MtmPrePrepareTransaction (MtmCurrentTrans * x )
547549{
548550MtmTransState * ts ;
549- int i ;
551+ TransactionId * subxids ;
550552
551553if (!x -> isDistributed ) {
552554return ;
@@ -575,9 +577,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
575577ts -> gtid = x -> gtid ;
576578ts -> procno = MyProc -> pgprocno ;
577579ts -> nVotes = 0 ;
578-
580+ ts -> nSubxids = xactGetCommittedChildren ( & subxids );
579581x -> isPrepared = true;
580- x -> csn = csn ;
582+ x -> csn = ts -> csn ;
581583
582584dtm -> transCount += 1 ;
583585
@@ -588,34 +590,36 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
588590ts -> gtid .node = MtmNodeId ;
589591}
590592MtmTransactionListAppend (ts );
593+ MtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
591594
592595MtmUnlock ();
593596
594597MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" ,MyProcPid ,x -> xid ,ts -> csn );
595598}
596599
600+ static void
597601MtmPrepareTransaction (MtmCurrentTrans * x )
598602{
599603MtmTransState * ts ;
600604
601605MtmLock (LW_EXCLUSIVE );
602606ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
603607Assert (ts != NULL );
604- if (ts -> status = TRANSACTION_STATUS_IN_PROGRESS ) {
608+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
605609ts -> status = TRANSACTION_STATUS_UNKNOWN ;
606610MtmAdjustSubtransactions (ts );
607611}
608612
609613if (!MtmIsCoordinator (ts )) {
610- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid ,x -> gid ,HASH_ENTER ,NULL );
614+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid ,x -> gid ,HASH_ENTER ,NULL );
611615Assert (x -> gid [0 ]);
612616hm -> state = ts ;
613617MtmSendNotificationMessage (ts );/* send notification to coordinator */
614618MtmUnlock ();
615619}else {
616620/* wait N commits or just one ABORT */
617- ts -> nVotes += 1 ;
618- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_PROGRESS ) {
621+ ts -> nVotes += 1 ;/* I vote myself */
622+ while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
619623MtmUnlock ();
620624WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
621625ResetLatch (& MyProc -> procLatch );
@@ -633,14 +637,14 @@ static void
633637MtmEndTransaction (MtmCurrentTrans * x ,bool commit )
634638{
635639MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" ,MyProcPid ,x -> xid ,x -> isPrepared ,x -> isDistributed ,commit ?"commit" :"abort" );
636- if (x -> isDistributed ) {
640+ if (x -> isDistributed && ( TransactionIdIsValid ( x -> xid ) || x -> isReplicated ) ) {
637641MtmTransState * ts ;
638642MtmLock (LW_EXCLUSIVE );
639643if (x -> isPrepared ) {
640644ts = hash_search (xid2state ,& x -> xid ,HASH_FIND ,NULL );
641645Assert (ts != NULL );
642646}else {
643- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid ,x -> gid ,HASH_REMOVE ,NULL );
647+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid ,x -> gid ,HASH_REMOVE ,NULL );
644648Assert (hm != NULL );
645649ts = hm -> state ;
646650}
@@ -712,12 +716,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
712716
713717void MtmSetCurrentTransactionGID (char const * gid )
714718{
719+ MTM_TRACE ("Set current transaction GID %s\n" ,gid );
715720strcpy (dtmTx .gid ,gid );
721+ dtmTx .isDistributed = true;
722+ dtmTx .isReplicated = true;
716723}
717724
718725void MtmSetCurrentTransactionCSN (csn_t csn )
719726{
727+ MTM_TRACE ("Set current transaction CSN %ld\n" ,csn );
720728dtmTx .csn = csn ;
729+ dtmTx .isDistributed = true;
730+ dtmTx .isReplicated = true;
721731}
722732
723733/*
@@ -731,7 +741,8 @@ void MtmSetCurrentTransactionCSN(csn_t csn)
731741 * Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
732742 * WAL overflow
733743 */
734- static void MtmCheckSlots ()
744+ static void
745+ MtmCheckSlots ()
735746{
736747if (MtmMaxRecoveryLag != 0 && dtm -> disabledNodeMask != 0 )
737748{
@@ -1682,14 +1693,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16821693{
16831694case TRANS_STMT_COMMIT :
16841695if (dtmTx .isDistributed && dtmTx .containsDML ) {
1685- char gid { MUTLIMASTER_MAX_GID_SIZE ];
1686- MtmGenerateGid (& gid );
1696+ char gid [ MULTIMASTER_MAX_GID_SIZE ];
1697+ MtmGenerateGid (gid );
16871698if (!IsTransactionBlock ()) {
16881699elog (WARNING ,"Start transaction block for %d" ,dtmTx .xid );
16891700CommitTransactionCommand ();
16901701StartTransactionCommand ();
16911702}
1692- if (!PrepareTransactionBlock (& gid ))
1703+ if (!PrepareTransactionBlock (gid ))
16931704{
16941705elog (WARNING ,"Failed to prepare transaction %s" ,gid );
16951706/* report unsuccessful commit in completionTag */