Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit024711b

Browse files
Lag tracking for logical replication
Lag tracking is called for each commit, but we introducea pacing delay to ensure we don't swamp the lag tracker.Author: Petr Jelinek, with minor pacing delay code from me
1 parentefa2c18 commit024711b

File tree

7 files changed

+79
-23
lines changed

7 files changed

+79
-23
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
117117
boolneed_full_snapshot,
118118
XLogPageReadCBread_page,
119119
LogicalOutputPluginWriterPrepareWriteprepare_write,
120-
LogicalOutputPluginWriterWritedo_write)
120+
LogicalOutputPluginWriterWritedo_write,
121+
LogicalOutputPluginWriterUpdateProgressupdate_progress)
121122
{
122123
ReplicationSlot*slot;
123124
MemoryContextcontext,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
186187
ctx->out=makeStringInfo();
187188
ctx->prepare_write=prepare_write;
188189
ctx->write=do_write;
190+
ctx->update_progress=update_progress;
189191

190192
ctx->output_plugin_options=output_plugin_options;
191193

@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
199201
*
200202
* plugin contains the name of the output plugin
201203
* output_plugin_options contains options passed to the output plugin
202-
* read_page, prepare_write, do_write are callbacks that have to be filled to
203-
*perform the use-case dependent, actual, work.
204+
* read_page, prepare_write, do_write, update_progress
205+
* callbacks that have to be filled to perform the use-case dependent,
206+
* actual, work.
204207
*
205208
* Needs to be called while in a memory context that's at least as long lived
206209
* as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
215218
boolneed_full_snapshot,
216219
XLogPageReadCBread_page,
217220
LogicalOutputPluginWriterPrepareWriteprepare_write,
218-
LogicalOutputPluginWriterWritedo_write)
221+
LogicalOutputPluginWriterWritedo_write,
222+
LogicalOutputPluginWriterUpdateProgressupdate_progress)
219223
{
220224
TransactionIdxmin_horizon=InvalidTransactionId;
221225
ReplicationSlot*slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
300304

301305
ctx=StartupDecodingContext(NIL,InvalidXLogRecPtr,xmin_horizon,
302306
need_full_snapshot,read_page,prepare_write,
303-
do_write);
307+
do_write,update_progress);
304308

305309
/* call output plugin initialization callback */
306310
old_context=MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
324328
* output_plugin_options
325329
*contains options passed to the output plugin.
326330
*
327-
* read_page, prepare_write, do_write
331+
* read_page, prepare_write, do_write, update_progress
328332
*callbacks that have to be filled to perform the use-case dependent,
329333
*actual work.
330334
*
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
340344
List*output_plugin_options,
341345
XLogPageReadCBread_page,
342346
LogicalOutputPluginWriterPrepareWriteprepare_write,
343-
LogicalOutputPluginWriterWritedo_write)
347+
LogicalOutputPluginWriterWritedo_write,
348+
LogicalOutputPluginWriterUpdateProgressupdate_progress)
344349
{
345350
LogicalDecodingContext*ctx;
346351
ReplicationSlot*slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
390395

391396
ctx=StartupDecodingContext(output_plugin_options,
392397
start_lsn,InvalidTransactionId, false,
393-
read_page,prepare_write,do_write);
398+
read_page,prepare_write,do_write,
399+
update_progress);
394400

395401
/* call output plugin initialization callback */
396402
old_context=MemoryContextSwitchTo(ctx->context);
@@ -503,6 +509,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
503509
ctx->prepared_write= false;
504510
}
505511

512+
/*
513+
* Update progress tracking (if supported).
514+
*/
515+
void
516+
OutputPluginUpdateProgress(structLogicalDecodingContext*ctx)
517+
{
518+
if (!ctx->update_progress)
519+
return;
520+
521+
ctx->update_progress(ctx,ctx->write_location,ctx->write_xid);
522+
}
523+
506524
/*
507525
* Load the output plugin, lookup its output plugin init function, and check
508526
* that it provides the required callbacks.

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
253253
options,
254254
logical_read_local_xlog_page,
255255
LogicalOutputPrepareWrite,
256-
LogicalOutputWrite);
256+
LogicalOutputWrite,NULL);
257257

258258
MemoryContextSwitchTo(oldcontext);
259259

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ static void
244244
pgoutput_commit_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
245245
XLogRecPtrcommit_lsn)
246246
{
247+
OutputPluginUpdateProgress(ctx);
248+
247249
OutputPluginPrepareWrite(ctx, true);
248250
logicalrep_write_commit(ctx->out,txn,commit_lsn);
249251
OutputPluginWrite(ctx, true);

‎src/backend/replication/slotfuncs.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
133133
*/
134134
ctx=CreateInitDecodingContext(NameStr(*plugin),NIL,
135135
false,/* do not build snapshot */
136-
logical_read_local_xlog_page,NULL,NULL);
136+
logical_read_local_xlog_page,NULL,NULL,
137+
NULL);
137138

138139
/* build initial snapshot, might take a while */
139140
DecodingContextFindStartpoint(ctx);

‎src/backend/replication/walsender.c

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
245245
staticlongWalSndComputeSleeptime(TimestampTznow);
246246
staticvoidWalSndPrepareWrite(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,boollast_write);
247247
staticvoidWalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,boollast_write);
248+
staticvoidWalSndUpdateProgress(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid);
248249
staticXLogRecPtrWalSndWaitForWal(XLogRecPtrloc);
250+
staticvoidLagTrackerWrite(XLogRecPtrlsn,TimestampTzlocal_flush_time);
249251
staticTimeOffsetLagTrackerRead(inthead,XLogRecPtrlsn,TimestampTznow);
250252
staticboolTransactionIdInRecentPast(TransactionIdxid,uint32epoch);
251253

@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
923925

924926
ctx=CreateInitDecodingContext(cmd->plugin,NIL,need_full_snapshot,
925927
logical_read_xlog_page,
926-
WalSndPrepareWrite,WalSndWriteData);
928+
WalSndPrepareWrite,WalSndWriteData,
929+
WalSndUpdateProgress);
927930

928931
/*
929932
* Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10771080
* Initialize position to the last ack'ed one, then the xlog records begin
10781081
* to be shipped from that position.
10791082
*/
1080-
logical_decoding_ctx=CreateDecodingContext(
1081-
cmd->startpoint,cmd->options,
1083+
logical_decoding_ctx=CreateDecodingContext(cmd->startpoint,cmd->options,
10821084
logical_read_xlog_page,
1083-
WalSndPrepareWrite,WalSndWriteData);
1085+
WalSndPrepareWrite,
1086+
WalSndWriteData,
1087+
WalSndUpdateProgress);
10841088

10851089
/* Start reading WAL from the oldest required WAL. */
10861090
logical_startptr=MyReplicationSlot->data.restart_lsn;
@@ -1239,6 +1243,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12391243
SetLatch(MyLatch);
12401244
}
12411245

1246+
/*
1247+
* LogicalDecodingContext 'progress_update' callback.
1248+
*
1249+
* Write the current position to the log tracker (see XLogSendPhysical).
1250+
*/
1251+
staticvoid
1252+
WalSndUpdateProgress(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid)
1253+
{
1254+
staticTimestampTzsendTime=0;
1255+
TimestampTznow=GetCurrentTimestamp();
1256+
1257+
/*
1258+
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
1259+
* to avoid flooding the lag tracker when we commit frequently.
1260+
*/
1261+
#defineWALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1262+
if (!TimestampDifferenceExceeds(sendTime,now,
1263+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1264+
return;
1265+
1266+
LagTrackerWrite(lsn,now);
1267+
sendTime=now;
1268+
}
1269+
12421270
/*
12431271
* Wait till WAL < loc is flushed to disk so it can be safely read.
12441272
*/
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
27302758
if (record!=NULL)
27312759
{
27322760
/*
2733-
* Note the lack of any call to LagTrackerWrite() which isthe responsibility
2734-
*of the logical decoding plugin. Response messages are handled normally,
2735-
*so this responsibility does not extend to needing to call LagTrackerRead().
2761+
* Note the lack of any call to LagTrackerWrite() which ishandled
2762+
*by WalSndUpdateProgress which is called by output plugin through
2763+
*logical decoding write api.
27362764
*/
27372765
LogicalDecodingProcessRecord(logical_decoding_ctx,logical_decoding_ctx->reader);
27382766

@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
33283356
* LagTrackerRead can compute the elapsed time (lag) when this WAL position is
33293357
* eventually reported to have been written, flushed and applied by the
33303358
* standby in a reply message.
3331-
* Exported to allow logical decoding plugins to call this when they choose.
33323359
*/
3333-
void
3360+
staticvoid
33343361
LagTrackerWrite(XLogRecPtrlsn,TimestampTzlocal_flush_time)
33353362
{
33363363
boolbuffer_full;

‎src/include/replication/logical.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
2626

2727
typedefLogicalOutputPluginWriterWriteLogicalOutputPluginWriterPrepareWrite;
2828

29+
typedefvoid (*LogicalOutputPluginWriterUpdateProgress) (
30+
structLogicalDecodingContext*lr,
31+
XLogRecPtrPtr,
32+
TransactionIdxid
33+
);
34+
2935
typedefstructLogicalDecodingContext
3036
{
3137
/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
5258
*/
5359
LogicalOutputPluginWriterPrepareWriteprepare_write;
5460
LogicalOutputPluginWriterWritewrite;
61+
LogicalOutputPluginWriterUpdateProgressupdate_progress;
5562

5663
/*
5764
* Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
8592
boolneed_full_snapshot,
8693
XLogPageReadCBread_page,
8794
LogicalOutputPluginWriterPrepareWriteprepare_write,
88-
LogicalOutputPluginWriterWritedo_write);
95+
LogicalOutputPluginWriterWritedo_write,
96+
LogicalOutputPluginWriterUpdateProgressupdate_progress);
8997
externLogicalDecodingContext*CreateDecodingContext(
9098
XLogRecPtrstart_lsn,
9199
List*output_plugin_options,
92100
XLogPageReadCBread_page,
93101
LogicalOutputPluginWriterPrepareWriteprepare_write,
94-
LogicalOutputPluginWriterWritedo_write);
102+
LogicalOutputPluginWriterWritedo_write,
103+
LogicalOutputPluginWriterUpdateProgressupdate_progress);
95104
externvoidDecodingContextFindStartpoint(LogicalDecodingContext*ctx);
96105
externboolDecodingContextReady(LogicalDecodingContext*ctx);
97106
externvoidFreeDecodingContext(LogicalDecodingContext*ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
101110
XLogRecPtrrestart_lsn);
102111
externvoidLogicalConfirmReceivedLocation(XLogRecPtrlsn);
103112

104-
externvoidLagTrackerWrite(XLogRecPtrlsn,TimestampTzlocal_flush_time);
105-
106113
externboolfilter_by_origin_cb_wrapper(LogicalDecodingContext*ctx,RepOriginIdorigin_id);
107114

108115
#endif

‎src/include/replication/output_plugin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
106106
/* Functions in replication/logical/logical.c */
107107
externvoidOutputPluginPrepareWrite(structLogicalDecodingContext*ctx,boollast_write);
108108
externvoidOutputPluginWrite(structLogicalDecodingContext*ctx,boollast_write);
109+
externvoidOutputPluginUpdateProgress(structLogicalDecodingContext*ctx);
109110

110111
#endif/* OUTPUT_PLUGIN_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp