Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd9c18ab

Browse files
committed
Add comments to DTM and remove deteriorated code
1 parent2d626c2 commitd9c18ab

File tree

7 files changed

+81
-181
lines changed

7 files changed

+81
-181
lines changed

‎contrib/pg_xtm/dtmd/src/main.c‎

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,7 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
224224
charhead[1+16+16+1];
225225
sprintf(head,"+%016llx%016llx",minxid,maxxid);
226226

227-
Snapshots;
228-
gen_snapshot(&s);
229-
char*snapser=snapshot_serialize(&s);
230-
231-
returndestructive_concat(strdup(head),snapser);
227+
returnstrdup(head);
232228
}
233229

234230
staticxid_tget_global_xmin() {

‎contrib/pg_xtm/libdtm.c‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
407407
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
408408
// is guaranteed to be at least nXids.
409409
// In other words, *first ≥ xid and result ≥ nXids.
410-
// Also sets the 'active' snapshot, which is used as a container for the list
411-
// of active global transactions.
412-
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first,Snapshotactive)
410+
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first)
413411
{
414412
boolok;
415413
xid_txmin,xmax;
@@ -423,7 +421,6 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapsho
423421

424422
if (!dtm_read_hex16(dtm,&xmin)) gotofailure;
425423
if (!dtm_read_hex16(dtm,&xmax)) gotofailure;
426-
if (!dtm_read_snapshot(dtm,active)) gotofailure;
427424

428425
*first=xmin;
429426
count=xmax-xmin+1;

‎contrib/pg_xtm/libdtm.h‎

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3636
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
3737
// is guaranteed to be at least nXids.
3838
// In other words, *first ≥ xid and result ≥ nXids.
39-
// Also sets the 'active' snapshot, which is used as a container for the list
40-
// of active global transactions.
41-
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first,Snapshotactive);
39+
intDtmGlobalReserve(TransactionIdxid,intnXids,TransactionId*first);
4240

4341
#endif

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 76 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,9 @@ typedef struct
4646
{
4747
LWLockIdhashLock;
4848
LWLockIdxidLock;
49-
TransactionIdminXid;
50-
TransactionIdnextXid;
51-
size_tnReservedXids;
52-
SnapshotDataactiveSnapshot;
49+
TransactionIdminXid;/* XID of oldest transaction visible by any active transaction (local or global) */
50+
TransactionIdnextXid;/* next XID for local transaction */
51+
size_tnReservedXids;/* number of XIDs reserved for local transactions */
5352
}DtmState;
5453

5554

@@ -60,15 +59,12 @@ void _PG_init(void);
6059
void_PG_fini(void);
6160

6261
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
63-
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc);
64-
staticvoidDtmMergeWithActiveSnapshot(Snapshotsnapshot);
6562
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
6663
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
6764
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
6865
staticvoidDtmUpdateRecentXmin(Snapshotsnapshot);
6966
staticvoidDtmInitialize(void);
7067
staticvoidDtmXactCallback(XactEventevent,void*arg);
71-
staticboolDtmTransactionIdIsInProgress(TransactionIdxid);
7268
staticTransactionIdDtmGetNextXid(void);
7369
staticTransactionIdDtmGetNewTransactionId(boolisSubXact);
7470
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -91,7 +87,7 @@ static bool DtmGlobalXidAssigned;
9187
staticintDtmLocalXidReserve;
9288
staticintDtmCurcid;
9389
staticSnapshotDtmLastSnapshot;
94-
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,DtmTransactionIdIsInProgress,DtmGetGlobalTransactionId };
90+
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,TransactionIdIsRunning,DtmGetGlobalTransactionId };
9591

9692

9793
#defineXTM_TRACE(fmt, ...)
@@ -156,63 +152,23 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
156152
return false;
157153
}
158154

159-
160-
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc)
161-
{
162-
inti,j,n;
163-
TransactionIdprev;
164-
165-
if (src->xmin<dst->xmin) {
166-
dst->xmin=src->xmin;
167-
}
168-
169-
n=dst->xcnt;
170-
Assert(src->xcnt+n <=GetMaxSnapshotXidCount());
171-
memcpy(dst->xip+n,src->xip,src->xcnt*sizeof(TransactionId));
172-
n+=src->xcnt;
173-
174-
qsort(dst->xip,n,sizeof(TransactionId),xidComparator);
175-
prev=InvalidTransactionId;
176-
177-
for (i=0,j=0;i<n&&dst->xip[i]<dst->xmax;i++) {
178-
if (dst->xip[i]!=prev) {
179-
dst->xip[j++]=prev=dst->xip[i];
180-
}
181-
}
182-
dst->xcnt=j;
183-
}
184-
185-
staticvoidDtmMergeWithActiveSnapshot(Snapshotdst)
186-
{
187-
inti,j;
188-
XLogRecPtrlsn;
189-
Snapshotsrc=&dtm->activeSnapshot;
190-
191-
LWLockAcquire(dtm->xidLock,LW_EXCLUSIVE);
192-
for (i=0,j=0;i<src->xcnt;i++) {
193-
if (!TransactionIdIsInSnapshot(src->xip[i],dst)
194-
&&DtmGetTransactionStatus(src->xip[i],&lsn)==TRANSACTION_STATUS_IN_PROGRESS)
195-
{
196-
src->xip[j++]=src->xip[i];
197-
}
198-
}
199-
src->xcnt=j;
200-
if (j!=0) {
201-
src->xmin=src->xip[0];
202-
DtmMergeSnapshots(dst,src);
203-
}
204-
LWLockRelease(dtm->xidLock);
205-
}
206-
155+
/* Merge local and global snapshots.
156+
* Produce most restricted (conservative) snapshot which treate transaction as in-progress if is is marked as in-progress
157+
* either in local, either in global snapshots
158+
*/
207159
staticvoidDtmMergeWithGlobalSnapshot(Snapshotdst)
208160
{
209-
inti;
161+
inti,j,n;
210162
TransactionIdxid;
211163
Snapshotsrc=&DtmSnapshot;
212164

213165
Assert(TransactionIdIsValid(src->xmin)&&TransactionIdIsValid(src->xmax));
214166

215-
GetLocalSnapshot:
167+
GetLocalSnapshot:
168+
/*
169+
* Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170+
* should be completed locally
171+
*/
216172
dst=GetLocalSnapshotData(dst);
217173
for (i=0;i<dst->xcnt;i++) {
218174
if (TransactionIdIsInDoubt(dst->xip[i])) {
@@ -229,11 +185,32 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
229185

230186
if (src->xmax<dst->xmax)dst->xmax=src->xmax;
231187

232-
DtmMergeSnapshots(dst,src);
188+
if (src->xmin<dst->xmin) {
189+
dst->xmin=src->xmin;
190+
}
191+
192+
n=dst->xcnt;
193+
Assert(src->xcnt+n <=GetMaxSnapshotXidCount());
194+
memcpy(dst->xip+n,src->xip,src->xcnt*sizeof(TransactionId));
195+
n+=src->xcnt;
196+
197+
qsort(dst->xip,n,sizeof(TransactionId),xidComparator);
198+
xid=InvalidTransactionId;
199+
200+
for (i=0,j=0;i<n&&dst->xip[i]<dst->xmax;i++) {
201+
if (dst->xip[i]!=xid) {
202+
dst->xip[j++]=xid=dst->xip[i];
203+
}
204+
}
205+
dst->xcnt=j;
233206

234207
DumpSnapshot(dst,"merged");
235208
}
236209

210+
/*
211+
* Get oldest Xid visible by any active transaction (global or local)
212+
* Take in account global Xmin received from DTMD
213+
*/
237214
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum)
238215
{
239216
TransactionIdlocalXmin=GetOldestLocalXmin(rel,ignoreVacuum);
@@ -253,14 +230,16 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
253230
returnlocalXmin;
254231
}
255232

233+
/*
234+
* Update local Recent*Xmin variables taken in account MinXmin received from DTMD
235+
*/
256236
staticvoidDtmUpdateRecentXmin(Snapshotsnapshot)
257237
{
258-
TransactionIdxmin=dtm->minXid;//DtmSnapshot.xmin;
238+
TransactionIdxmin=dtm->minXid;
259239
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n",dtm->minXid,DtmSnapshot.xmin);
260240

261241
if (TransactionIdIsValid(xmin)) {
262242
xmin-=vacuum_defer_cleanup_age;
263-
//xmin = FirstNormalTransactionId;
264243
if (!TransactionIdIsNormal(xmin)) {
265244
xmin=FirstNormalTransactionId;
266245
}
@@ -277,6 +256,10 @@ static void DtmUpdateRecentXmin(Snapshot snapshot)
277256
}
278257
}
279258

259+
/*
260+
* Get new XID. For global transaction is it previsly set by dtm_begin_transaction or dtm_join_transaction.
261+
* Local transactions are using range of local Xids obtains from DTM.
262+
*/
280263
staticTransactionIdDtmGetNextXid()
281264
{
282265
TransactionIdxid;
@@ -285,18 +268,8 @@ static TransactionId DtmGetNextXid()
285268
XTM_INFO("Use global XID %d\n",DtmNextXid);
286269
xid=DtmNextXid;
287270

288-
#ifdefSUPPORT_LOCAL_TRANSACTIONS
289-
{
290-
TransactionId*p;
291-
p=bsearch(&DtmNextXid,dtm->activeSnapshot.xip,dtm->activeSnapshot.xcnt,sizeof(TransactionId),xidComparator);
292-
if (p!=NULL) {
293-
dtm->activeSnapshot.xcnt-=1;
294-
memcpy(p,p+1, (dtm->activeSnapshot.xcnt- (p-dtm->activeSnapshot.xip))*sizeof(TransactionId));
295-
}
296-
}
297-
#endif
298-
299271
if (TransactionIdPrecedesOrEquals(ShmemVariableCache->nextXid,xid)) {
272+
/* Advance ShmemVariableCache->nextXid formward until new Xid */
300273
while (TransactionIdPrecedes(ShmemVariableCache->nextXid,xid)) {
301274
XTM_INFO("Extend CLOG for global transaction to %d\n",ShmemVariableCache->nextXid);
302275
ExtendCLOG(ShmemVariableCache->nextXid);
@@ -308,21 +281,20 @@ static TransactionId DtmGetNextXid()
308281
}
309282
}else {
310283
if (dtm->nReservedXids==0) {
311-
dtm->nReservedXids=DtmGlobalReserve(ShmemVariableCache->nextXid,DtmLocalXidReserve,&dtm->nextXid,&dtm->activeSnapshot);
284+
dtm->nReservedXids=DtmGlobalReserve(ShmemVariableCache->nextXid,DtmLocalXidReserve,&dtm->nextXid);
312285
Assert(dtm->nReservedXids>0);
313286
Assert(TransactionIdFollowsOrEquals(dtm->nextXid,ShmemVariableCache->nextXid));
314287

288+
/* Advance ShmemVariableCache->nextXid formward until new Xid */
315289
while (TransactionIdPrecedes(ShmemVariableCache->nextXid,dtm->nextXid)) {
316290
XTM_INFO("Extend CLOG for local transaction to %d\n",ShmemVariableCache->nextXid);
317291
ExtendCLOG(ShmemVariableCache->nextXid);
318292
ExtendCommitTs(ShmemVariableCache->nextXid);
319293
ExtendSUBTRANS(ShmemVariableCache->nextXid);
320294
TransactionIdAdvance(ShmemVariableCache->nextXid);
321295
}
322-
Assert(ShmemVariableCache->nextXid==dtm->nextXid);
323-
}else {
324-
Assert(ShmemVariableCache->nextXid==dtm->nextXid);
325-
}
296+
}
297+
Assert(ShmemVariableCache->nextXid==dtm->nextXid);
326298
xid=dtm->nextXid++;
327299
dtm->nReservedXids-=1;
328300
XTM_INFO("Obtain new local XID %d\n",xid);
@@ -337,13 +309,16 @@ DtmGetGlobalTransactionId()
337309
returnDtmNextXid;
338310
}
339311

312+
/*
313+
* We have to cut&paste copde of GetNewTransactionId from varsup because we change way of advancing ShmemVariableCache->nextXid
314+
*/
340315
TransactionId
341316
DtmGetNewTransactionId(boolisSubXact)
342317
{
343318
TransactionIdxid;
344319

345320
XTM_INFO("%d: GetNewTransactionId\n",getpid());
346-
Assert(!DtmGlobalXidAssigned);
321+
Assert(!DtmGlobalXidAssigned);/* We should not assign new Xid if we do not use previous one */
347322

348323
/*
349324
* Workers synchronize transaction state at the beginning of each parallel
@@ -544,53 +519,42 @@ DtmGetNewTransactionId(bool isSubXact)
544519
}
545520

546521

547-
staticboolDtmTransactionIdIsInProgress(TransactionIdxid)
548-
{
549-
XLogRecPtrlsn;
550-
if (TransactionIdIsRunning(xid)) {
551-
return true;
552-
}
553-
#ifdefSUPPORT_LOCAL_TRANSACTIONS
554-
elseif (DtmGetTransactionStatus(xid,&lsn)==TRANSACTION_STATUS_IN_PROGRESS) {
555-
boolgloballyStarted;
556-
LWLockAcquire(dtm->xidLock,LW_SHARED);
557-
globallyStarted=bsearch(&xid,dtm->activeSnapshot.xip,dtm->activeSnapshot.xcnt,sizeof(TransactionId),xidComparator)!=NULL;
558-
LWLockRelease(dtm->xidLock);
559-
returngloballyStarted;
560-
}
561-
#endif
562-
return false;
563-
}
564-
565-
566522
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
567523
{
568524
if (DtmGlobalXidAssigned) {
525+
/* If DtmGlobalXidAssigned is set, we are in transaction performing dtm_begin_transaction or dtm_join_transaction
526+
* which PRECEDS actual transaction for which Xid is received.
527+
* This transaction doesn't need to take in accountn global snapshot
528+
*/
569529
returnGetLocalSnapshotData(snapshot);
570530
}
571-
if (TransactionIdIsValid(DtmNextXid)/*&& IsMVCCSnapshot(snapshot)*/&&snapshot!=&CatalogSnapshotData) {
531+
if (TransactionIdIsValid(DtmNextXid)&&snapshot!=&CatalogSnapshotData) {
572532
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid)) {
573533
DtmGlobalGetSnapshot(DtmNextXid,&DtmSnapshot,&dtm->minXid);
574534
}
575535
DtmCurcid=snapshot->curcid;
576536
DtmLastSnapshot=snapshot;
577537
DtmMergeWithGlobalSnapshot(snapshot);
578538
if (!IsolationUsesXactSnapshot()) {
539+
/* Use single global snapshot during all transaction for repeatable read isolation level,
540+
* but obtain new global snapshot each time it is requested for read committed isolation level
541+
*/
579542
DtmHasGlobalSnapshot= false;
580543
}
581544
}else {
545+
/* For local transactions and catalog snapshots use default GetSnapshotData implementation */
582546
snapshot=GetLocalSnapshotData(snapshot);
583547
}
584-
#ifdefSUPPORT_LOCAL_TRANSACTIONS
585-
DtmMergeWithActiveSnapshot(snapshot);
586-
#endif
587548
DtmUpdateRecentXmin(snapshot);
588549
CurrentTransactionSnapshot=snapshot;
589550
returnsnapshot;
590551
}
591552

592553
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn)
593554
{
555+
/* Because of global snapshots we can ask for status of transaction which is not yet started locally: so we have
556+
* to compare xid with ShmemVariableCache->nextXid before accessing CLOG
557+
*/
594558
XidStatusstatus=xid >=ShmemVariableCache->nextXid
595559
?TRANSACTION_STATUS_IN_PROGRESS
596560
:CLOGTransactionIdGetStatus(xid,lsn);
@@ -603,8 +567,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
603567
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
604568
if (!RecoveryInProgress()) {
605569
if (!DtmGlobalXidAssigned&&TransactionIdIsValid(DtmNextXid)) {
606-
/* Already should be IN_PROGRESS */
607-
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
608570
CurrentTransactionSnapshot=NULL;
609571
if (status==TRANSACTION_STATUS_ABORTED) {
610572
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
@@ -613,6 +575,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
613575
return;
614576
}else {
615577
XTM_INFO("Begin commit transaction %d\n",xid);
578+
/* Mark transaction as on-doubt in xid_in_doubt hash table */
616579
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
617580
hash_search(xid_in_doubt,&DtmNextXid,HASH_ENTER,NULL);
618581
LWLockRelease(dtm->hashLock);
@@ -656,9 +619,6 @@ static void DtmInitialize()
656619
dtm->xidLock=LWLockAssign();
657620
dtm->nReservedXids=0;
658621
dtm->minXid=InvalidTransactionId;
659-
dtm->activeSnapshot.xip= (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount()*sizeof(TransactionId));
660-
dtm->activeSnapshot.subxip= (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount()*sizeof(TransactionId));
661-
662622
RegisterXactCallback(DtmXactCallback,NULL);
663623
}
664624
LWLockRelease(AddinShmemInitLock);
@@ -684,13 +644,23 @@ DtmXactCallback(XactEvent event, void *arg)
684644
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
685645
if (event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT) {
686646
if (DtmGlobalXidAssigned) {
647+
/* DtmGlobalXidAssigned is set when Xid for global transaction is recieved.
648+
* But it happens in separate local transaction preceding this global transaction at this backend.
649+
* So this variable is used as indicator that we are still in local transaction preceeding global transaction.
650+
* When this local transaction is completed we are ready to assign Xid to global transaction.
651+
*/
687652
DtmGlobalXidAssigned= false;
688653
}elseif (TransactionIdIsValid(DtmNextXid)) {
689654
if (event==XACT_EVENT_COMMIT) {
655+
/* Now transaction status is already written in CLOG, so we can remove information about it from hash table */
690656
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
691657
hash_search(xid_in_doubt,&DtmNextXid,HASH_REMOVE,NULL);
692658
LWLockRelease(dtm->hashLock);
693659
}else {
660+
/* Transaction at the node can be aborted because of transaction failure at some other node
661+
* before it starts doing anything and assigned Xid, in this case Postgres is not calling SetTransactionStatus,
662+
* so we have to send report to DTMD here
663+
*/
694664
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
695665
DtmGlobalSetTransStatus(DtmNextXid,TRANSACTION_STATUS_ABORTED, false);
696666
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp