Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit84d6f47

Browse files
committed
unify commit logic; some refactor
1 parentce2e104 commit84d6f47

File tree

2 files changed

+62
-201
lines changed

2 files changed

+62
-201
lines changed

‎src/backend/access/transam/global_snapshot.c

Lines changed: 62 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ static HTAB *gtid2xid;
8989
staticDtmNodeState*local;
9090
staticuint64totalSleepInterrupts;
9191
staticintDtmVacuumDelay=10;/* sec */
92-
staticboolDtmRecordCommits=0;
92+
staticboolfinishing_prepared;
93+
9394

9495
DtmCurrentTransdtm_tx;// XXXX: make static
9596

@@ -103,6 +104,7 @@ static size_t DtmGetTransactionStateSize(void);
103104
staticvoidDtmSerializeTransactionState(void*ctx);
104105
staticvoidDtmDeserializeTransactionState(void*ctx);
105106

107+
staticvoidDtmLocalFinish(boolis_commit);
106108

107109
staticTransactionManagerDtmTM= {
108110
PgTransactionIdGetStatus,
@@ -207,7 +209,8 @@ GlobalSnapshotShmemSize(void)
207209
Sizesize;
208210

209211
size=MAXALIGN(sizeof(DtmNodeState));
210-
size=add_size(size, (sizeof(DtmTransId)+sizeof(DtmTransStatus)+HASH_PER_ELEM_OVERHEAD*2)*DTM_HASH_INIT_SIZE);
212+
size=add_size(size,DTM_HASH_INIT_SIZE*
213+
(sizeof(DtmTransId)+sizeof(DtmTransStatus)+HASH_PER_ELEM_OVERHEAD*2));
211214

212215
returnsize;
213216
}
@@ -222,22 +225,28 @@ dtm_xact_callback(XactEvent event, void *arg)
222225
DtmLocalBegin(&dtm_tx);
223226
break;
224227

225-
caseXACT_EVENT_ABORT:
226-
DtmLocalAbort(&dtm_tx);
227-
DtmLocalEnd(&dtm_tx);
228+
caseXACT_EVENT_ABORT_PREPARED:
229+
// DtmLocalAbortPrepared(&dtm_tx);
230+
finishing_prepared= true;
231+
DtmAdjustOldestXid();
228232
break;
229233

230-
caseXACT_EVENT_COMMIT:
231-
DtmLocalCommit(&dtm_tx);
232-
DtmLocalEnd(&dtm_tx);
234+
caseXACT_EVENT_COMMIT_PREPARED:
235+
// DtmLocalCommitPrepared(&dtm_tx);
236+
finishing_prepared= true;
237+
DtmAdjustOldestXid();
233238
break;
234239

235-
caseXACT_EVENT_ABORT_PREPARED:
236-
DtmLocalAbortPrepared(&dtm_tx);
240+
caseXACT_EVENT_COMMIT:
241+
DtmLocalFinish(true);
242+
DtmLocalEnd(&dtm_tx);
243+
finishing_prepared= false;
237244
break;
238245

239-
caseXACT_EVENT_COMMIT_PREPARED:
240-
DtmLocalCommitPrepared(&dtm_tx);
246+
caseXACT_EVENT_ABORT:
247+
DtmLocalFinish(false);
248+
DtmLocalEnd(&dtm_tx);
249+
finishing_prepared= false;
241250
break;
242251

243252
caseXACT_EVENT_PRE_PREPARE:
@@ -254,43 +263,6 @@ dtm_xact_callback(XactEvent event, void *arg)
254263
****************************************************************************
255264
*/
256265

257-
staticuint32
258-
dtm_xid_hash_fn(constvoid*key,Sizekeysize)
259-
{
260-
return (uint32)*(TransactionId*)key;
261-
}
262-
263-
staticint
264-
dtm_xid_match_fn(constvoid*key1,constvoid*key2,Sizekeysize)
265-
{
266-
return*(TransactionId*)key1-*(TransactionId*)key2;
267-
}
268-
269-
staticuint32
270-
dtm_gtid_hash_fn(constvoid*key,Sizekeysize)
271-
{
272-
GlobalTransactionIdid= (GlobalTransactionId)key;
273-
uint32h=0;
274-
275-
while (*id!=0)
276-
{
277-
h=h*31+*id++;
278-
}
279-
returnh;
280-
}
281-
282-
staticvoid*
283-
dtm_gtid_keycopy_fn(void*dest,constvoid*src,Sizekeysize)
284-
{
285-
returnstrcpy((char*)dest, (GlobalTransactionId)src);
286-
}
287-
288-
staticint
289-
dtm_gtid_match_fn(constvoid*key1,constvoid*key2,Sizekeysize)
290-
{
291-
returnstrcmp((GlobalTransactionId)key1, (GlobalTransactionId)key2);
292-
}
293-
294266
staticcharconst*
295267
DtmGetName(void)
296268
{
@@ -480,17 +452,11 @@ DtmInitialize()
480452
void
481453
DtmLocalBegin(DtmCurrentTrans*x)
482454
{
483-
if (!TransactionIdIsValid(x->xid))
484-
{
485455
SpinLockAcquire(&local->lock);
486-
// x->xid = GetCurrentTransactionIdIfAny();
487456
x->cid=INVALID_CID;
488-
x->is_global= false;
489-
x->is_prepared= false;
490457
x->snapshot=dtm_get_cid();
491458
SpinLockRelease(&local->lock);
492459
DTM_TRACE((stderr,"DtmLocalBegin: transaction %u uses local snapshot %lu\n",x->xid,x->snapshot));
493-
}
494460
}
495461

496462
/*
@@ -516,7 +482,6 @@ DtmLocalExtend(GlobalTransactionId gtid)
516482
strncpy(x->gtid,gtid,MAX_GTID_SIZE);
517483
SpinLockRelease(&local->lock);
518484
}
519-
x->is_global= true;
520485
DtmInitGlobalXmin(TransactionXmin);
521486
returnx->snapshot;
522487
}
@@ -543,7 +508,6 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
543508
}
544509
local_cid=dtm_sync(global_cid);
545510
x->snapshot=global_cid;
546-
x->is_global= true;
547511
}
548512
strncpy(x->gtid,gtid,MAX_GTID_SIZE);
549513
SpinLockRelease(&local->lock);
@@ -627,85 +591,59 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
627591
}
628592
SpinLockRelease(&local->lock);
629593

630-
/*
631-
* Record commit in pg_committed_xact table to be make it possible to
632-
* perform recovery in case of crash of some of cluster nodes
633-
*/
634-
if (DtmRecordCommits)
635-
{
636-
charstmt[MAX_GTID_SIZE+64];
637-
intrc;
638-
639-
sprintf(stmt,"insert into pg_committed_xacts values ('%s')",gtid);
640-
SPI_connect();
641-
rc=SPI_execute(stmt, true,0);
642-
SPI_finish();
643-
if (rc!=SPI_OK_INSERT)
644-
{
645-
elog(ERROR,"Failed to insert GTID %s in table pg_committed_xacts",gtid);
646-
}
647-
}
648594
}
649595

650596
/*
651-
*Mark tranasction as prepared
597+
*Set transaction status to committed
652598
*/
653599
void
654-
DtmLocalCommitPrepared(DtmCurrentTrans*x)
600+
DtmLocalFinish(boolis_commit)
655601
{
656-
if (!x->gtid[0])
657-
return;
658-
659-
Assert(x->gtid!=NULL);
602+
DtmCurrentTrans*x=&dtm_tx;
603+
TransactionIdxid=GetCurrentTransactionIdIfAny();
660604

661-
SpinLockAcquire(&local->lock);
605+
if (x->gtid[0]&&finishing_prepared)
662606
{
663-
DtmTransId*id= (DtmTransId*)hash_search(gtid2xid,x->gtid,HASH_REMOVE,NULL);
607+
// Assert(!TransactionIdIsValid(xid));
664608

665-
Assert(id!=NULL);
609+
SpinLockAcquire(&local->lock);
610+
{
611+
DtmTransId*id= (DtmTransId*)hash_search(gtid2xid,x->gtid,HASH_REMOVE,NULL);
666612

667-
x->is_global= true;
668-
x->is_prepared= true;
669-
x->xid=id->xid;
670-
free(id->subxids);
613+
Assert(id!=NULL);
614+
Assert(TransactionIdIsValid(id->xid));
671615

672-
DTM_TRACE((stderr,"Global transaction %u(%s) is precommitted\n",x->xid,gtid));
616+
xid=id->xid;
617+
free(id->subxids);
618+
}
619+
SpinLockRelease(&local->lock);
620+
}
621+
elseif (!TransactionIdIsValid(xid))
622+
{
623+
return;
673624
}
674-
SpinLockRelease(&local->lock);
675-
676-
DtmAdjustOldestXid();
677-
// elog(LOG, "DtmLocalCommitPrepared %d", x->xid);
678-
}
679-
680-
/*
681-
* Set transaction status to committed
682-
*/
683-
void
684-
DtmLocalCommit(DtmCurrentTrans*x)
685-
{
686-
// if (!x->is_global)
687-
// return;
688625

689626
SpinLockAcquire(&local->lock);
690-
if (TransactionIdIsValid(x->xid))
691627
{
692628
boolfound;
693629
DtmTransStatus*ts;
694630

695-
ts= (DtmTransStatus*)hash_search(xid2status,&x->xid,HASH_ENTER,&found);
696-
ts->status=TRANSACTION_STATUS_COMMITTED;
631+
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_ENTER,&found);
632+
ts->status=is_commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED;
697633
if (found)
698634
{
699-
inti;
700-
DtmTransStatus*sts=ts;
701635

702-
Assert(found);
703-
Assert(x->is_global);
704-
for (i=0;i<ts->nSubxids;i++)
636+
if (is_commit)// XXX: why only for commit?
705637
{
706-
sts=sts->next;
707-
Assert(sts->cid==ts->cid);
708-
sts->status=TRANSACTION_STATUS_COMMITTED;
638+
inti;
639+
DtmTransStatus*sts=ts;
640+
641+
for (i=0;i<ts->nSubxids;i++)
642+
{
643+
sts=sts->next;
644+
Assert(sts->cid==ts->cid);
645+
sts->status=TRANSACTION_STATUS_COMMITTED;
646+
}
709647
}
710648
}
711649
else
@@ -715,86 +653,31 @@ DtmLocalCommit(DtmCurrentTrans * x)
715653
Assert(!found);
716654
ts->cid=dtm_get_cid();
717655
DtmTransactionListAppend(ts);
718-
ts->nSubxids=xactGetCommittedChildren(&subxids);
719-
DtmAddSubtransactions(ts,subxids,ts->nSubxids);
656+
if (is_commit)// XXX: why?
657+
{
658+
ts->nSubxids=xactGetCommittedChildren(&subxids);
659+
DtmAddSubtransactions(ts,subxids,ts->nSubxids);
660+
}
661+
else
662+
{
663+
ts->nSubxids=0;
664+
}
720665
}
721666
x->cid=ts->cid;
722667
DTM_TRACE((stderr,"Local transaction %u is committed at %lu\n",x->xid,x->cid));
723668
}
724669
SpinLockRelease(&local->lock);
725670

726-
DtmAdjustOldestXid();
671+
//DtmAdjustOldestXid();
727672
// elog(LOG, "DtmLocalCommit %d", x->xid);
728673
}
729674

730-
/*
731-
* Mark tranasction as prepared
732-
*/
733-
void
734-
DtmLocalAbortPrepared(DtmCurrentTrans*x)
735-
{
736-
if (!x->gtid[0])
737-
return;
738-
739-
Assert(x->gtid!=NULL);
740-
741-
SpinLockAcquire(&local->lock);
742-
{
743-
DtmTransId*id= (DtmTransId*)hash_search(gtid2xid,x->gtid,HASH_REMOVE,NULL);
744-
745-
Assert(id!=NULL);
746-
x->is_global= true;
747-
x->is_prepared= true;
748-
x->xid=id->xid;
749-
free(id->subxids);
750-
DTM_TRACE((stderr,"Global transaction %u(%s) is preaborted\n",x->xid,gtid));
751-
}
752-
SpinLockRelease(&local->lock);
753-
}
754-
755-
/*
756-
* Set transaction status to aborted
757-
*/
758-
void
759-
DtmLocalAbort(DtmCurrentTrans*x)
760-
{
761-
if (!TransactionIdIsValid(x->xid))
762-
return;
763-
764-
SpinLockAcquire(&local->lock);
765-
{
766-
boolfound;
767-
DtmTransStatus*ts;
768-
769-
Assert(TransactionIdIsValid(x->xid));
770-
ts= (DtmTransStatus*)hash_search(xid2status,&x->xid,HASH_ENTER,&found);
771-
if (found)
772-
{
773-
Assert(found);
774-
Assert(x->is_global);
775-
}
776-
else
777-
{
778-
Assert(!found);
779-
ts->cid=dtm_get_cid();
780-
ts->nSubxids=0;
781-
DtmTransactionListAppend(ts);
782-
}
783-
x->cid=ts->cid;
784-
ts->status=TRANSACTION_STATUS_ABORTED;
785-
DTM_TRACE((stderr,"Local transaction %u is aborted at %lu\n",x->xid,x->cid));
786-
}
787-
SpinLockRelease(&local->lock);
788-
}
789-
790675
/*
791676
* Cleanup dtm_tx structure
792677
*/
793678
void
794679
DtmLocalEnd(DtmCurrentTrans*x)
795680
{
796-
x->is_global= false;
797-
x->is_prepared= false;
798681
x->xid=InvalidTransactionId;
799682
x->cid=INVALID_CID;
800683
}
@@ -854,7 +737,6 @@ DtmGetCsn(TransactionId xid)
854737
void
855738
DtmLocalSavePreparedState(DtmCurrentTrans*x)
856739
{
857-
// x->is_prepared = true;
858740

859741
if (x->gtid[0])
860742
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp