@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
101101int attnum ;
102102}SlotErrCallbackArg ;
103103
104- static MemoryContext ApplyContext = NULL ;
105- MemoryContext ApplyCacheContext = NULL ;
104+ static MemoryContext ApplyMessageContext = NULL ;
105+ MemoryContext ApplyContext = NULL ;
106106
107107WalReceiverConn * wrconn = NULL ;
108108
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
145145/*
146146 * Make sure that we started local transaction.
147147 *
148- * Also switches toApplyContext as necessary.
148+ * Also switches toApplyMessageContext as necessary.
149149 */
150150static bool
151151ensure_transaction (void )
152152{
153153if (IsTransactionState ())
154154{
155- if (CurrentMemoryContext != ApplyContext )
156- MemoryContextSwitchTo (ApplyContext );
155+ if (CurrentMemoryContext != ApplyMessageContext )
156+ MemoryContextSwitchTo (ApplyMessageContext );
157+
157158return false;
158159}
159160
@@ -162,7 +163,7 @@ ensure_transaction(void)
162163if (!MySubscriptionValid )
163164reread_subscription ();
164165
165- MemoryContextSwitchTo (ApplyContext );
166+ MemoryContextSwitchTo (ApplyMessageContext );
166167return true;
167168}
168169
@@ -961,15 +962,15 @@ store_flush_position(XLogRecPtr remote_lsn)
961962FlushPosition * flushpos ;
962963
963964/* Need to do this in permanent context */
964- MemoryContextSwitchTo (ApplyCacheContext );
965+ MemoryContextSwitchTo (ApplyContext );
965966
966967/* Track commit lsn */
967968flushpos = (FlushPosition * )palloc (sizeof (FlushPosition ));
968969flushpos -> local_end = XactLastCommitEnd ;
969970flushpos -> remote_end = remote_lsn ;
970971
971972dlist_push_tail (& lsn_mapping ,& flushpos -> node );
972- MemoryContextSwitchTo (ApplyContext );
973+ MemoryContextSwitchTo (ApplyMessageContext );
973974}
974975
975976
@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
993994static void
994995LogicalRepApplyLoop (XLogRecPtr last_received )
995996{
996- /* Init the ApplyContext which we use for easier cleanup. */
997- ApplyContext = AllocSetContextCreate (TopMemoryContext ,
998- "ApplyContext" ,
999- ALLOCSET_DEFAULT_MINSIZE ,
1000- ALLOCSET_DEFAULT_INITSIZE ,
1001- ALLOCSET_DEFAULT_MAXSIZE );
997+ /*
998+ * Init the ApplyMessageContext which we clean up after each
999+ * replication protocol message.
1000+ */
1001+ ApplyMessageContext = AllocSetContextCreate (ApplyContext ,
1002+ "ApplyMessageContext" ,
1003+ ALLOCSET_DEFAULT_SIZES );
10021004
10031005/* mark as idle, before starting to loop */
10041006pgstat_report_activity (STATE_IDLE ,NULL );
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10131015TimestampTz last_recv_timestamp = GetCurrentTimestamp ();
10141016bool ping_sent = false;
10151017
1016- MemoryContextSwitchTo (ApplyContext );
1018+ MemoryContextSwitchTo (ApplyMessageContext );
10171019
10181020len = walrcv_receive (wrconn ,& buf ,& fd );
10191021
@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10451047ping_sent = false;
10461048
10471049/* Ensure we are reading the data into our memory context. */
1048- MemoryContextSwitchTo (ApplyContext );
1050+ MemoryContextSwitchTo (ApplyMessageContext );
10491051
10501052s .data = buf ;
10511053s .len = len ;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10911093UpdateWorkerStats (last_received ,timestamp , true);
10921094}
10931095/* other message types are purposefully ignored */
1096+
1097+ MemoryContextReset (ApplyMessageContext );
10941098}
10951099
10961100len = walrcv_receive (wrconn ,& buf ,& fd );
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11151119}
11161120
11171121/* Cleanup the memory. */
1118- MemoryContextResetAndDeleteChildren (ApplyContext );
1122+ MemoryContextResetAndDeleteChildren (ApplyMessageContext );
11191123MemoryContextSwitchTo (TopMemoryContext );
11201124
11211125/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
12581262
12591263if (!reply_message )
12601264{
1261- MemoryContext oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1265+ MemoryContext oldctx = MemoryContextSwitchTo (ApplyContext );
12621266reply_message = makeStringInfo ();
12631267MemoryContextSwitchTo (oldctx );
12641268}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
13081312}
13091313
13101314/* Ensure allocations in permanent context. */
1311- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1315+ oldctx = MemoryContextSwitchTo (ApplyContext );
13121316
13131317newsub = GetSubscription (MyLogicalRepWorker -> subid , true);
13141318
@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
14831487MyLogicalRepWorker -> userid );
14841488
14851489/* Load the subscription into persistent memory context. */
1486- CreateCacheMemoryContext ();
1487- ApplyCacheContext = AllocSetContextCreate (CacheMemoryContext ,
1488- "ApplyCacheContext" ,
1490+ ApplyContext = AllocSetContextCreate (TopMemoryContext ,
1491+ "ApplyContext" ,
14891492ALLOCSET_DEFAULT_SIZES );
14901493StartTransactionCommand ();
1491- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1494+ oldctx = MemoryContextSwitchTo (ApplyContext );
14921495MySubscription = GetSubscription (MyLogicalRepWorker -> subid , false);
14931496MySubscriptionValid = true;
14941497MemoryContextSwitchTo (oldctx );
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
15331536syncslotname = LogicalRepSyncTableStart (& origin_startpos );
15341537
15351538/* The slot name needs to be allocated in permanent memory context. */
1536- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1539+ oldctx = MemoryContextSwitchTo (ApplyContext );
15371540myslotname = pstrdup (syncslotname );
15381541MemoryContextSwitchTo (oldctx );
15391542