Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9a7b7ad

Browse files
committed
Make logical WAL sender report streaming state appropriately
WAL senders sending logically-decoded data fail to properly report in"streaming" state when starting up, hence as long as one extra record isnot replayed, such WAL senders would remain in a "catchup" state, whichis inconsistent with the physical cousin.This can be easily reproduced by for example using pg_recvlogical andrestarting the upstream server. The TAP tests have been slightlymodified to detect the failure and strengthened so as future tests alsomake sure that a node is in streaming state when waiting for itscatchup.Backpatch down to 9.4 where this code has been introduced.Reported-by: Sawada MasahikoAuthor: Simon Riggs, Sawada MasahikoReviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi PrabakaranDiscussion:https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
1 parent39a9651 commit9a7b7ad

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

‎src/backend/replication/walsender.c

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
21692169
if (MyWalSnd->state==WALSNDSTATE_CATCHUP)
21702170
{
21712171
ereport(DEBUG1,
2172-
(errmsg("standby\"%s\" has now caught up withprimary",
2172+
(errmsg("\"%s\" has now caught up withupstream server",
21732173
application_name)));
21742174
WalSndSetState(WALSNDSTATE_STREAMING);
21752175
}
@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
27582758
char*errm;
27592759

27602760
/*
2761-
* Don't know whether we've caught up yet. We'll setit to true in
2762-
* WalSndWaitForWal, if we're actually waiting. We also set to true if
2763-
* XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2764-
* i.e. when we're shutting down.
2761+
* Don't know whether we've caught up yet. We'll setWalSndCaughtUp to
2762+
*true inWalSndWaitForWal, if we're actually waiting. We also set to
2763+
*true ifXLogReadRecord() had to stop reading but WalSndWaitForWal
2764+
*didn't wait -i.e. when we're shutting down.
27652765
*/
27662766
WalSndCaughtUp= false;
27672767

@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
27742774

27752775
if (record!=NULL)
27762776
{
2777+
/* XXX: Note that logical decoding cannot be used while in recovery */
2778+
XLogRecPtrflushPtr=GetFlushRecPtr();
2779+
27772780
/*
27782781
* Note the lack of any call to LagTrackerWrite() which is handled by
27792782
* WalSndUpdateProgress which is called by output plugin through
@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
27822785
LogicalDecodingProcessRecord(logical_decoding_ctx,logical_decoding_ctx->reader);
27832786

27842787
sentPtr=logical_decoding_ctx->reader->EndRecPtr;
2788+
2789+
/*
2790+
* If we have sent a record that is at or beyond the flushed point, we
2791+
* have caught up.
2792+
*/
2793+
if (sentPtr >=flushPtr)
2794+
WalSndCaughtUp= true;
27852795
}
27862796
else
27872797
{

‎src/test/perl/PostgresNode.pm

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,7 +1535,8 @@ also works for logical subscriptions)
15351535
until its replication location in pg_stat_replication equals or passes the
15361536
upstream's WAL insert point at the time this function is called. By default
15371537
the replay_lsn is waited for, but 'mode' may be specified to wait for any of
1538-
sent|write|flush|replay.
1538+
sent|write|flush|replay. The connection catching up must be in a streaming
1539+
state.
15391540
15401541
If there is no active replication connection from this peer, waits until
15411542
poll_query_until timeout.
@@ -1580,7 +1581,7 @@ sub wait_for_catchup
15801581
.$lsn_expr ." on"
15811582
.$self->name ."\n";
15821583
my$query =
1583-
qq[SELECT$lsn_expr <=${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
1584+
qq[SELECT$lsn_expr <=${mode}_lsnAND state = 'streaming'FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
15841585
$self->poll_query_until('postgres',$query)
15851586
or croak"timed out waiting for catchup";
15861587
print"done\n";

‎src/test/subscription/t/001_rep_changes.pl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@
188188
"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
189189
$node_publisher->safe_psql('postgres',"DELETE FROM tab_rep");
190190

191+
# Restart the publisher and check the state of the subscriber which
192+
# should be in a streaming state after catching up.
193+
$node_publisher->stop('fast');
194+
$node_publisher->start;
195+
191196
$node_publisher->wait_for_catchup($appname);
192197

193198
$result =$node_subscriber->safe_psql('postgres',

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp