@@ -63,7 +63,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
63
63
static void DtmMergeSnapshots (Snapshot dst ,Snapshot src );
64
64
static XidStatus DtmGetTransactionStatus (TransactionId xid ,XLogRecPtr * lsn );
65
65
static void DtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn );
66
- static void DtmUpdateRecentXmin (void );
66
+ static void DtmUpdateRecentXmin (TransactionId xmin );
67
67
static void DtmInitialize (void );
68
68
static void DtmXactCallback (XactEvent event ,void * arg );
69
69
static TransactionId DtmGetNextXid (void );
@@ -186,10 +186,8 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
186
186
DumpSnapshot (dst ,"merged" );
187
187
}
188
188
189
- static void DtmUpdateRecentXmin (void )
189
+ static void DtmUpdateRecentXmin (TransactionId xmin )
190
190
{
191
- TransactionId xmin = DtmSnapshot .xmin ;
192
-
193
191
XTM_TRACE ("XTM: DtmUpdateRecentXmin \n" );
194
192
195
193
if (TransactionIdIsValid (xmin )) {
@@ -207,6 +205,9 @@ static void DtmUpdateRecentXmin(void)
207
205
if (TransactionIdFollows (RecentXmin ,xmin )) {
208
206
RecentXmin = xmin ;
209
207
}
208
+ if (TransactionIdFollows (MyPgXact -> xmin ,xmin )) {
209
+ MyPgXact -> xmin = xmin ;
210
+ }
210
211
}
211
212
}
212
213
@@ -459,19 +460,21 @@ DtmGetNewTransactionId(bool isSubXact)
459
460
460
461
static Snapshot DtmGetSnapshot (Snapshot snapshot )
461
462
{
462
-
463
463
if (TransactionIdIsValid (DtmNextXid )) {
464
464
if (!DtmHasGlobalSnapshot ) {
465
- DtmGlobalGetSnapshot (DtmNextXid ,& DtmSnapshot );
465
+ TransactionId gxmin = InvalidTransactionId ;
466
+ DtmGlobalGetSnapshot (DtmNextXid ,& DtmSnapshot ,& gxmin );
467
+ if (TransactionIdIsValid (gxmin )) {
468
+ DtmUpdateRecentXmin (gxmin );
469
+ }
466
470
}
467
471
DtmMergeSnapshots (snapshot ,& DtmSnapshot );
468
- if (!IsolationUsesXactSnapshot ()) {
469
- DtmHasGlobalSnapshot = false;
470
- }
472
+ if (!IsolationUsesXactSnapshot ()) {
473
+ DtmHasGlobalSnapshot = false;
474
+ }
471
475
}else {
472
- snapshot = GetLocalSnapshotData (snapshot );
473
- }
474
- DtmUpdateRecentXmin ();
476
+ snapshot = GetLocalSnapshotData (snapshot );
477
+ }
475
478
CurrentTransactionSnapshot = snapshot ;
476
479
return snapshot ;
477
480
}
@@ -672,10 +675,16 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
672
675
Datum
673
676
dtm_begin_transaction (PG_FUNCTION_ARGS )
674
677
{
678
+ TransactionId gxmin ;
675
679
int nParticipants = PG_GETARG_INT32 (0 );
676
680
Assert (!TransactionIdIsValid (DtmNextXid ));
677
681
678
- DtmNextXid = DtmGlobalStartTransaction (nParticipants ,& DtmSnapshot );
682
+ gxmin = InvalidTransactionId ;
683
+ DtmNextXid = DtmGlobalStartTransaction (nParticipants ,& DtmSnapshot ,& gxmin );
684
+ if (TransactionIdIsValid (gxmin )) {
685
+ DtmUpdateRecentXmin (gxmin );
686
+ }
687
+
679
688
Assert (TransactionIdIsValid (DtmNextXid ));
680
689
XTM_INFO ("%d: Start global transaction %d\n" ,getpid (),DtmNextXid );
681
690
@@ -687,12 +696,17 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
687
696
688
697
Datum dtm_join_transaction (PG_FUNCTION_ARGS )
689
698
{
699
+ TransactionId gxmin ;
690
700
Assert (!TransactionIdIsValid (DtmNextXid ));
691
701
DtmNextXid = PG_GETARG_INT32 (0 );
692
702
Assert (TransactionIdIsValid (DtmNextXid ));
693
703
XTM_INFO ("%d: Join global transaction %d\n" ,getpid (),DtmNextXid );
694
704
695
- DtmGlobalGetSnapshot (DtmNextXid ,& DtmSnapshot );
705
+ gxmin = InvalidTransactionId ;
706
+ DtmGlobalGetSnapshot (DtmNextXid ,& DtmSnapshot ,& gxmin );
707
+ if (TransactionIdIsValid (gxmin )) {
708
+ DtmUpdateRecentXmin (gxmin );
709
+ }
696
710
697
711
DtmHasGlobalSnapshot = true;
698
712
DtmIsGlobalTransaction = true;