Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite101dfa

Browse files
committed
For cascading replication, wake physical and logical walsenders separately
Physical walsenders can't send data until it's been flushed; logicalwalsenders can't decode and send data until it's been applied. On thestandby, the WAL is flushed first, which will only wake up physicalwalsenders; and then applied, which will only wake up logicalwalsenders.Previously, all walsenders were awakened when the WAL was flushed. Thatwas fine for logical walsenders on the primary; but on the standby theflushed WAL would have been not applied yet, so logical walsenders wereawakened too early.Per idea from Jeff Davis and Amit Kapila.Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>Reviewed-By: Jeff Davis <pgsql@j-davis.com>Reviewed-By: Robert Haas <robertmhaas@gmail.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>Discussion:https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
1 parent2666975 commite101dfa

File tree

7 files changed

+84
-29
lines changed

7 files changed

+84
-29
lines changed

‎src/backend/access/transam/xlog.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
26452645
END_CRIT_SECTION();
26462646

26472647
/* wake up walsenders now that we've released heavily contended locks */
2648-
WalSndWakeupProcessRequests();
2648+
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
26492649

26502650
/*
26512651
* If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
28162816
END_CRIT_SECTION();
28172817

28182818
/* wake up walsenders now that we've released heavily contended locks */
2819-
WalSndWakeupProcessRequests();
2819+
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
28202820

28212821
/*
28222822
* Great, done. To take some work off the critical path, try to initialize
@@ -5765,7 +5765,7 @@ StartupXLOG(void)
57655765
* If there were cascading standby servers connected to us, nudge any wal
57665766
* sender processes to notice that we've been promoted.
57675767
*/
5768-
WalSndWakeup();
5768+
WalSndWakeup(true, true);
57695769

57705770
/*
57715771
* If this was a promotion, request an (online) checkpoint now. This isn't

‎src/backend/access/transam/xlogarchive.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
421421
* if we restored something other than a WAL segment, but it does no harm
422422
* either.
423423
*/
424-
WalSndWakeup();
424+
WalSndWakeup(true, false);
425425
}
426426

427427
/*

‎src/backend/access/transam/xlogrecovery.c

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
19351935
XLogRecoveryCtl->lastReplayedTLI=*replayTLI;
19361936
SpinLockRelease(&XLogRecoveryCtl->info_lck);
19371937

1938+
/* ------
1939+
* Wakeup walsenders:
1940+
*
1941+
* On the standby, the WAL is flushed first (which will only wake up
1942+
* physical walsenders) and then applied, which will only wake up logical
1943+
* walsenders.
1944+
*
1945+
* Indeed, logical walsenders on standby can't decode and send data until
1946+
* it's been applied.
1947+
*
1948+
* Physical walsenders don't need to be woken up during replay unless
1949+
* cascading replication is allowed and time line change occurred (so that
1950+
* they can notice that they are on a new time line).
1951+
*
1952+
* That's why the wake up conditions are for:
1953+
*
1954+
* - physical walsenders in case of new time line and cascade
1955+
* replication is allowed
1956+
* - logical walsenders in case cascade replication is allowed (could not
1957+
* be created otherwise)
1958+
* ------
1959+
*/
1960+
if (AllowCascadeReplication())
1961+
WalSndWakeup(switchedTLI, true);
1962+
19381963
/*
19391964
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
19401965
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
19581983
*/
19591984
RemoveNonParentXlogFiles(xlogreader->EndRecPtr,*replayTLI);
19601985

1961-
/*
1962-
* Wake up any walsenders to notice that we are on a new timeline.
1963-
*/
1964-
if (AllowCascadeReplication())
1965-
WalSndWakeup();
1966-
19671986
/* Reset the prefetcher. */
19681987
XLogPrefetchReconfigure();
19691988
}
@@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
30503069
{
30513070
/*
30523071
* When we find that WAL ends in an incomplete record, keep track
3053-
* of that record. After recovery is done, we'll write a record to
3054-
* indicate to downstream WAL readers that that portion is to be
3055-
* ignored.
3072+
* of that record. After recovery is done, we'll write a record
3073+
*toindicate to downstream WAL readers that that portion is to
3074+
*beignored.
30563075
*
30573076
* However, when ArchiveRecoveryRequested = true, we're going to
30583077
* switch to a new timeline at the end of recovery. We will only

‎src/backend/replication/walreceiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
10101010
/* Signal the startup process and walsender that new WAL has arrived */
10111011
WakeupRecovery();
10121012
if (AllowCascadeReplication())
1013-
WalSndWakeup();
1013+
WalSndWakeup(true, false);
10141014

10151015
/* Report XLOG streaming progress in PS display */
10161016
if (update_process_title)

‎src/backend/replication/walsender.c

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
26032603
walsnd->sync_standby_priority=0;
26042604
walsnd->latch=&MyProc->procLatch;
26052605
walsnd->replyTime=0;
2606+
2607+
/*
2608+
* The kind assignment is done here and not in StartReplication()
2609+
* and StartLogicalReplication(). Indeed, the logical walsender
2610+
* needs to read WAL records (like snapshot of running
2611+
* transactions) during the slot creation. So it needs to be woken
2612+
* up based on its kind.
2613+
*
2614+
* The kind assignment could also be done in StartReplication(),
2615+
* StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2616+
* seems better to set it on one place.
2617+
*/
2618+
if (MyDatabaseId==InvalidOid)
2619+
walsnd->kind=REPLICATION_KIND_PHYSICAL;
2620+
else
2621+
walsnd->kind=REPLICATION_KIND_LOGICAL;
2622+
26062623
SpinLockRelease(&walsnd->mutex);
26072624
/* don't need the lock anymore */
26082625
MyWalSnd= (WalSnd*)walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
32803297
}
32813298

