Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit489b96e

Browse files
committed
Improve memory use in logical replication apply
Previously, the memory used by the logical replication apply worker forprocessing messages would never be freed, so that could end up using alot of memory. To improve that, change the existing ApplyContext memorycontext to ApplyMessageContext and reset that after everymessage (similar to MessageContext used elsewhere). For consistency ofnaming, rename the ApplyCacheContext to ApplyContext.Author: Stas Kelvich <s.kelvich@postgrespro.ru>
1 parente0bf160 commit489b96e

File tree

3 files changed

+40
-26
lines changed

3 files changed

+40
-26
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
101101
intattnum;
102102
}SlotErrCallbackArg;
103103

104-
staticMemoryContextApplyContext=NULL;
105-
MemoryContextApplyCacheContext=NULL;
104+
staticMemoryContextApplyMessageContext=NULL;
105+
MemoryContextApplyContext=NULL;
106106

107107
WalReceiverConn*wrconn=NULL;
108108

@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
145145
/*
146146
* Make sure that we started local transaction.
147147
*
148-
* Also switches toApplyContext as necessary.
148+
* Also switches toApplyMessageContext as necessary.
149149
*/
150150
staticbool
151151
ensure_transaction(void)
152152
{
153153
if (IsTransactionState())
154154
{
155-
if (CurrentMemoryContext!=ApplyContext)
156-
MemoryContextSwitchTo(ApplyContext);
155+
if (CurrentMemoryContext!=ApplyMessageContext)
156+
MemoryContextSwitchTo(ApplyMessageContext);
157+
157158
return false;
158159
}
159160

@@ -162,7 +163,7 @@ ensure_transaction(void)
162163
if (!MySubscriptionValid)
163164
reread_subscription();
164165

165-
MemoryContextSwitchTo(ApplyContext);
166+
MemoryContextSwitchTo(ApplyMessageContext);
166167
return true;
167168
}
168169

@@ -961,15 +962,15 @@ store_flush_position(XLogRecPtr remote_lsn)
961962
FlushPosition*flushpos;
962963

963964
/* Need to do this in permanent context */
964-
MemoryContextSwitchTo(ApplyCacheContext);
965+
MemoryContextSwitchTo(ApplyContext);
965966

966967
/* Track commit lsn */
967968
flushpos= (FlushPosition*)palloc(sizeof(FlushPosition));
968969
flushpos->local_end=XactLastCommitEnd;
969970
flushpos->remote_end=remote_lsn;
970971

971972
dlist_push_tail(&lsn_mapping,&flushpos->node);
972-
MemoryContextSwitchTo(ApplyContext);
973+
MemoryContextSwitchTo(ApplyMessageContext);
973974
}
974975

975976

@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
993994
staticvoid
994995
LogicalRepApplyLoop(XLogRecPtrlast_received)
995996
{
996-
/* Init the ApplyContext which we use for easier cleanup. */
997-
ApplyContext=AllocSetContextCreate(TopMemoryContext,
998-
"ApplyContext",
999-
ALLOCSET_DEFAULT_MINSIZE,
1000-
ALLOCSET_DEFAULT_INITSIZE,
1001-
ALLOCSET_DEFAULT_MAXSIZE);
997+
/*
998+
* Init the ApplyMessageContext which we clean up after each
999+
* replication protocol message.
1000+
*/
1001+
ApplyMessageContext=AllocSetContextCreate(ApplyContext,
1002+
"ApplyMessageContext",
1003+
ALLOCSET_DEFAULT_SIZES);
10021004

10031005
/* mark as idle, before starting to loop */
10041006
pgstat_report_activity(STATE_IDLE,NULL);
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10131015
TimestampTzlast_recv_timestamp=GetCurrentTimestamp();
10141016
boolping_sent= false;
10151017

1016-
MemoryContextSwitchTo(ApplyContext);
1018+
MemoryContextSwitchTo(ApplyMessageContext);
10171019

10181020
len=walrcv_receive(wrconn,&buf,&fd);
10191021

@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10451047
ping_sent= false;
10461048

10471049
/* Ensure we are reading the data into our memory context. */
1048-
MemoryContextSwitchTo(ApplyContext);
1050+
MemoryContextSwitchTo(ApplyMessageContext);
10491051

10501052
s.data=buf;
10511053
s.len=len;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10911093
UpdateWorkerStats(last_received,timestamp, true);
10921094
}
10931095
/* other message types are purposefully ignored */
1096+
1097+
MemoryContextReset(ApplyMessageContext);
10941098
}
10951099

10961100
len=walrcv_receive(wrconn,&buf,&fd);
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11151119
}
11161120

11171121
/* Cleanup the memory. */
1118-
MemoryContextResetAndDeleteChildren(ApplyContext);
1122+
MemoryContextResetAndDeleteChildren(ApplyMessageContext);
11191123
MemoryContextSwitchTo(TopMemoryContext);
11201124

11211125
/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
12581262

12591263
if (!reply_message)
12601264
{
1261-
MemoryContextoldctx=MemoryContextSwitchTo(ApplyCacheContext);
1265+
MemoryContextoldctx=MemoryContextSwitchTo(ApplyContext);
12621266
reply_message=makeStringInfo();
12631267
MemoryContextSwitchTo(oldctx);
12641268
}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
13081312
}
13091313

13101314
/* Ensure allocations in permanent context. */
1311-
oldctx=MemoryContextSwitchTo(ApplyCacheContext);
1315+
oldctx=MemoryContextSwitchTo(ApplyContext);
13121316

13131317
newsub=GetSubscription(MyLogicalRepWorker->subid, true);
13141318

@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
14831487
MyLogicalRepWorker->userid);
14841488

14851489
/* Load the subscription into persistent memory context. */
1486-
CreateCacheMemoryContext();
1487-
ApplyCacheContext=AllocSetContextCreate(CacheMemoryContext,
1488-
"ApplyCacheContext",
1490+
ApplyContext=AllocSetContextCreate(TopMemoryContext,
1491+
"ApplyContext",
14891492
ALLOCSET_DEFAULT_SIZES);
14901493
StartTransactionCommand();
1491-
oldctx=MemoryContextSwitchTo(ApplyCacheContext);
1494+
oldctx=MemoryContextSwitchTo(ApplyContext);
14921495
MySubscription=GetSubscription(MyLogicalRepWorker->subid, false);
14931496
MySubscriptionValid= true;
14941497
MemoryContextSwitchTo(oldctx);
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
15331536
syncslotname=LogicalRepSyncTableStart(&origin_startpos);
15341537

15351538
/* The slot name needs to be allocated in permanent memory context. */
1536-
oldctx=MemoryContextSwitchTo(ApplyCacheContext);
1539+
oldctx=MemoryContextSwitchTo(ApplyContext);
15371540
myslotname=pstrdup(syncslotname);
15381541
MemoryContextSwitchTo(oldctx);
15391542

‎src/backend/utils/mmgr/README

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
265265
and don't actually need any storage allocated in their private contexts.
266266

267267

268+
Logical Replication Worker Contexts
269+
-----------------------------------
270+
271+
ApplyContext --- permanent during whole lifetime of apply worker. It
272+
is possible to use TopMemoryContext here as well, but for simplicity
273+
of memory usage analysis we spin up different context.
274+
275+
ApplyMessageContext --- short-lived context that is reset after each
276+
logical replication protocol message is processed.
277+
278+
268279
Transient Contexts During Execution
269280
-----------------------------------
270281

‎src/include/replication/worker_internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
5656
TimestampTzreply_time;
5757
}LogicalRepWorker;
5858

59-
/*Memorycontext forcached variables in apply worker. */
60-
externMemoryContextApplyCacheContext;
59+
/*Main memorycontext forapply worker. Permanent during worker lifetime. */
60+
externMemoryContextApplyContext;
6161

6262
/* libpqreceiver connection */
6363
externstructWalReceiverConn*wrconn;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp