@@ -154,30 +154,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
154
154
}
155
155
156
156
/*
157
- *Make sure that we started local transaction.
157
+ *Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
158
158
*
159
- * Also switches to ApplyMessageContext as necessary.
159
+ * Start a transaction, if this is the first step (else we keep using the
160
+ * existing transaction).
161
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
160
162
*/
161
- static bool
162
- ensure_transaction (void )
163
+ static void
164
+ begin_replication_step (void )
163
165
{
164
- if (IsTransactionState ())
165
- {
166
- SetCurrentStatementStartTimestamp ();
167
-
168
- if (CurrentMemoryContext != ApplyMessageContext )
169
- MemoryContextSwitchTo (ApplyMessageContext );
166
+ SetCurrentStatementStartTimestamp ();
170
167
171
- return false;
168
+ if (!IsTransactionState ())
169
+ {
170
+ StartTransactionCommand ();
171
+ maybe_reread_subscription ();
172
172
}
173
173
174
- SetCurrentStatementStartTimestamp ();
175
- StartTransactionCommand ();
176
-
177
- maybe_reread_subscription ();
174
+ PushActiveSnapshot (GetTransactionSnapshot ());
178
175
179
176
MemoryContextSwitchTo (ApplyMessageContext );
180
- return true;
177
+ }
178
+
179
+ /*
180
+ * Finish up one step of a replication transaction.
181
+ * Callers of begin_replication_step() must also call this.
182
+ *
183
+ * We don't close out the transaction here, but we should increment
184
+ * the command counter to make the effects of this step visible.
185
+ */
186
+ static void
187
+ end_replication_step (void )
188
+ {
189
+ PopActiveSnapshot ();
190
+
191
+ CommandCounterIncrement ();
181
192
}
182
193
183
194
@@ -194,13 +205,6 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
194
205
ResultRelInfo * resultRelInfo ;
195
206
RangeTblEntry * rte ;
196
207
197
- /*
198
- * Input functions may need an active snapshot, as may AFTER triggers
199
- * invoked during finish_estate. For safety, ensure an active snapshot
200
- * exists throughout all our usage of the executor.
201
- */
202
- PushActiveSnapshot (GetTransactionSnapshot ());
203
-
204
208
estate = CreateExecutorState ();
205
209
206
210
rte = makeNode (RangeTblEntry );
@@ -241,7 +245,6 @@ finish_estate(EState *estate)
241
245
/* Cleanup. */
242
246
ExecResetTupleTable (estate -> es_tupleTable , false);
243
247
FreeExecutorState (estate );
244
- PopActiveSnapshot ();
245
248
}
246
249
247
250
/*
@@ -631,7 +634,7 @@ apply_handle_insert(StringInfo s)
631
634
TupleTableSlot * remoteslot ;
632
635
MemoryContext oldctx ;
633
636
634
- ensure_transaction ();
637
+ begin_replication_step ();
635
638
636
639
relid = logicalrep_read_insert (s ,& newtup );
637
640
rel = logicalrep_rel_open (relid ,RowExclusiveLock );
@@ -642,6 +645,7 @@ apply_handle_insert(StringInfo s)
642
645
* transaction so it's safe to unlock it.
643
646
*/
644
647
logicalrep_rel_close (rel ,RowExclusiveLock );
648
+ end_replication_step ();
645
649
return ;
646
650
}
647
651
@@ -668,7 +672,7 @@ apply_handle_insert(StringInfo s)
668
672
669
673
logicalrep_rel_close (rel ,NoLock );
670
674
671
- CommandCounterIncrement ();
675
+ end_replication_step ();
672
676
}
673
677
674
678
/*
@@ -727,7 +731,7 @@ apply_handle_update(StringInfo s)
727
731
bool found ;
728
732
MemoryContext oldctx ;
729
733
730
- ensure_transaction ();
734
+ begin_replication_step ();
731
735
732
736
relid = logicalrep_read_update (s ,& has_oldtup ,& oldtup ,
733
737
& newtup );
@@ -739,6 +743,7 @@ apply_handle_update(StringInfo s)
739
743
* transaction so it's safe to unlock it.
740
744
*/
741
745
logicalrep_rel_close (rel ,RowExclusiveLock );
746
+ end_replication_step ();
742
747
return ;
743
748
}
744
749
@@ -840,7 +845,7 @@ apply_handle_update(StringInfo s)
840
845
841
846
logicalrep_rel_close (rel ,NoLock );
842
847
843
- CommandCounterIncrement ();
848
+ end_replication_step ();
844
849
}
845
850
846
851
/*
@@ -862,7 +867,7 @@ apply_handle_delete(StringInfo s)
862
867
bool found ;
863
868
MemoryContext oldctx ;
864
869
865
- ensure_transaction ();
870
+ begin_replication_step ();
866
871
867
872
relid = logicalrep_read_delete (s ,& oldtup );
868
873
rel = logicalrep_rel_open (relid ,RowExclusiveLock );
@@ -873,6 +878,7 @@ apply_handle_delete(StringInfo s)
873
878
* transaction so it's safe to unlock it.
874
879
*/
875
880
logicalrep_rel_close (rel ,RowExclusiveLock );
881
+ end_replication_step ();
876
882
return ;
877
883
}
878
884
@@ -934,7 +940,7 @@ apply_handle_delete(StringInfo s)
934
940
935
941
logicalrep_rel_close (rel ,NoLock );
936
942
937
- CommandCounterIncrement ();
943
+ end_replication_step ();
938
944
}
939
945
940
946
/*
@@ -955,7 +961,7 @@ apply_handle_truncate(StringInfo s)
955
961
ListCell * lc ;
956
962
LOCKMODE lockmode = AccessExclusiveLock ;
957
963
958
- ensure_transaction ();
964
+ begin_replication_step ();
959
965
960
966
remote_relids = logicalrep_read_truncate (s ,& cascade ,& restart_seqs );
961
967
@@ -996,7 +1002,7 @@ apply_handle_truncate(StringInfo s)
996
1002
logicalrep_rel_close (rel ,NoLock );
997
1003
}
998
1004
999
- CommandCounterIncrement ();
1005
+ end_replication_step ();
1000
1006
}
1001
1007
1002
1008