Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit32df1c9

Browse files
committed
Add subtransaction handling for table synchronization workers.
Since the old logic was completely unaware of subtransactions, achange made in a subsequently-aborted subtransaction would still causeworkers to be stopped at toplevel transaction commit. Fix that bymanaging a stack of worker lists rather than just one.Amit Khandekar and Robert HaasDiscussion:http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
1 parentf7cb284 commit32df1c9

File tree

4 files changed

+112
-8
lines changed

4 files changed

+112
-8
lines changed

‎src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
46374637
AtEOSubXact_HashTables(true,s->nestingLevel);
46384638
AtEOSubXact_PgStat(true,s->nestingLevel);
46394639
AtSubCommit_Snapshot(s->nestingLevel);
4640+
AtEOSubXact_ApplyLauncher(true,s->nestingLevel);
46404641

46414642
/*
46424643
* We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
47904791
AtEOSubXact_HashTables(false,s->nestingLevel);
47914792
AtEOSubXact_PgStat(false,s->nestingLevel);
47924793
AtSubAbort_Snapshot(s->nestingLevel);
4794+
AtEOSubXact_ApplyLauncher(false,s->nestingLevel);
47934795
}
47944796

47954797
/*

‎src/backend/replication/logical/launcher.c

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979
Oidrelid;
8080
}LogicalRepWorkerId;
8181

82-
staticList*on_commit_stop_workers=NIL;
82+
typedefstructStopWorkersData
83+
{
84+
intnestDepth;/* Sub-transaction nest level */
85+
List*workers;/* List of LogicalRepWorkerId */
86+
structStopWorkersData*parent;/* This need not be an immediate
87+
* subtransaction parent */
88+
}StopWorkersData;
89+
90+
/*
91+
* Stack of StopWorkersData elements. Each stack element contains the workers
92+
* to be stopped for that subtransaction.
93+
*/
94+
staticStopWorkersData*on_commit_stop_workers=NULL;
8395

8496
staticvoidApplyLauncherWakeup(void);
8597
staticvoidlogicalrep_launcher_onexit(intcode,Datumarg);
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
559571
void
560572
logicalrep_worker_stop_at_commit(Oidsubid,Oidrelid)
561573
{
574+
intnestDepth=GetCurrentTransactionNestLevel();
562575
LogicalRepWorkerId*wid;
563576
MemoryContextoldctx;
564577

565578
/* Make sure we store the info in context that survives until commit. */
566579
oldctx=MemoryContextSwitchTo(TopTransactionContext);
567580

581+
/* Check that previous transactions were properly cleaned up. */
582+
Assert(on_commit_stop_workers==NULL||
583+
nestDepth >=on_commit_stop_workers->nestDepth);
584+
585+
/*
586+
* Push a new stack element if we don't already have one for the current
587+
* nestDepth.
588+
*/
589+
if (on_commit_stop_workers==NULL||
590+
nestDepth>on_commit_stop_workers->nestDepth)
591+
{
592+
StopWorkersData*newdata=palloc(sizeof(StopWorkersData));
593+
594+
newdata->nestDepth=nestDepth;
595+
newdata->workers=NIL;
596+
newdata->parent=on_commit_stop_workers;
597+
on_commit_stop_workers=newdata;
598+
}
599+
600+
/*
601+
* Finally add a new worker into the worker list of the current
602+
* subtransaction.
603+
*/
568604
wid=palloc(sizeof(LogicalRepWorkerId));
569605
wid->subid=subid;
570606
wid->relid=relid;
571-
572-
on_commit_stop_workers=lappend(on_commit_stop_workers,wid);
607+
on_commit_stop_workers->workers=
608+
lappend(on_commit_stop_workers->workers,wid);
573609

574610
MemoryContextSwitchTo(oldctx);
575611
}
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
823859
bool
824860
XactManipulatesLogicalReplicationWorkers(void)
825861
{
826-
return (on_commit_stop_workers!=NIL);
862+
return (on_commit_stop_workers!=NULL);
827863
}
828864

829865
/*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
832868
void
833869
AtEOXact_ApplyLauncher(boolisCommit)
834870
{
871+
872+
Assert(on_commit_stop_workers==NULL||
873+
(on_commit_stop_workers->nestDepth==1&&
874+
on_commit_stop_workers->parent==NULL));
875+
835876
if (isCommit)
836877
{
837878
ListCell*lc;
838879

839-
foreach(lc,on_commit_stop_workers)
880+
if (on_commit_stop_workers!=NULL)
840881
{
841-
LogicalRepWorkerId*wid=lfirst(lc);
882+
List*workers=on_commit_stop_workers->workers;
883+
884+
foreach(lc,workers)
885+
{
886+
LogicalRepWorkerId*wid=lfirst(lc);
842887

843-
logicalrep_worker_stop(wid->subid,wid->relid);
888+
logicalrep_worker_stop(wid->subid,wid->relid);
889+
}
844890
}
845891

846892
if (on_commit_launcher_wakeup)
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
851897
* No need to pfree on_commit_stop_workers. It was allocated in
852898
* transaction memory context, which is going to be cleaned soon.
853899
*/
854-
on_commit_stop_workers=NIL;
900+
on_commit_stop_workers=NULL;
855901
on_commit_launcher_wakeup= false;
856902
}
857903

904+
/*
905+
* On commit, merge the current on_commit_stop_workers list into the
906+
* immediate parent, if present.
907+
* On rollback, discard the current on_commit_stop_workers list.
908+
* Pop out the stack.
909+
*/
910+
void
911+
AtEOSubXact_ApplyLauncher(boolisCommit,intnestDepth)
912+
{
913+
StopWorkersData*parent;
914+
915+
/* Exit immediately if there's no work to do at this level. */
916+
if (on_commit_stop_workers==NULL||
917+
on_commit_stop_workers->nestDepth<nestDepth)
918+
return;
919+
920+
Assert(on_commit_stop_workers->nestDepth==nestDepth);
921+
922+
parent=on_commit_stop_workers->parent;
923+
924+
if (isCommit)
925+
{
926+
/*
927+
* If the upper stack element is not an immediate parent
928+
* subtransaction, just decrement the notional nesting depth without
929+
* doing any real work. Else, we need to merge the current workers
930+
* list into the parent.
931+
*/
932+
if (!parent||parent->nestDepth<nestDepth-1)
933+
{
934+
on_commit_stop_workers->nestDepth--;
935+
return;
936+
}
937+
938+
parent->workers=
939+
list_concat(parent->workers,on_commit_stop_workers->workers);
940+
}
941+
else
942+
{
943+
/*
944+
* Abandon everything that was done at this nesting level. Explicitly
945+
* free memory to avoid a transaction-lifespan leak.
946+
*/
947+
list_free_deep(on_commit_stop_workers->workers);
948+
}
949+
950+
/*
951+
* We have taken care of the current subtransaction workers list for both
952+
* abort or commit. So we are ready to pop the stack.
953+
*/
954+
pfree(on_commit_stop_workers);
955+
on_commit_stop_workers=parent;
956+
}
957+
858958
/*
859959
* Request wakeup of the launcher on commit of the transaction.
860960
*

‎src/include/replication/logicallauncher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
2424
externvoidApplyLauncherWakeupAtCommit(void);
2525
externboolXactManipulatesLogicalReplicationWorkers(void);
2626
externvoidAtEOXact_ApplyLauncher(boolisCommit);
27+
externvoidAtEOSubXact_ApplyLauncher(boolisCommit,intnestDepth);
2728

2829
externboolIsLogicalLauncher(void);
2930

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2227,6 +2227,7 @@ StdAnalyzeData
22272227
StdRdOptions
22282228
Step
22292229
StopList
2230+
StopWorkersData
22302231
StrategyNumber
22312232
StreamCtl
22322233
StringInfo

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp