Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit63c917c

Browse files
committed
[PGPRO-4074] Port LogicalDecodingCaughtUp callback.
tags: multimaster(cherry picked from commit d75ea7f66dfea0644d0bf98011f4bc2fd32bcd63)
1 parent6a640de commit63c917c

File tree

4 files changed

+47
-0
lines changed

4 files changed

+47
-0
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,35 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10401040
error_context_stack=errcallback.previous;
10411041
}
10421042

1043+
voidLogicalDecodingCaughtUp(LogicalDecodingContext*ctx)
1044+
{
1045+
LogicalErrorCallbackStatestate;
1046+
ErrorContextCallbackerrcallback;
1047+
1048+
if (ctx->callbacks.caughtup_cb==NULL)
1049+
return;
1050+
1051+
/* Push callback + info on the error context stack */
1052+
state.ctx=ctx;
1053+
state.callback_name="caughtup";
1054+
state.report_location=ctx->reader->EndRecPtr;
1055+
errcallback.callback=output_plugin_error_callback;
1056+
errcallback.arg= (void*)&state;
1057+
errcallback.previous=error_context_stack;
1058+
error_context_stack=&errcallback;
1059+
1060+
/* set output state */
1061+
ctx->accept_writes= true;
1062+
ctx->write_xid=InvalidTransactionId;
1063+
ctx->write_location=ctx->reader->EndRecPtr;
1064+
1065+
/* do the actual work: call callback */
1066+
ctx->callbacks.caughtup_cb(ctx);
1067+
1068+
/* Pop the error context stack */
1069+
error_context_stack=errcallback.previous;
1070+
}
1071+
10431072
/*
10441073
* Set the required catalog xmin horizon for historic snapshots in the current
10451074
* replication slot.

‎src/backend/replication/walsender.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,7 @@ WalSndWaitForWal(XLogRecPtr loc)
13801380
{
13811381
intwakeEvents;
13821382
staticXLogRecPtrRecentFlushPtr=InvalidXLogRecPtr;
1383+
boolcaughtup_cb_called= false;
13831384

13841385
/*
13851386
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1459,6 +1460,15 @@ WalSndWaitForWal(XLogRecPtr loc)
14591460

14601461
/* Waiting for new WAL. Since we need to wait, we're now caught up. */
14611462
WalSndCaughtUp= true;
1463+
/*
1464+
* Call cb only once: if it writes anyting, it'll probably call
1465+
* WalSndWriteData who sets latch, thus creating busy loop.
1466+
*/
1467+
if (!caughtup_cb_called&&logical_decoding_ctx)
1468+
{
1469+
LogicalDecodingCaughtUp(logical_decoding_ctx);
1470+
caughtup_cb_called= true;
1471+
}
14621472

14631473
/*
14641474
* Try to flush any pending output to the client.

‎src/include/replication/logical.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
120120
XLogRecPtrrestart_lsn);
121121
externvoidLogicalConfirmReceivedLocation(XLogRecPtrlsn);
122122

123+
externvoidLogicalDecodingCaughtUp(LogicalDecodingContext*ctx);
124+
123125
externboolfilter_by_origin_cb_wrapper(LogicalDecodingContext*ctx,RepOriginIdorigin_id);
124126

125127
#endif

‎src/include/replication/output_plugin.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
139139
*/
140140
typedefvoid (*LogicalDecodeShutdownCB) (structLogicalDecodingContext*ctx);
141141

142+
/*
143+
* Called when WAL sender caught up.
144+
*/
145+
typedefvoid (*LogicalDecodeCaughtUpCB) (structLogicalDecodingContext*ctx);
146+
142147
/*
143148
* Output plugin callbacks
144149
*/
@@ -157,6 +162,7 @@ typedef struct OutputPluginCallbacks
157162
LogicalDecodeAbortPreparedCBabort_prepared_cb;
158163
LogicalDecodeFilterByOriginCBfilter_by_origin_cb;
159164
LogicalDecodeShutdownCBshutdown_cb;
165+
LogicalDecodeCaughtUpCBcaughtup_cb;
160166
}OutputPluginCallbacks;
161167

162168
/* Functions in replication/logical/logical.c */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp