@@ -194,7 +194,7 @@ static char const* MtmGetName(void)
194
194
Snapshot MtmGetSnapshot (Snapshot snapshot )
195
195
{
196
196
snapshot = PgGetSnapshotData (snapshot );
197
- RecentGlobalDataXmin = RecentGlobalXmin = MtmAdjustOldestXid (RecentGlobalDataXmin );
197
+ RecentGlobalDataXmin = RecentGlobalXmin = dtm -> oldestXid ; // MtmAdjustOldestXid(RecentGlobalDataXmin);
198
198
return snapshot ;
199
199
}
200
200
@@ -499,10 +499,8 @@ void MtmSendNotificationMessage(MtmTransState* ts)
499
499
ts -> nextVoting = votingList ;
500
500
dtm -> votingTransactions = ts ;
501
501
SpinLockRelease (& dtm -> votingSpinlock );
502
- MTM_TRACE ("Register commit message\n" );
503
502
if (votingList == NULL ) {
504
503
/* singal semaphore only once for the whole list */
505
- MTM_TRACE ("Signal semaphore\n" );
506
504
PGSemaphoreUnlock (& dtm -> votingSemaphore );
507
505
}
508
506
}
@@ -519,9 +517,11 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
519
517
MtmTransactionListAppend (ts );
520
518
MtmAddSubtransactions (ts ,subxids ,nsubxids );
521
519
520
+ MtmVoteForTransaction (ts );
521
+
522
522
LWLockRelease (dtm -> hashLock );
523
523
524
- MtmVoteForTransaction ( ts );
524
+ MTM_TRACE ( "%d: MtmCommitTransaction status=%d\n" , getpid (), ts -> status );
525
525
526
526
return ts -> status == TRANSACTION_STATUS_COMMITTED ;
527
527
}
@@ -530,15 +530,32 @@ static void
530
530
MtmFinishTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status )
531
531
{
532
532
MtmTransState * ts ;
533
+ MtmCurrentTrans * x = & dtmTx ;
534
+ bool found ;
533
535
534
536
LWLockAcquire (dtm -> hashLock ,LW_EXCLUSIVE );
535
- ts = hash_search (xid2state ,& xid ,HASH_FIND , NULL );
536
- if (ts != NULL ) {
537
+ ts = hash_search (xid2state ,& xid ,HASH_ENTER , & found );
538
+ if (! found ) {
537
539
ts -> status = status ;
538
- MtmAdjustSubtransactions (ts );
539
- if (dtmTx .isReplicated ) {
540
- MtmSendNotificationMessage (ts );
540
+ ts -> csn = MtmAssignCSN ();
541
+ ts -> procno = MyProc -> pgprocno ;
542
+ ts -> snapshot = INVALID_CSN ;
543
+ if (!TransactionIdIsValid (x -> gtid .xid ))
544
+ {
545
+ ts -> gtid .xid = x -> xid ;
546
+ ts -> gtid .node = MtmNodeId ;
547
+ }else {
548
+ ts -> gtid = x -> gtid ;
541
549
}
550
+ MtmTransactionListAppend (ts );
551
+ MtmAddSubtransactions (ts ,subxids ,nsubxids );
552
+ }
553
+ ts -> status = status ;
554
+ MtmAdjustSubtransactions (ts );
555
+
556
+ if (dtmTx .isReplicated ) {
557
+ ts -> gtid = x -> gtid ;
558
+ MtmSendNotificationMessage (ts );
542
559
}
543
560
LWLockRelease (dtm -> hashLock );
544
561
}
@@ -548,13 +565,13 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
548
565
static void
549
566
MtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn )
550
567
{
551
- MTM_TRACE ("%d: MtmSetTransactionStatus %u = %u\n" ,getpid (),xid ,status );
568
+ MTM_TRACE ("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d \n" ,getpid (),xid ,dtmTx . xid , status , dtmTx . isDistributed );
552
569
if (xid == dtmTx .xid && dtmTx .isDistributed )
553
570
{
554
571
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML )
555
572
{
556
573
MtmFinishTransaction (xid ,nsubxids ,subxids ,status );
557
- MTM_TRACE ("Abort transaction %d\n" ,xid );
574
+ MTM_TRACE ("Abort transaction %d, status=%d, DML=%d \n" ,xid , status , dtmTx . containsDML );
558
575
}
559
576
else
560
577
{
@@ -1006,10 +1023,14 @@ MtmVoteForTransaction(MtmTransState* ts)
1006
1023
MtmSendNotificationMessage (ts );/* send READY message to coordinator */
1007
1024
}
1008
1025
1009
- MTM_TRACE ("Node %d waiting latch...\n" ,MtmNodeId );
1010
- WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
1011
- ResetLatch (& MyProc -> procLatch );
1012
- MTM_TRACE ("Node %d receives response...\n" ,MtmNodeId );
1026
+ MTM_TRACE ("%d: Node %d waiting latch...\n" ,getpid (),MtmNodeId );
1027
+ while (ts -> status != TRANSACTION_STATUS_COMMITTED && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1028
+ LWLockRelease (dtm -> hashLock );
1029
+ WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
1030
+ ResetLatch (& MyProc -> procLatch );
1031
+ LWLockAcquire (dtm -> hashLock ,LW_SHARED );
1032
+ }
1033
+ MTM_TRACE ("%d: Node %d receives response...\n" ,getpid (),MtmNodeId );
1013
1034
}
1014
1035
1015
1036
HTAB * MtmCreateHash (void )