Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4beb25c

Browse files
committed
Add subtransaction handling for table synchronization workers.
Since the old logic was completely unaware of subtransactions, achange made in a subsequently-aborted subtransaction would still causeworkers to be stopped at toplevel transaction commit. Fix that bymanaging a stack of worker lists rather than just one.Amit Khandekar and Robert HaasDiscussion:http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
1 parent0bb28ca commit4beb25c

File tree

4 files changed

+112
-8
lines changed

4 files changed

+112
-8
lines changed

‎src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4542,6 +4542,7 @@ CommitSubTransaction(void)
45424542
AtEOSubXact_HashTables(true,s->nestingLevel);
45434543
AtEOSubXact_PgStat(true,s->nestingLevel);
45444544
AtSubCommit_Snapshot(s->nestingLevel);
4545+
AtEOSubXact_ApplyLauncher(true,s->nestingLevel);
45454546

45464547
/*
45474548
* We need to restore the upper transaction's read-only state, in case the
@@ -4695,6 +4696,7 @@ AbortSubTransaction(void)
46954696
AtEOSubXact_HashTables(false,s->nestingLevel);
46964697
AtEOSubXact_PgStat(false,s->nestingLevel);
46974698
AtSubAbort_Snapshot(s->nestingLevel);
4699+
AtEOSubXact_ApplyLauncher(false,s->nestingLevel);
46984700
}
46994701

47004702
/*

‎src/backend/replication/logical/launcher.c

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979
Oidrelid;
8080
}LogicalRepWorkerId;
8181

82-
staticList*on_commit_stop_workers=NIL;
82+
typedefstructStopWorkersData
83+
{
84+
intnestDepth;/* Sub-transaction nest level */
85+
List*workers;/* List of LogicalRepWorkerId */
86+
structStopWorkersData*parent;/* This need not be an immediate
87+
* subtransaction parent */
88+
}StopWorkersData;
89+
90+
/*
91+
* Stack of StopWorkersData elements. Each stack element contains the workers
92+
* to be stopped for that subtransaction.
93+
*/
94+
staticStopWorkersData*on_commit_stop_workers=NULL;
8395

8496
staticvoidApplyLauncherWakeup(void);
8597
staticvoidlogicalrep_launcher_onexit(intcode,Datumarg);
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
558570
void
559571
logicalrep_worker_stop_at_commit(Oidsubid,Oidrelid)
560572
{
573+
intnestDepth=GetCurrentTransactionNestLevel();
561574
LogicalRepWorkerId*wid;
562575
MemoryContextoldctx;
563576

564577
/* Make sure we store the info in context that survives until commit. */
565578
oldctx=MemoryContextSwitchTo(TopTransactionContext);
566579

580+
/* Check that previous transactions were properly cleaned up. */
581+
Assert(on_commit_stop_workers==NULL||
582+
nestDepth >=on_commit_stop_workers->nestDepth);
583+
584+
/*
585+
* Push a new stack element if we don't already have one for the current
586+
* nestDepth.
587+
*/
588+
if (on_commit_stop_workers==NULL||
589+
nestDepth>on_commit_stop_workers->nestDepth)
590+
{
591+
StopWorkersData*newdata=palloc(sizeof(StopWorkersData));
592+
593+
newdata->nestDepth=nestDepth;
594+
newdata->workers=NIL;
595+
newdata->parent=on_commit_stop_workers;
596+
on_commit_stop_workers=newdata;
597+
}
598+
599+
/*
600+
* Finally add a new worker into the worker list of the current
601+
* subtransaction.
602+
*/
567603
wid=palloc(sizeof(LogicalRepWorkerId));
568604
wid->subid=subid;
569605
wid->relid=relid;
570-
571-
on_commit_stop_workers=lappend(on_commit_stop_workers,wid);
606+
on_commit_stop_workers->workers=
607+
lappend(on_commit_stop_workers->workers,wid);
572608

573609
MemoryContextSwitchTo(oldctx);
574610
}
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
820856
bool
821857
XactManipulatesLogicalReplicationWorkers(void)
822858
{
823-
return (on_commit_stop_workers!=NIL);
859+
return (on_commit_stop_workers!=NULL);
824860
}
825861

826862
/*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
829865
void
830866
AtEOXact_ApplyLauncher(boolisCommit)
831867
{
868+
869+
Assert(on_commit_stop_workers==NULL||
870+
(on_commit_stop_workers->nestDepth==1&&
871+
on_commit_stop_workers->parent==NULL));
872+
832873
if (isCommit)
833874
{
834875
ListCell*lc;
835876

836-
foreach(lc,on_commit_stop_workers)
877+
if (on_commit_stop_workers!=NULL)
837878
{
838-
LogicalRepWorkerId*wid=lfirst(lc);
879+
List*workers=on_commit_stop_workers->workers;
880+
881+
foreach(lc,workers)
882+
{
883+
LogicalRepWorkerId*wid=lfirst(lc);
839884

840-
logicalrep_worker_stop(wid->subid,wid->relid);
885+
logicalrep_worker_stop(wid->subid,wid->relid);
886+
}
841887
}
842888

843889
if (on_commit_launcher_wakeup)
@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
848894
* No need to pfree on_commit_stop_workers. It was allocated in
849895
* transaction memory context, which is going to be cleaned soon.
850896
*/
851-
on_commit_stop_workers=NIL;
897+
on_commit_stop_workers=NULL;
852898
on_commit_launcher_wakeup= false;
853899
}
854900

901+
/*
902+
* On commit, merge the current on_commit_stop_workers list into the
903+
* immediate parent, if present.
904+
* On rollback, discard the current on_commit_stop_workers list.
905+
* Pop out the stack.
906+
*/
907+
void
908+
AtEOSubXact_ApplyLauncher(boolisCommit,intnestDepth)
909+
{
910+
StopWorkersData*parent;
911+
912+
/* Exit immediately if there's no work to do at this level. */
913+
if (on_commit_stop_workers==NULL||
914+
on_commit_stop_workers->nestDepth<nestDepth)
915+
return;
916+
917+
Assert(on_commit_stop_workers->nestDepth==nestDepth);
918+
919+
parent=on_commit_stop_workers->parent;
920+
921+
if (isCommit)
922+
{
923+
/*
924+
* If the upper stack element is not an immediate parent
925+
* subtransaction, just decrement the notional nesting depth without
926+
* doing any real work. Else, we need to merge the current workers
927+
* list into the parent.
928+
*/
929+
if (!parent||parent->nestDepth<nestDepth-1)
930+
{
931+
on_commit_stop_workers->nestDepth--;
932+
return;
933+
}
934+
935+
parent->workers=
936+
list_concat(parent->workers,on_commit_stop_workers->workers);
937+
}
938+
else
939+
{
940+
/*
941+
* Abandon everything that was done at this nesting level. Explicitly
942+
* free memory to avoid a transaction-lifespan leak.
943+
*/
944+
list_free_deep(on_commit_stop_workers->workers);
945+
}
946+
947+
/*
948+
* We have taken care of the current subtransaction workers list for both
949+
* abort or commit. So we are ready to pop the stack.
950+
*/
951+
pfree(on_commit_stop_workers);
952+
on_commit_stop_workers=parent;
953+
}
954+
855955
/*
856956
* Request wakeup of the launcher on commit of the transaction.
857957
*

‎src/include/replication/logicallauncher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
2424
externvoidApplyLauncherWakeupAtCommit(void);
2525
externboolXactManipulatesLogicalReplicationWorkers(void);
2626
externvoidAtEOXact_ApplyLauncher(boolisCommit);
27+
externvoidAtEOSubXact_ApplyLauncher(boolisCommit,intnestDepth);
2728

2829
externboolIsLogicalLauncher(void);
2930

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,6 +2112,7 @@ StdAnalyzeData
21122112
StdRdOptions
21132113
Step
21142114
StopList
2115+
StopWorkersData
21152116
StrategyNumber
21162117
StreamCtl
21172118
StringInfo

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp