@@ -169,30 +169,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
169169}
170170
171171/*
172- *Make sure that we started local transaction.
172+ *Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
173173 *
174- * Also switches to ApplyMessageContext as necessary.
174+ * Start a transaction, if this is the first step (else we keep using the
175+ * existing transaction).
176+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
175177 */
176- static bool
177- ensure_transaction (void )
178+ static void
179+ begin_replication_step (void )
178180{
179- if (IsTransactionState ())
180- {
181- SetCurrentStatementStartTimestamp ();
182-
183- if (CurrentMemoryContext != ApplyMessageContext )
184- MemoryContextSwitchTo (ApplyMessageContext );
181+ SetCurrentStatementStartTimestamp ();
185182
186- return false;
183+ if (!IsTransactionState ())
184+ {
185+ StartTransactionCommand ();
186+ maybe_reread_subscription ();
187187}
188188
189- SetCurrentStatementStartTimestamp ();
190- StartTransactionCommand ();
191-
192- maybe_reread_subscription ();
189+ PushActiveSnapshot (GetTransactionSnapshot ());
193190
194191MemoryContextSwitchTo (ApplyMessageContext );
195- return true;
192+ }
193+
194+ /*
195+ * Finish up one step of a replication transaction.
196+ * Callers of begin_replication_step() must also call this.
197+ *
198+ * We don't close out the transaction here, but we should increment
199+ * the command counter to make the effects of this step visible.
200+ */
201+ static void
202+ end_replication_step (void )
203+ {
204+ PopActiveSnapshot ();
205+
206+ CommandCounterIncrement ();
196207}
197208
198209
@@ -210,13 +221,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
210221ResultRelInfo * resultRelInfo ;
211222RangeTblEntry * rte ;
212223
213- /*
214- * Input functions may need an active snapshot, as may AFTER triggers
215- * invoked during finish_edata. For safety, ensure an active snapshot
216- * exists throughout all our usage of the executor.
217- */
218- PushActiveSnapshot (GetTransactionSnapshot ());
219-
220224edata = (ApplyExecutionData * )palloc0 (sizeof (ApplyExecutionData ));
221225edata -> targetRel = rel ;
222226
@@ -277,8 +281,6 @@ finish_edata(ApplyExecutionData *edata)
277281ExecResetTupleTable (estate -> es_tupleTable , false);
278282FreeExecutorState (estate );
279283pfree (edata );
280-
281- PopActiveSnapshot ();
282284}
283285
284286/*
@@ -673,7 +675,7 @@ apply_handle_insert(StringInfo s)
673675TupleTableSlot * remoteslot ;
674676MemoryContext oldctx ;
675677
676- ensure_transaction ();
678+ begin_replication_step ();
677679
678680relid = logicalrep_read_insert (s ,& newtup );
679681rel = logicalrep_rel_open (relid ,RowExclusiveLock );
@@ -684,6 +686,7 @@ apply_handle_insert(StringInfo s)
684686 * transaction so it's safe to unlock it.
685687 */
686688logicalrep_rel_close (rel ,RowExclusiveLock );
689+ end_replication_step ();
687690return ;
688691}
689692
@@ -712,7 +715,7 @@ apply_handle_insert(StringInfo s)
712715
713716logicalrep_rel_close (rel ,NoLock );
714717
715- CommandCounterIncrement ();
718+ end_replication_step ();
716719}
717720
718721/* Workhorse for apply_handle_insert() */
@@ -781,7 +784,7 @@ apply_handle_update(StringInfo s)
781784RangeTblEntry * target_rte ;
782785MemoryContext oldctx ;
783786
784- ensure_transaction ();
787+ begin_replication_step ();
785788
786789relid = logicalrep_read_update (s ,& has_oldtup ,& oldtup ,
787790& newtup );
@@ -793,6 +796,7 @@ apply_handle_update(StringInfo s)
793796 * transaction so it's safe to unlock it.
794797 */
795798logicalrep_rel_close (rel ,RowExclusiveLock );
799+ end_replication_step ();
796800return ;
797801}
798802
@@ -849,7 +853,7 @@ apply_handle_update(StringInfo s)
849853
850854logicalrep_rel_close (rel ,NoLock );
851855
852- CommandCounterIncrement ();
856+ end_replication_step ();
853857}
854858
855859/* Workhorse for apply_handle_update() */
@@ -925,7 +929,7 @@ apply_handle_delete(StringInfo s)
925929TupleTableSlot * remoteslot ;
926930MemoryContext oldctx ;
927931
928- ensure_transaction ();
932+ begin_replication_step ();
929933
930934relid = logicalrep_read_delete (s ,& oldtup );
931935rel = logicalrep_rel_open (relid ,RowExclusiveLock );
@@ -936,6 +940,7 @@ apply_handle_delete(StringInfo s)
936940 * transaction so it's safe to unlock it.
937941 */
938942logicalrep_rel_close (rel ,RowExclusiveLock );
943+ end_replication_step ();
939944return ;
940945}
941946
@@ -966,7 +971,7 @@ apply_handle_delete(StringInfo s)
966971
967972logicalrep_rel_close (rel ,NoLock );
968973
969- CommandCounterIncrement ();
974+ end_replication_step ();
970975}
971976
972977/* Workhorse for apply_handle_delete() */
@@ -1291,7 +1296,7 @@ apply_handle_truncate(StringInfo s)
12911296ListCell * lc ;
12921297LOCKMODE lockmode = AccessExclusiveLock ;
12931298
1294- ensure_transaction ();
1299+ begin_replication_step ();
12951300
12961301remote_relids = logicalrep_read_truncate (s ,& cascade ,& restart_seqs );
12971302
@@ -1379,7 +1384,7 @@ apply_handle_truncate(StringInfo s)
13791384table_close (rel ,NoLock );
13801385}
13811386
1382- CommandCounterIncrement ();
1387+ end_replication_step ();
13831388}
13841389
13851390