Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit99affcb

Browse files
committed
Add active snapshot
1 parent3ddd266 commit99affcb

File tree

5 files changed

+58
-37
lines changed

5 files changed

+58
-37
lines changed

‎contrib/pg_xtm/libdtm.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3636
// of xids reserved, and sets the 'first' xid accordingly. The number of xids
3737
// reserved is guaranteed to be at least nXids.
3838
// In other words, *first ≥ xid and result ≥ nXids.
39-
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first);
39+
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first,Snapshotactive);
4040

4141
#endif

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ typedef struct
4848
LWLockIdxidLock;
4949
TransactionIdnextXid;
5050
size_tnReservedXids;
51+
SnapshotDataactiveSnapshot;
5152
}DtmState;
5253

5354

@@ -61,9 +62,11 @@ void _PG_fini(void);
6162

6263
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
6364
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc);
65+
staticvoidDtmMergeWithActiveSnapshot(Snapshotsnapshot);
66+
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
6467
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
6568
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
66-
staticvoidDtmUpdateRecentXmin(void);
69+
staticvoidDtmUpdateRecentXmin(Snapshotsnapshot);
6770
staticvoidDtmInitialize(void);
6871
staticvoidDtmXactCallback(XactEventevent,void*arg);
6972
staticTransactionIdDtmGetNextXid(void);
@@ -144,10 +147,44 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
144147
return false;
145148
}
146149

150+
147151
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc)
148152
{
149153
inti,j,n;
154+
TransactionIdprev;
155+
156+
if (src->xmin<dst->xmin) {
157+
dst->xmin=src->xmin;
158+
}
159+
160+
n=dst->xcnt;
161+
Assert(src->xcnt+n <=GetMaxSnapshotXidCount());
162+
memcpy(dst->xip+n,src->xip,src->xcnt*sizeof(TransactionId));
163+
n+=src->xcnt;
164+
165+
qsort(dst->xip,n,sizeof(TransactionId),xidComparator);
166+
prev=InvalidTransactionId;
167+
168+
for (i=0,j=0;i<n&&dst->xip[i]<dst->xmax;i++) {
169+
if (dst->xip[i]!=prev) {
170+
dst->xip[j++]=prev=dst->xip[i];
171+
}
172+
}
173+
dst->xcnt=j;
174+
}
175+
176+
staticvoidDtmMergeWithActiveSnapshot(Snapshotdst)
177+
{
178+
LWLockAcquire(dtm->xidLock,LW_EXCLUSIVE);
179+
DtmMergeSnapshots(dst,&dtm->activeSnapshot);
180+
LWLockRelease(dtm->xidLock);
181+
}
182+
183+
staticvoidDtmMergeWithGlobalSnapshot(Snapshotdst)
184+
{
185+
inti;
150186
TransactionIdxid;
187+
Snapshotsrc=&DtmSnapshot;
151188

152189
Assert(TransactionIdIsValid(src->xmin)&&TransactionIdIsValid(src->xmax));
153190

@@ -166,35 +203,15 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
166203
DumpSnapshot(dst,"local");
167204
DumpSnapshot(src,"DTM");
168205

169-
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
170-
if (src->xmin<dst->xmin) {
171-
dst->xmin=src->xmin;
172-
ProcArrayInstallImportedXmin(src->xmin,DtmNextXid);
173-
//MyPgXact->xmin = TransactionXmin = src->xmin;
174-
}
175206
if (src->xmax<dst->xmax)dst->xmax=src->xmax;
176207

177-
178-
n=dst->xcnt;
179-
for (xid=dst->xmax;xid <=src->xmin;xid++) {
180-
dst->xip[n++]=xid;
181-
}
182-
memcpy(dst->xip+n,src->xip,src->xcnt*sizeof(TransactionId));
183-
n+=src->xcnt;
184-
Assert(n <=GetMaxSnapshotXidCount());
185-
186-
qsort(dst->xip,n,sizeof(TransactionId),xidComparator);
187-
xid=InvalidTransactionId;
208+
DtmMergeSnapshots(dst,src);
188209

189-
for (i=0,j=0;i<n&&dst->xip[i]<dst->xmax;i++) {
190-
if (dst->xip[i]!=xid) {
191-
dst->xip[j++]=xid=dst->xip[i];
192-
}
193-
}
194-
dst->xcnt=j;
195210
DumpSnapshot(dst,"merged");
196211
}
197212

213+
214+
198215
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum)
199216
{
200217
TransactionIdlocalXmin=GetOldestLocalXmin(rel,ignoreVacuum);
@@ -211,7 +228,7 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
211228
returnlocalXmin;
212229
}
213230

214-
staticvoidDtmUpdateRecentXmin(void)
231+
staticvoidDtmUpdateRecentXmin(Snapshotsnapshot)
215232
{
216233
TransactionIdxmin=DtmMinXid;//DtmSnapshot.xmin;
217234
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n",DtmMinXid,DtmSnapshot.xmin);
@@ -228,9 +245,10 @@ static void DtmUpdateRecentXmin(void)
228245
if (TransactionIdFollows(RecentGlobalXmin,xmin)) {
229246
RecentGlobalXmin=xmin;
230247
}
231-
if (TransactionIdFollows(RecentXmin,xmin)) {
232-
RecentXmin=xmin;
233-
}
248+
}
249+
if (TransactionIdFollows(RecentXmin,snapshot->xmin)) {
250+
ProcArrayInstallImportedXmin(snapshot->xmin,GetCurrentTransactionId());
251+
RecentXmin=snapshot->xmin;
234252
}
235253
}
236254

@@ -253,7 +271,7 @@ static TransactionId DtmGetNextXid()
253271
}
254272
}else {
255273
if (dtm->nReservedXids==0) {
256-
dtm->nReservedXids=DtmGlobalReserve(ShmemVariableCache->nextXid,DtmLocalXidReserve,&dtm->nextXid);
274+
dtm->nReservedXids=DtmGlobalReserve(ShmemVariableCache->nextXid,DtmLocalXidReserve,&dtm->nextXid,&dtm->activeSnapshot);
257275
Assert(dtm->nReservedXids>0);
258276
Assert(TransactionIdFollowsOrEquals(dtm->nextXid,ShmemVariableCache->nextXid));
259277

@@ -488,14 +506,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
488506
}
489507
DtmCurcid=snapshot->curcid;
490508
DtmLastSnapshot=snapshot;
491-
DtmMergeSnapshots(snapshot,&DtmSnapshot);
509+
DtmMergeWithGlobalSnapshot(snapshot);
492510
if (!IsolationUsesXactSnapshot()) {
493511
DtmHasGlobalSnapshot= false;
494512
}
495513
}else {
496514
snapshot=GetLocalSnapshotData(snapshot);
497515
}
498-
DtmUpdateRecentXmin();
516+
DtmMergeWithActiveSnapshot(snapshot);
517+
DtmUpdateRecentXmin(snapshot);
499518
CurrentTransactionSnapshot=snapshot;
500519
returnsnapshot;
501520
}
@@ -566,6 +585,8 @@ static void DtmInitialize()
566585
dtm->hashLock=LWLockAssign();
567586
dtm->xidLock=LWLockAssign();
568587
dtm->nReservedXids=0;
588+
dtm->activeSnapshot.xip= (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount()*sizeof(TransactionId));
589+
dtm->activeSnapshot.subxip= (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount()*sizeof(TransactionId));
569590
}
570591
LWLockRelease(AddinShmemInitLock);
571592

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
125125

126126
start:=time.Now()
127127
formyCommits<N_ITERATIONS {
128-
amount:=2*rand.Intn(2000)-1
129-
//amount := 1
128+
//amount := 2*rand.Intn(2000) - 1
129+
amount:=1
130130
account1:=rand.Intn(N_ACCOUNTS)
131131
account2:=rand.Intn(N_ACCOUNTS)
132132

‎src/backend/access/heap/visibilitymap.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
254254
Pagepage;
255255
char*map;
256256

257-
#if0
257+
#if1
258258
fprintf(stderr,"Visibilitymap cutoff %d, RecentLocalDataXmin=%d\n",cutoff_xid,RecentGlobalDataXmin);
259259
// return;
260260
#endif

‎src/backend/utils/time/tqual.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,8 +1165,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11651165
HeapTupleHeadertuple=htup->t_data;
11661166
TransactionIdcurxid=GetCurrentTransactionId();
11671167
if (TransactionIdIsNormal(curxid)) {
1168-
fprintf(stderr,"pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1169-
getpid(),curxid,snapshot->xmin,snapshot->xmax,HeapTupleHeaderGetRawXmin(tuple),HeapTupleHeaderGetRawXmax(tuple),result);
1168+
fprintf(stderr,"pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d} %x = %d\n",
1169+
getpid(),curxid,snapshot->xmin,snapshot->xmax,HeapTupleHeaderGetRawXmin(tuple),HeapTupleHeaderGetRawXmax(tuple),tuple->t_infomask,result);
11701170
}
11711171
returnresult;
11721172
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp