@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979Oid relid ;
8080}LogicalRepWorkerId ;
8181
82- static List * on_commit_stop_workers = NIL ;
82+ typedef struct StopWorkersData
83+ {
84+ int nestDepth ;/* Sub-transaction nest level */
85+ List * workers ;/* List of LogicalRepWorkerId */
86+ struct StopWorkersData * parent ;/* This need not be an immediate
87+ * subtransaction parent */
88+ }StopWorkersData ;
89+
90+ /*
91+ * Stack of StopWorkersData elements. Each stack element contains the workers
92+ * to be stopped for that subtransaction.
93+ */
94+ static StopWorkersData * on_commit_stop_workers = NULL ;
8395
8496static void ApplyLauncherWakeup (void );
8597static void logicalrep_launcher_onexit (int code ,Datum arg );
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
558570void
559571logicalrep_worker_stop_at_commit (Oid subid ,Oid relid )
560572{
573+ int nestDepth = GetCurrentTransactionNestLevel ();
561574LogicalRepWorkerId * wid ;
562575MemoryContext oldctx ;
563576
564577/* Make sure we store the info in context that survives until commit. */
565578oldctx = MemoryContextSwitchTo (TopTransactionContext );
566579
580+ /* Check that previous transactions were properly cleaned up. */
581+ Assert (on_commit_stop_workers == NULL ||
582+ nestDepth >=on_commit_stop_workers -> nestDepth );
583+
584+ /*
585+ * Push a new stack element if we don't already have one for the current
586+ * nestDepth.
587+ */
588+ if (on_commit_stop_workers == NULL ||
589+ nestDepth > on_commit_stop_workers -> nestDepth )
590+ {
591+ StopWorkersData * newdata = palloc (sizeof (StopWorkersData ));
592+
593+ newdata -> nestDepth = nestDepth ;
594+ newdata -> workers = NIL ;
595+ newdata -> parent = on_commit_stop_workers ;
596+ on_commit_stop_workers = newdata ;
597+ }
598+
599+ /*
600+ * Finally add a new worker into the worker list of the current
601+ * subtransaction.
602+ */
567603wid = palloc (sizeof (LogicalRepWorkerId ));
568604wid -> subid = subid ;
569605wid -> relid = relid ;
570-
571- on_commit_stop_workers = lappend (on_commit_stop_workers ,wid );
606+ on_commit_stop_workers -> workers =
607+ lappend (on_commit_stop_workers -> workers ,wid );
572608
573609MemoryContextSwitchTo (oldctx );
574610}
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
820856bool
821857XactManipulatesLogicalReplicationWorkers (void )
822858{
823- return (on_commit_stop_workers != NIL );
859+ return (on_commit_stop_workers != NULL );
824860}
825861
826862/*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
829865void
830866AtEOXact_ApplyLauncher (bool isCommit )
831867{
868+
869+ Assert (on_commit_stop_workers == NULL ||
870+ (on_commit_stop_workers -> nestDepth == 1 &&
871+ on_commit_stop_workers -> parent == NULL ));
872+
832873if (isCommit )
833874{
834875ListCell * lc ;
835876
836- foreach ( lc , on_commit_stop_workers )
877+ if ( on_commit_stop_workers != NULL )
837878{
838- LogicalRepWorkerId * wid = lfirst (lc );
879+ List * workers = on_commit_stop_workers -> workers ;
880+
881+ foreach (lc ,workers )
882+ {
883+ LogicalRepWorkerId * wid = lfirst (lc );
839884
840- logicalrep_worker_stop (wid -> subid ,wid -> relid );
885+ logicalrep_worker_stop (wid -> subid ,wid -> relid );
886+ }
841887}
842888
843889if (on_commit_launcher_wakeup )
@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
848894 * No need to pfree on_commit_stop_workers. It was allocated in
849895 * transaction memory context, which is going to be cleaned soon.
850896 */
851- on_commit_stop_workers = NIL ;
897+ on_commit_stop_workers = NULL ;
852898on_commit_launcher_wakeup = false;
853899}
854900
901+ /*
902+ * On commit, merge the current on_commit_stop_workers list into the
903+ * immediate parent, if present.
904+ * On rollback, discard the current on_commit_stop_workers list.
905+ * Pop out the stack.
906+ */
907+ void
908+ AtEOSubXact_ApplyLauncher (bool isCommit ,int nestDepth )
909+ {
910+ StopWorkersData * parent ;
911+
912+ /* Exit immediately if there's no work to do at this level. */
913+ if (on_commit_stop_workers == NULL ||
914+ on_commit_stop_workers -> nestDepth < nestDepth )
915+ return ;
916+
917+ Assert (on_commit_stop_workers -> nestDepth == nestDepth );
918+
919+ parent = on_commit_stop_workers -> parent ;
920+
921+ if (isCommit )
922+ {
923+ /*
924+ * If the upper stack element is not an immediate parent
925+ * subtransaction, just decrement the notional nesting depth without
926+ * doing any real work. Else, we need to merge the current workers
927+ * list into the parent.
928+ */
929+ if (!parent || parent -> nestDepth < nestDepth - 1 )
930+ {
931+ on_commit_stop_workers -> nestDepth -- ;
932+ return ;
933+ }
934+
935+ parent -> workers =
936+ list_concat (parent -> workers ,on_commit_stop_workers -> workers );
937+ }
938+ else
939+ {
940+ /*
941+ * Abandon everything that was done at this nesting level. Explicitly
942+ * free memory to avoid a transaction-lifespan leak.
943+ */
944+ list_free_deep (on_commit_stop_workers -> workers );
945+ }
946+
947+ /*
948+ * We have taken care of the current subtransaction workers list for both
949+ * abort or commit. So we are ready to pop the stack.
950+ */
951+ pfree (on_commit_stop_workers );
952+ on_commit_stop_workers = parent ;
953+ }
954+
855955/*
856956 * Request wakeup of the launcher on commit of the transaction.
857957 *