@@ -194,7 +194,7 @@ static char const* MtmGetName(void)
194194Snapshot MtmGetSnapshot (Snapshot snapshot )
195195{
196196snapshot = PgGetSnapshotData (snapshot );
197- RecentGlobalDataXmin = RecentGlobalXmin = MtmAdjustOldestXid (RecentGlobalDataXmin );
197+ RecentGlobalDataXmin = RecentGlobalXmin = dtm -> oldestXid ; // MtmAdjustOldestXid(RecentGlobalDataXmin);
198198return snapshot ;
199199}
200200
@@ -499,10 +499,8 @@ void MtmSendNotificationMessage(MtmTransState* ts)
499499ts -> nextVoting = votingList ;
500500dtm -> votingTransactions = ts ;
501501SpinLockRelease (& dtm -> votingSpinlock );
502- MTM_TRACE ("Register commit message\n" );
503502if (votingList == NULL ) {
504503/* singal semaphore only once for the whole list */
505- MTM_TRACE ("Signal semaphore\n" );
506504PGSemaphoreUnlock (& dtm -> votingSemaphore );
507505}
508506}
@@ -519,9 +517,11 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
519517MtmTransactionListAppend (ts );
520518MtmAddSubtransactions (ts ,subxids ,nsubxids );
521519
520+ MtmVoteForTransaction (ts );
521+
522522LWLockRelease (dtm -> hashLock );
523523
524- MtmVoteForTransaction ( ts );
524+ MTM_TRACE ( "%d: MtmCommitTransaction status=%d\n" , getpid (), ts -> status );
525525
526526return ts -> status == TRANSACTION_STATUS_COMMITTED ;
527527}
@@ -530,15 +530,32 @@ static void
530530MtmFinishTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status )
531531{
532532MtmTransState * ts ;
533+ MtmCurrentTrans * x = & dtmTx ;
534+ bool found ;
533535
534536LWLockAcquire (dtm -> hashLock ,LW_EXCLUSIVE );
535- ts = hash_search (xid2state ,& xid ,HASH_FIND , NULL );
536- if (ts != NULL ) {
537+ ts = hash_search (xid2state ,& xid ,HASH_ENTER , & found );
538+ if (! found ) {
537539ts -> status = status ;
538- MtmAdjustSubtransactions (ts );
539- if (dtmTx .isReplicated ) {
540- MtmSendNotificationMessage (ts );
540+ ts -> csn = MtmAssignCSN ();
541+ ts -> procno = MyProc -> pgprocno ;
542+ ts -> snapshot = INVALID_CSN ;
543+ if (!TransactionIdIsValid (x -> gtid .xid ))
544+ {
545+ ts -> gtid .xid = x -> xid ;
546+ ts -> gtid .node = MtmNodeId ;
547+ }else {
548+ ts -> gtid = x -> gtid ;
541549}
550+ MtmTransactionListAppend (ts );
551+ MtmAddSubtransactions (ts ,subxids ,nsubxids );
552+ }
553+ ts -> status = status ;
554+ MtmAdjustSubtransactions (ts );
555+
556+ if (dtmTx .isReplicated ) {
557+ ts -> gtid = x -> gtid ;
558+ MtmSendNotificationMessage (ts );
542559}
543560LWLockRelease (dtm -> hashLock );
544561}
@@ -548,13 +565,13 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
548565static void
549566MtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn )
550567{
551- MTM_TRACE ("%d: MtmSetTransactionStatus %u = %u\n" ,getpid (),xid ,status );
568+ MTM_TRACE ("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d \n" ,getpid (),xid ,dtmTx . xid , status , dtmTx . isDistributed );
552569if (xid == dtmTx .xid && dtmTx .isDistributed )
553570{
554571if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML )
555572{
556573MtmFinishTransaction (xid ,nsubxids ,subxids ,status );
557- MTM_TRACE ("Abort transaction %d\n" ,xid );
574+ MTM_TRACE ("Abort transaction %d, status=%d, DML=%d \n" ,xid , status , dtmTx . containsDML );
558575}
559576else
560577{
@@ -1006,10 +1023,14 @@ MtmVoteForTransaction(MtmTransState* ts)
10061023MtmSendNotificationMessage (ts );/* send READY message to coordinator */
10071024}
10081025
1009- MTM_TRACE ("Node %d waiting latch...\n" ,MtmNodeId );
1010- WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
1011- ResetLatch (& MyProc -> procLatch );
1012- MTM_TRACE ("Node %d receives response...\n" ,MtmNodeId );
1026+ MTM_TRACE ("%d: Node %d waiting latch...\n" ,getpid (),MtmNodeId );
1027+ while (ts -> status != TRANSACTION_STATUS_COMMITTED && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1028+ LWLockRelease (dtm -> hashLock );
1029+ WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
1030+ ResetLatch (& MyProc -> procLatch );
1031+ LWLockAcquire (dtm -> hashLock ,LW_SHARED );
1032+ }
1033+ MTM_TRACE ("%d: Node %d receives response...\n" ,getpid (),MtmNodeId );
10131034}
10141035
10151036HTAB * MtmCreateHash (void )