32823299
/*
3283-
* Wake up all walsenders
3300+
* Wake up physical, logical or both kinds of walsenders
3301+
*
3302+
* The distinction between physical and logical walsenders is done, because:
3303+
* - physical walsenders can't send data until it's been flushed
3304+
* - logical walsenders on standby can't decode and send data until it's been
3305+
* applied
3306+
*
3307+
* For cascading replication we need to wake up physical walsenders separately
3308+
* from logical walsenders (see the comment before calling WalSndWakeup() in
3309+
* ApplyWalRecord() for more details).
32843310
*
32853311
* This will be called inside critical sections, so throwing an error is not
32863312
* advisable.
32873313
*/
32883314
void
3289-
WalSndWakeup(void)
3315+
WalSndWakeup(boolphysical,boollogical)
32903316
{
32913317
inti;
32923318

32933319
for (i=0;i<max_wal_senders;i++)
32943320
{
32953321
Latch*latch;
3322+
ReplicationKindkind;
32963323
WalSnd*walsnd=&WalSndCtl->walsnds[i];
32973324

32983325
/*
32993326
* Get latch pointer with spinlock held, for the unlikely case that
3300-
* pointer reads aren't atomic (as they're 8 bytes).
3327+
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
3328+
* get kind.
33013329
*/
33023330
SpinLockAcquire(&walsnd->mutex);
33033331
latch=walsnd->latch;
3332+
kind=walsnd->kind;
33043333
SpinLockRelease(&walsnd->mutex);
33053334

3306-
if (latch!=NULL)
3335+
if (latch==NULL)
3336+
continue;
3337+
3338+
if ((physical&&kind==REPLICATION_KIND_PHYSICAL)||
3339+
(logical&&kind==REPLICATION_KIND_LOGICAL))
33073340
SetLatch(latch);
33083341
}
33093342
}

‎src/include/replication/walsender.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
4242
externvoidWalSndSignals(void);
4343
externSizeWalSndShmemSize(void);
4444
externvoidWalSndShmemInit(void);
45-
externvoidWalSndWakeup(void);
45+
externvoidWalSndWakeup(boolphysical,boollogical);
4646
externvoidWalSndInitStopping(void);
4747
externvoidWalSndWaitStopping(void);
4848
externvoidHandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
6060
/*
6161
* wakeup walsenders if there is work to be done
6262
*/
63-
#defineWalSndWakeupProcessRequests()\
64-
do\
65-
{\
66-
if (wake_wal_senders)\
67-
{\
68-
wake_wal_senders = false;\
69-
if (max_wal_senders > 0)\
70-
WalSndWakeup();\
71-
}\
72-
} while (0)
63+
staticinlinevoid
64+
WalSndWakeupProcessRequests(boolphysical,boollogical)
65+
{
66+
if (wake_wal_senders)
67+
{
68+
wake_wal_senders= false;
69+
if (max_wal_senders>0)
70+
WalSndWakeup(physical,logical);
71+
}
72+
}
7373

7474
#endif/* _WALSENDER_H */

‎src/include/replication/walsender_private.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include"access/xlog.h"
1616
#include"lib/ilist.h"
1717
#include"nodes/nodes.h"
18+
#include"nodes/replnodes.h"
1819
#include"replication/syncrep.h"
1920
#include"storage/latch.h"
2021
#include"storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
7980
* Timestamp of the last message received from standby.
8081
*/
8182
TimestampTzreplyTime;
83+
84+
ReplicationKindkind;
8285
}WalSnd;
8386

8487
externPGDLLIMPORTWalSnd*MyWalSnd;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp