@@ -48,6 +48,7 @@ typedef struct
48
48
LWLockId xidLock ;
49
49
TransactionId nextXid ;
50
50
size_t nReservedXids ;
51
+ SnapshotData activeSnapshot ;
51
52
}DtmState ;
52
53
53
54
@@ -61,9 +62,11 @@ void _PG_fini(void);
61
62
62
63
static Snapshot DtmGetSnapshot (Snapshot snapshot );
63
64
static void DtmMergeSnapshots (Snapshot dst ,Snapshot src );
65
+ static void DtmMergeWithActiveSnapshot (Snapshot snapshot );
66
+ static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
64
67
static XidStatus DtmGetTransactionStatus (TransactionId xid ,XLogRecPtr * lsn );
65
68
static void DtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn );
66
- static void DtmUpdateRecentXmin (void );
69
+ static void DtmUpdateRecentXmin (Snapshot snapshot );
67
70
static void DtmInitialize (void );
68
71
static void DtmXactCallback (XactEvent event ,void * arg );
69
72
static TransactionId DtmGetNextXid (void );
@@ -144,10 +147,44 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
144
147
return false;
145
148
}
146
149
150
+
147
151
static void DtmMergeSnapshots (Snapshot dst ,Snapshot src )
148
152
{
149
153
int i ,j ,n ;
154
+ TransactionId prev ;
155
+
156
+ if (src -> xmin < dst -> xmin ) {
157
+ dst -> xmin = src -> xmin ;
158
+ }
159
+
160
+ n = dst -> xcnt ;
161
+ Assert (src -> xcnt + n <=GetMaxSnapshotXidCount ());
162
+ memcpy (dst -> xip + n ,src -> xip ,src -> xcnt * sizeof (TransactionId ));
163
+ n += src -> xcnt ;
164
+
165
+ qsort (dst -> xip ,n ,sizeof (TransactionId ),xidComparator );
166
+ prev = InvalidTransactionId ;
167
+
168
+ for (i = 0 ,j = 0 ;i < n && dst -> xip [i ]< dst -> xmax ;i ++ ) {
169
+ if (dst -> xip [i ]!= prev ) {
170
+ dst -> xip [j ++ ]= prev = dst -> xip [i ];
171
+ }
172
+ }
173
+ dst -> xcnt = j ;
174
+ }
175
+
176
+ static void DtmMergeWithActiveSnapshot (Snapshot dst )
177
+ {
178
+ LWLockAcquire (dtm -> xidLock ,LW_EXCLUSIVE );
179
+ DtmMergeSnapshots (dst ,& dtm -> activeSnapshot );
180
+ LWLockRelease (dtm -> xidLock );
181
+ }
182
+
183
+ static void DtmMergeWithGlobalSnapshot (Snapshot dst )
184
+ {
185
+ int i ;
150
186
TransactionId xid ;
187
+ Snapshot src = & DtmSnapshot ;
151
188
152
189
Assert (TransactionIdIsValid (src -> xmin )&& TransactionIdIsValid (src -> xmax ));
153
190
@@ -166,35 +203,15 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
166
203
DumpSnapshot (dst ,"local" );
167
204
DumpSnapshot (src ,"DTM" );
168
205
169
- /* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
170
- if (src -> xmin < dst -> xmin ) {
171
- dst -> xmin = src -> xmin ;
172
- ProcArrayInstallImportedXmin (src -> xmin ,DtmNextXid );
173
- //MyPgXact->xmin = TransactionXmin = src->xmin;
174
- }
175
206
if (src -> xmax < dst -> xmax )dst -> xmax = src -> xmax ;
176
207
177
-
178
- n = dst -> xcnt ;
179
- for (xid = dst -> xmax ;xid <=src -> xmin ;xid ++ ) {
180
- dst -> xip [n ++ ]= xid ;
181
- }
182
- memcpy (dst -> xip + n ,src -> xip ,src -> xcnt * sizeof (TransactionId ));
183
- n += src -> xcnt ;
184
- Assert (n <=GetMaxSnapshotXidCount ());
185
-
186
- qsort (dst -> xip ,n ,sizeof (TransactionId ),xidComparator );
187
- xid = InvalidTransactionId ;
208
+ DtmMergeSnapshots (dst ,src );
188
209
189
- for (i = 0 ,j = 0 ;i < n && dst -> xip [i ]< dst -> xmax ;i ++ ) {
190
- if (dst -> xip [i ]!= xid ) {
191
- dst -> xip [j ++ ]= xid = dst -> xip [i ];
192
- }
193
- }
194
- dst -> xcnt = j ;
195
210
DumpSnapshot (dst ,"merged" );
196
211
}
197
212
213
+
214
+
198
215
static TransactionId DtmGetOldestXmin (Relation rel ,bool ignoreVacuum )
199
216
{
200
217
TransactionId localXmin = GetOldestLocalXmin (rel ,ignoreVacuum );
@@ -211,7 +228,7 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
211
228
return localXmin ;
212
229
}
213
230
214
- static void DtmUpdateRecentXmin (void )
231
+ static void DtmUpdateRecentXmin (Snapshot snapshot )
215
232
{
216
233
TransactionId xmin = DtmMinXid ;//DtmSnapshot.xmin;
217
234
XTM_INFO ("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n" ,DtmMinXid ,DtmSnapshot .xmin );
@@ -228,9 +245,10 @@ static void DtmUpdateRecentXmin(void)
228
245
if (TransactionIdFollows (RecentGlobalXmin ,xmin )) {
229
246
RecentGlobalXmin = xmin ;
230
247
}
231
- if (TransactionIdFollows (RecentXmin ,xmin )) {
232
- RecentXmin = xmin ;
233
- }
248
+ }
249
+ if (TransactionIdFollows (RecentXmin ,snapshot -> xmin )) {
250
+ ProcArrayInstallImportedXmin (snapshot -> xmin ,GetCurrentTransactionId ());
251
+ RecentXmin = snapshot -> xmin ;
234
252
}
235
253
}
236
254
@@ -253,7 +271,7 @@ static TransactionId DtmGetNextXid()
253
271
}
254
272
}else {
255
273
if (dtm -> nReservedXids == 0 ) {
256
- dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid ,DtmLocalXidReserve ,& dtm -> nextXid );
274
+ dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid ,DtmLocalXidReserve ,& dtm -> nextXid , & dtm -> activeSnapshot );
257
275
Assert (dtm -> nReservedXids > 0 );
258
276
Assert (TransactionIdFollowsOrEquals (dtm -> nextXid ,ShmemVariableCache -> nextXid ));
259
277
@@ -488,14 +506,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
488
506
}
489
507
DtmCurcid = snapshot -> curcid ;
490
508
DtmLastSnapshot = snapshot ;
491
- DtmMergeSnapshots (snapshot , & DtmSnapshot );
509
+ DtmMergeWithGlobalSnapshot (snapshot );
492
510
if (!IsolationUsesXactSnapshot ()) {
493
511
DtmHasGlobalSnapshot = false;
494
512
}
495
513
}else {
496
514
snapshot = GetLocalSnapshotData (snapshot );
497
515
}
498
- DtmUpdateRecentXmin ();
516
+ DtmMergeWithActiveSnapshot (snapshot );
517
+ DtmUpdateRecentXmin (snapshot );
499
518
CurrentTransactionSnapshot = snapshot ;
500
519
return snapshot ;
501
520
}
@@ -566,6 +585,8 @@ static void DtmInitialize()
566
585
dtm -> hashLock = LWLockAssign ();
567
586
dtm -> xidLock = LWLockAssign ();
568
587
dtm -> nReservedXids = 0 ;
588
+ dtm -> activeSnapshot .xip = (TransactionId * )ShmemAlloc (GetMaxSnapshotXidCount ()* sizeof (TransactionId ));
589
+ dtm -> activeSnapshot .subxip = (TransactionId * )ShmemAlloc (GetMaxSnapshotSubxidCount ()* sizeof (TransactionId ));
569
590
}
570
591
LWLockRelease (AddinShmemInitLock );
571
592