Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit57002d1

Browse files
committed
cleanup DtmLocalFinish and get rid of x->gtid
1 parent4e2fed8 commit57002d1

File tree

4 files changed

+106
-105
lines changed

4 files changed

+106
-105
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -483,14 +483,14 @@ begin_remote_xact(ConnCacheEntry *entry)
483483
++two_phase_xact_count);
484484
MemoryContextSwitchTo(oldcxt);
485485

486-
current_global_cid=DtmLocalExtend(two_phase_xact_gid);
486+
current_global_cid=DtmLocalExtend();
487487
}
488488

489489
Assert(two_phase_xact_gid);
490490
/* join the new participant */
491491
res=PQexec(entry->conn,
492-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
493-
current_global_cid,two_phase_xact_gid));
492+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT")",
493+
current_global_cid));
494494

495495
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
496496
{

‎src/backend/access/transam/global_snapshot.c

Lines changed: 99 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,29 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
295295
}
296296
}
297297

298+
/*
299+
* Add subtransactions to finished transactions list.
300+
* Copy CSN and status of parent transaction.
301+
*/
302+
staticvoid
303+
DtmAddSubtransactions(DtmTransStatus*ts,TransactionId*subxids,intnSubxids)
304+
{
305+
inti;
306+
307+
for (i=0;i<nSubxids;i++)
308+
{
309+
boolfound;
310+
DtmTransStatus*sts;
311+
312+
Assert(TransactionIdIsValid(subxids[i]));
313+
sts= (DtmTransStatus*)hash_search(xid2status,&subxids[i],HASH_ENTER,&found);
314+
Assert(!found);
315+
sts->cid=ts->cid;
316+
sts->nSubxids=0;
317+
DtmTransactionListInsertAfter(ts,sts);
318+
}
319+
}
320+
298321
/*
299322
* There can be different oldest XIDs at different cluster node.
300323
* Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -465,27 +488,19 @@ DtmLocalBegin(DtmCurrentTrans * x)
465488
* Returns snapshot of current transaction.
466489
*/
467490
cid_t
468-
DtmLocalExtend(GlobalTransactionIdgtid)
491+
DtmLocalExtend()
469492
{
470-
DtmCurrentTrans*x=&dtm_tx;
471-
472-
if (gtid!=NULL)
473-
{
474-
strncpy(x->gtid,gtid,MAX_GTID_SIZE);
475-
}
476493
DtmInitGlobalXmin(TransactionXmin);
477-
478494
dtm_tx.is_global= true;
479-
480-
returnx->snapshot;
495+
returndtm_tx.snapshot;
481496
}
482497

483498
/*
484499
* This function is executed on all nodes joining distributed transaction.
485500
* global_cid is snapshot taken from node initiated this transaction
486501
*/
487502
cid_t
488-
DtmLocalAccess(DtmCurrentTrans*x,GlobalTransactionIdgtid,cid_tglobal_cid)
503+
DtmLocalAccess(cid_tglobal_cid)
489504
{
490505
cid_tlocal_cid;
491506

@@ -494,9 +509,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
494509
SpinLockAcquire(&local->lock);
495510
{
496511
local_cid=dtm_sync(global_cid);
497-
x->snapshot=global_cid;
512+
dtm_tx.snapshot=global_cid;
498513
}
499-
strncpy(x->gtid,gtid,MAX_GTID_SIZE);
500514
SpinLockRelease(&local->lock);
501515

502516
dtm_tx.is_global= true;
@@ -510,6 +524,37 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
510524
returnglobal_cid;
511525
}
512526

527+
528+
/*
529+
* Save state of parepared transaction
530+
*/
531+
void
532+
DtmLocalSavePreparedState(DtmCurrentTrans*x)
533+
{
534+
535+
if (dtm_tx.is_global)
536+
{
537+
TransactionId*subxids;
538+
TransactionIdxid=GetCurrentTransactionId();
539+
intnSubxids=xactGetCommittedChildren(&subxids);
540+
541+
SpinLockAcquire(&local->lock);
542+
{
543+
DtmTransStatus*ts;
544+
boolfound;
545+
546+
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_ENTER,&found);
547+
Assert(!found);
548+
ts->cid=InDoubtGlobalCSN;
549+
ts->nSubxids=nSubxids;
550+
DtmTransactionListAppend(ts);
551+
DtmAddSubtransactions(ts,subxids,nSubxids);
552+
}
553+
SpinLockRelease(&local->lock);
554+
}
555+
}
556+
557+
513558
/*
514559
* Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
515560
* wait until it is either committed either aborted
@@ -587,10 +632,8 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
587632
return;// global ro tx
588633
}
589634

590-
591635
dtm_tx.xid=xid;
592636
dtm_tx.csn=cid;
593-
594637
}
595638

596639
/*
@@ -599,43 +642,57 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
599642
void
600643
DtmLocalFinish(boolis_commit)
601644
{
602-
DtmCurrentTrans*x=&dtm_tx;
603645
TransactionIdxid=GetCurrentTransactionIdIfAny();
646+
boolfound;
647+
DtmTransStatus*ts;
604648

605-
if (x->gtid[0]&&finishing_prepared)
649+
// We can't check just TransactionIdIsValid(dtm_tx.xid) because
650+
// then we catch commit of `select pg_global_snaphot_end_prepare(...)`
651+
if (TransactionIdIsValid(dtm_tx.xid)&&
652+
(finishing_prepared||// commit prepared of global
653+
TransactionIdIsValid(xid)))// ordinary commit of global
606654
{
655+
// Commit of global prepared tx
656+
607657
xid=dtm_tx.xid;
658+
Assert(GlobalCSNIsNormal(dtm_tx.csn));
659+
660+
SpinLockAcquire(&local->lock);
661+
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_FIND,&found);
662+
Assert(found);
663+
ts->cid=is_commit ?dtm_tx.csn :AbortedGlobalCSN;
664+
DtmAdjustSubtransactions(ts);// !
665+
SpinLockRelease(&local->lock);
666+
667+
dtm_tx.xid=InvalidTransactionId;
668+
dtm_tx.csn=InvalidGlobalCSN;
669+
dtm_tx.is_global= false;
608670
}
609-
elseif (!TransactionIdIsValid(xid))
671+
elseif (TransactionIdIsValid(xid))
610672
{
611-
return;
612-
}
673+
// Commit of local tx
613674

614-
SpinLockAcquire(&local->lock);
615-
{
616-
boolfound;
617-
DtmTransStatus*ts;
675+
TransactionId*subxids;
676+
intnSubxids=xactGetCommittedChildren(&subxids);
618677

619-
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_ENTER,&found);
678+
Assert(!GlobalCSNIsNormal(dtm_tx.csn));
679+
Assert(!TransactionIdIsValid(dtm_tx.xid));
620680

621-
if (found)
681+
if (dtm_tx.is_global)
622682
{
623-
Assert(GlobalCSNIsNormal(dtm_tx.csn));
624-
ts->cid=is_commit ?dtm_tx.csn :AbortedGlobalCSN;
625-
626-
dtm_tx.xid=InvalidTransactionId;
627-
dtm_tx.csn=InvalidGlobalCSN;
628-
}
629-
else
630-
{
631-
Assert(!GlobalCSNIsNormal(dtm_tx.csn));
632-
ts->cid=is_commit ?dtm_get_cid() :AbortedGlobalCSN;
633-
DtmTransactionListAppend(ts);
683+
Assert(!is_commit);
684+
dtm_tx.is_global= false;
634685
}
635-
DtmAdjustSubtransactions(ts);
636-
}
637-
SpinLockRelease(&local->lock);
638686

687+
SpinLockAcquire(&local->lock);
688+
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_ENTER,&found);
689+
Assert(!found);
690+
ts->cid=is_commit ?dtm_get_cid() :AbortedGlobalCSN;
691+
ts->nSubxids=nSubxids;
692+
DtmTransactionListAppend(ts);
693+
DtmAddSubtransactions(ts,subxids,nSubxids);
694+
SpinLockRelease(&local->lock);
695+
}
639696
}
640697

641698
/*
@@ -669,59 +726,6 @@ DtmDeserializeTransactionState(void* ctx)
669726
}
670727

671728

672-
/*
673-
* Save state of parepared transaction
674-
*/
675-
void
676-
DtmLocalSavePreparedState(DtmCurrentTrans*x)
677-
{
678-
679-
if (dtm_tx.is_global)
680-
{
681-
TransactionId*subxids;
682-
TransactionIdxid=GetCurrentTransactionId();
683-
intnSubxids=xactGetCommittedChildren(&subxids);
684-
685-
SpinLockAcquire(&local->lock);
686-
{
687-
DtmTransStatus*ts;
688-
boolfound;
689-
690-
ts= (DtmTransStatus*)hash_search(xid2status,&xid,HASH_ENTER,&found);
691-
Assert(!found);
692-
ts->cid=InDoubtGlobalCSN;
693-
ts->nSubxids=nSubxids;
694-
DtmTransactionListAppend(ts);
695-
DtmAddSubtransactions(ts,subxids,nSubxids);
696-
}
697-
SpinLockRelease(&local->lock);
698-
}
699-
}
700-
701-
/*
702-
* Add subtransactions to finished transactions list.
703-
* Copy CSN and status of parent transaction.
704-
*/
705-
staticvoid
706-
DtmAddSubtransactions(DtmTransStatus*ts,TransactionId*subxids,intnSubxids)
707-
{
708-
inti;
709-
710-
for (i=0;i<nSubxids;i++)
711-
{
712-
boolfound;
713-
DtmTransStatus*sts;
714-
715-
Assert(TransactionIdIsValid(subxids[i]));
716-
sts= (DtmTransStatus*)hash_search(xid2status,&subxids[i],HASH_ENTER,&found);
717-
Assert(!found);
718-
sts->cid=ts->cid;
719-
sts->nSubxids=0;
720-
DtmTransactionListInsertAfter(ts,sts);
721-
}
722-
}
723-
724-
725729
/*
726730
*
727731
* SQL functions for global snapshot mamagement.
@@ -731,8 +735,7 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
731735
Datum
732736
pg_global_snaphot_create(PG_FUNCTION_ARGS)
733737
{
734-
GlobalTransactionIdgtid=text_to_cstring(PG_GETARG_TEXT_PP(0));
735-
cid_tcid=DtmLocalExtend(gtid);
738+
cid_tcid=DtmLocalExtend();
736739

737740
DTM_TRACE((stderr,"Backend %d extends transaction %u(%s) to global with cid=%lu\n",getpid(),dtm_tx.xid,gtid,cid));
738741
PG_RETURN_INT64(cid);
@@ -742,10 +745,9 @@ Datum
742745
pg_global_snaphot_join(PG_FUNCTION_ARGS)
743746
{
744747
cid_tcid=PG_GETARG_INT64(0);
745-
GlobalTransactionIdgtid=text_to_cstring(PG_GETARG_TEXT_PP(1));
746748

747749
DTM_TRACE((stderr,"Backend %d joins transaction %u(%s) with cid=%lu\n",getpid(),dtm_tx.xid,gtid,cid));
748-
cid=DtmLocalAccess(&dtm_tx,gtid,cid);
750+
cid=DtmLocalAccess(cid);
749751
PG_RETURN_INT64(cid);
750752
}
751753

‎src/include/access/global_snapshot.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ typedef struct
1111
{
1212
cid_tsnapshot;
1313
GlobalCSNcsn;
14-
chargtid[MAX_GTID_SIZE];
1514
TransactionIdxid;
1615
boolis_global;
1716
}DtmCurrentTrans;
@@ -36,10 +35,10 @@ voidDtmInitialize(void);
3635
voidDtmLocalBegin(DtmCurrentTrans*x);
3736

3837
/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
39-
externcid_tDtmLocalExtend(GlobalTransactionIdgtid);
38+
externcid_tDtmLocalExtend(void);
4039

4140
/* Function called at first access to any datanode except first one involved in distributed transaction */
42-
cid_tDtmLocalAccess(DtmCurrentTrans*x,GlobalTransactionIdgtid,cid_tsnapshot);
41+
cid_tDtmLocalAccess(cid_tsnapshot);
4342

4443
/* Mark transaction as in-doubt */
4544
voidDtmLocalBeginPrepare(GlobalTransactionIdgtid);

‎src/include/catalog/pg_proc.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5534,9 +5534,9 @@ DATA(insert OID = 5028 ( satisfies_hash_partition PGNSP PGUID 12 1 0 2276 0 f f
55345534
DESCR("hash partition CHECK constraint");
55355535

55365536
/* global snapshot management functions */
5537-
DATA(insert OID = 3434 ( pg_global_snaphot_create PGNSP PGUID 12 1 0 0 0 f f f f t f v u1 0 20 "25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_create _null_ _null_ _null_ ));
5537+
DATA(insert OID = 3434 ( pg_global_snaphot_create PGNSP PGUID 12 1 0 0 0 f f f f t f v u0 0 20 "" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_create _null_ _null_ _null_ ));
55385538
DESCR("create global transaction snapshot");
5539-
DATA(insert OID = 3435 ( pg_global_snaphot_join PGNSP PGUID 12 1 0 0 0 f f f f t f v u2 0 20 "20 25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_join _null_ _null_ _null_ ));
5539+
DATA(insert OID = 3435 ( pg_global_snaphot_join PGNSP PGUID 12 1 0 0 0 f f f f t f v u1 0 20 "20" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_join _null_ _null_ _null_ ));
55405540
DESCR("set given global snapshot for current transaction");
55415541
DATA(insert OID = 3436 ( pg_global_snaphot_begin_prepare PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_begin_prepare _null_ _null_ _null_ ));
55425542
DESCR("start prepare of global transaction");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp