Commit 0fdab27

Allow logical decoding on standbys

Unsurprisingly, this requires wal_level = logical to be set on the primary and
standby. The infrastructure added in 2666975 ensures that slots are
invalidated if the primary's wal_level is lowered.

Creating a slot on a standby waits for a xl_running_xact record to be
processed. If the primary is idle (and thus not emitting xl_running_xact
records), that can take a while. To make that faster, this commit also
introduces the pg_log_standby_snapshot() function. By executing it on the
primary, completion of slot creation on the standby can be accelerated.

Note that logical decoding on a standby does not itself enforce that required
catalog rows are not removed. The user has to use physical replication slots +
hot_standby_feedback or other measures to prevent that. If catalog rows
required for a slot are removed, the slot is invalidated.

See 6af1793 for an overall design of logical decoding on a standby.

Bumps catversion, for the addition of the pg_log_standby_snapshot() function.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de> (in an older version)
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>

1 parent e101dfa, commit 0fdab27

12 files changed: +202 -61 lines changed


‎doc/src/sgml/func.sgml

Lines changed: 15 additions & 0 deletions
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait bgwriter or checkpointer to log one. This is useful for
+        logical decoding on standby, as logical slot creation has to wait
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 27 additions & 0 deletions
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>

+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only while the
+     connection is alive (for example a node restart would break it). Then, the
+     primary may delete system catalog rows that could be needed by the logical
+     decoding on the standby (as it does not know about the catalog_xmin on the
+     standby). Existing logical slots on standby also get invalidated if wal_level
+     on primary is reduced to less than 'logical'. This is done as soon as the
+     standby detects such a change in the WAL stream. It means, that for walsenders
+     that are lagging (if any), some WAL records up to the wal_level parameter change
+     on the primary won't be decoded.
+    </para>
+
+    <para>
+     Creation of a logical slot requires information about all the currently
+     running transactions. On the primary, this information is available
+     directly, but on a standby, this information has to be obtained from
+     primary. Thus, slot creation may need to wait for some activity to happen
+     on the primary. If the primary is idle, creating a logical slot on
+     standby may take noticeable time. This can be sped up by calling the
+     <function>pg_log_standby_snapshot</function> on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
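
The second added paragraph says that slot creation on a standby has to wait until a running-transactions record written by the primary has been replayed. The sketch below is a rough, self-contained model of that wait, not PostgreSQL code: the record kinds, the wal_record struct and first_usable_snapshot_lsn() are hypothetical names chosen for illustration. It scans already-replayed records from the slot's start position and reports the first position at which a snapshot could begin. In PostgreSQL itself the snapshot builder may need more than one such record while older transactions are still running; the model ignores that.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified record kinds; real WAL has many more. */
typedef enum { REC_HEAP_INSERT, REC_COMMIT, REC_RUNNING_XACTS } rec_kind;

typedef struct
{
    uint64_t lsn;   /* position of the record in the WAL stream */
    rec_kind kind;
} wal_record;

/*
 * Return the LSN of the first running-xacts record at or after start_lsn,
 * or 0 if none has been replayed yet (the caller would keep waiting).
 */
uint64_t
first_usable_snapshot_lsn(const wal_record *recs, size_t nrecs,
                          uint64_t start_lsn)
{
    for (size_t i = 0; i < nrecs; i++)
    {
        if (recs[i].lsn >= start_lsn && recs[i].kind == REC_RUNNING_XACTS)
            return recs[i].lsn;
    }
    return 0;
}
```

On an idle primary no new running-xacts record arrives, so the function keeps returning 0; in this model, calling pg_log_standby_snapshot() on the primary corresponds to appending one more REC_RUNNING_XACTS entry to the stream.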

‎src/backend/access/transam/xlog.c

Lines changed: 11 additions & 0 deletions
@@ -4469,6 +4469,17 @@ LocalProcessControlFile(bool reset)
     ReadControlFile();
 }

+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+    return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */

‎src/backend/access/transam/xlogfuncs.c

Lines changed: 31 additions & 0 deletions
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,36 @@ pg_switch_wal(PG_FUNCTION_ARGS)
     PG_RETURN_LSN(switchpoint);
 }

+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+    XLogRecPtr  recptr;
+
+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+    if (!XLogStandbyInfoActive())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica")));
+
+    recptr = LogStandbySnapshot();
+
+    /*
+     * As a convenience, return the WAL location of the last inserted record
+     */
+    PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
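
The new function checks two preconditions before logging anything: the server must not be in recovery, and wal_level must be at least replica. A minimal standalone model of that ordering might look like the following. It does not use any PostgreSQL headers, and wal_level_model, snap_status and log_standby_snapshot_model() are hypothetical names; the real function raises errors through ereport() rather than returning a status.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Ordered like PostgreSQL's wal_level settings: minimal < replica < logical. */
typedef enum { LEVEL_MINIMAL, LEVEL_REPLICA, LEVEL_LOGICAL } wal_level_model;

typedef enum { SNAP_OK, SNAP_ERR_IN_RECOVERY, SNAP_ERR_WAL_LEVEL } snap_status;

/*
 * Model of the checks: refuse during recovery, refuse below replica,
 * otherwise "log" the snapshot and hand back its position via *lsn_out.
 * next_lsn stands in for whatever position the record would receive.
 */
snap_status
log_standby_snapshot_model(bool in_recovery, wal_level_model level,
                           uint64_t next_lsn, uint64_t *lsn_out)
{
    if (in_recovery)
        return SNAP_ERR_IN_RECOVERY;    /* a standby cannot write WAL */
    if (level < LEVEL_REPLICA)
        return SNAP_ERR_WAL_LEVEL;      /* standby info is not being logged */
    *lsn_out = next_lsn;
    return SNAP_OK;
}
```

The recovery check comes first, so calling the function on a standby reports the recovery error regardless of the configured level.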

‎src/backend/catalog/system_functions.sql

Lines changed: 2 additions & 0 deletions
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;

+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;

 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;

‎src/backend/replication/logical/decode.c

Lines changed: 29 additions & 1 deletion
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
              * can restart from there.
              */
             break;
+        case XLOG_PARAMETER_CHANGE:
+            {
+                xl_parameter_change *xlrec =
+                    (xl_parameter_change *) XLogRecGetData(buf->record);
+
+                /*
+                 * If wal_level on the primary is reduced to less than
+                 * logical, we want to prevent existing logical slots from
+                 * being used. Existing logical slots on the standby get
+                 * invalidated when this WAL record is replayed; and further,
+                 * slot creation fails when wal_level is not sufficient; but
+                 * all these operations are not synchronized, so a logical
+                 * slot may creep in while the wal_level is being
+                 * reduced. Hence this extra check.
+                 */
+                if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                {
+                    /*
+                     * This can occur only on a standby, as a primary would
+                     * not allow to restart after changing wal_level < logical
+                     * if there is pre-existing logical slot.
+                     */
+                    Assert(RecoveryInProgress());
+                    ereport(ERROR,
+                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                             errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+                }
+                break;
+            }
         case XLOG_NOOP:
         case XLOG_NEXTOID:
         case XLOG_SWITCH:
         case XLOG_BACKUP_END:
-        case XLOG_PARAMETER_CHANGE:
         case XLOG_RESTORE_POINT:
         case XLOG_FPW_CHANGE:
         case XLOG_FPI_FOR_HINT:
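
The new XLOG_PARAMETER_CHANGE branch makes decoding itself stop when it meets a record saying the primary's wal_level dropped below logical. As an illustration only, the standalone sketch below walks a simplified record stream and stops at the first such record. record_model and decode_until_level_drop() are hypothetical names, and the real code raises an error through ereport() instead of setting a flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Ordered like PostgreSQL's wal_level settings: minimal < replica < logical. */
typedef enum { LEVEL_MINIMAL, LEVEL_REPLICA, LEVEL_LOGICAL } wal_level_model;
typedef enum { REC_OTHER, REC_PARAMETER_CHANGE } rec_kind;

typedef struct
{
    rec_kind        kind;
    wal_level_model wal_level;  /* only meaningful for REC_PARAMETER_CHANGE */
} record_model;

/*
 * Return how many records were decoded before decoding had to stop.
 * *failed is set when a parameter-change record lowers wal_level below
 * logical; records after that point are never decoded.
 */
size_t
decode_until_level_drop(const record_model *recs, size_t nrecs, bool *failed)
{
    *failed = false;
    for (size_t i = 0; i < nrecs; i++)
    {
        if (recs[i].kind == REC_PARAMETER_CHANGE &&
            recs[i].wal_level < LEVEL_LOGICAL)
        {
            *failed = true;
            return i;
        }
    }
    return nrecs;
}
```

A parameter-change record that keeps wal_level at logical is passed over like any other record; only a drop below logical stops the loop.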

‎src/backend/replication/logical/logical.c

Lines changed: 20 additions & 16 deletions
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                  errmsg("logical decoding requires a database connection")));

-    /* ----
-     * TODO: We got to change that someday soon...
-     *
-     * There's basically three things missing to allow this:
-     * 1) We need to be able to correctly and quickly identify the timeline a
-     *    LSN belongs to
-     * 2) We need to force hot_standby_feedback to be enabled at all times so
-     *    the primary cannot remove rows we need.
-     * 3) support dropping replication slots referring to a database, in
-     *    dbase_redo. There can't be any active ones due to HS recovery
-     *    conflicts, so that should be relatively easy.
-     * ----
-     */
     if (RecoveryInProgress())
-        ereport(ERROR,
-                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                 errmsg("logical decoding cannot be used while in recovery")));
+    {
+        /*
+         * This check may have race conditions, but whenever
+         * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+         * verify that there are no existing logical replication slots. And to
+         * avoid races around creating a new slot,
+         * CheckLogicalDecodingRequirements() is called once before creating
+         * the slot, and once when logical decoding is initially starting up.
+         */
+        if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+    }
 }

 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
     LogicalDecodingContext *ctx;
     MemoryContext old_context;

+    /*
+     * On a standby, this check is also required while creating the
+     * slot. Check the comments in the function.
+     */
+    CheckLogicalDecodingRequirements();
+
     /* shorter lines... */
     slot = MyReplicationSlot;
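
The in-recovery branch no longer rejects logical decoding outright; it now compares the wal_level recorded in the control file against logical. Combined with the commit message's statement that wal_level = logical is needed on both primary and standby, the resulting rule can be sketched as a small standalone predicate. This is a model under those assumptions, with hypothetical names (logical_decoding_allowed, wal_level_model); the check of the locally configured level is not part of this hunk and is assumed from the commit message.

```c
#include <assert.h>
#include <stdbool.h>

/* Ordered like PostgreSQL's wal_level settings: minimal < replica < logical. */
typedef enum { LEVEL_MINIMAL, LEVEL_REPLICA, LEVEL_LOGICAL } wal_level_model;

/*
 * configured:         wal_level set on the server running the check
 * control_file_level: wal_level taken from the control file, which on a
 *                     standby reflects what the primary is writing
 */
bool
logical_decoding_allowed(bool in_recovery, wal_level_model configured,
                         wal_level_model control_file_level)
{
    if (configured < LEVEL_LOGICAL)
        return false;
    /* On a standby the primary's effective level matters as well. */
    if (in_recovery && control_file_level < LEVEL_LOGICAL)
        return false;
    return true;
}
```

The second hunk calls the real check again from CreateInitDecodingContext(), so the same rule is evaluated both before slot creation and when decoding starts up, which narrows the race described in the comment.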

‎src/backend/replication/slot.c

Lines changed: 30 additions & 27 deletions
@@ -41,6 +41,7 @@

 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
         /*
          * For logical slots log a standby snapshot and start logical decoding
          * at exactly that position. That allows the slot to start up more
-         * quickly.
+         * quickly. But on a standby we cannot do WAL writes, so just use the
+         * replay pointer; effectively, an attempt to create a logical slot on
+         * standby will cause it to wait for an xl_running_xact record to be
+         * logged independently on the primary, so that a snapshot can be
+         * built using the record.
          *
-         * That's not needed (or indeed helpful) for physical slots as they'll
-         * start replay at the last logged checkpoint anyway. Instead return
-         * the location of the last redo LSN. While that slightly increases
-         * the chance that we have to retry, it's where a base backup has to
-         * start replay at.
+         * None of this is needed (or indeed helpful) for physical slots as
+         * they'll start replay at the last logged checkpoint anyway. Instead
+         * return the location of the last redo LSN. While that slightly
+         * increases the chance that we have to retry, it's where a base
+         * backup has to start replay at.
          */
-        if (!RecoveryInProgress() && SlotIsLogical(slot))
-        {
-            XLogRecPtr  flushptr;
-
-            /* start at current insert position */
+        if (SlotIsPhysical(slot))
+            restart_lsn = GetRedoRecPtr();
+        else if (RecoveryInProgress())
+            restart_lsn = GetXLogReplayRecPtr(NULL);
+        else
             restart_lsn = GetXLogInsertRecPtr();
-            SpinLockAcquire(&slot->mutex);
-            slot->data.restart_lsn = restart_lsn;
-            SpinLockRelease(&slot->mutex);
-
-            /* make sure we have enough information to start */
-            flushptr = LogStandbySnapshot();

-            /* and make sure it's fsynced to disk */
-            XLogFlush(flushptr);
-        }
-        else
-        {
-            restart_lsn = GetRedoRecPtr();
-            SpinLockAcquire(&slot->mutex);
-            slot->data.restart_lsn = restart_lsn;
-            SpinLockRelease(&slot->mutex);
-        }
+        SpinLockAcquire(&slot->mutex);
+        slot->data.restart_lsn = restart_lsn;
+        SpinLockRelease(&slot->mutex);

         /* prevent WAL removal as fast as possible */
         ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
         if (XLogGetLastRemovedSegno() < segno)
             break;
     }
+
+    if (!RecoveryInProgress() && SlotIsLogical(slot))
+    {
+        XLogRecPtr  flushptr;
+
+        /* make sure we have enough information to start */
+        flushptr = LogStandbySnapshot();
+
+        /* and make sure it's fsynced to disk */
+        XLogFlush(flushptr);
+    }
 }

 /*
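
The reworked ReplicationSlotReserveWal() picks the slot's starting position with a three-way branch and moves the standby-snapshot logging after the retry loop. The decision logic can be restated as two pure functions, sketched here without PostgreSQL headers. slot_kind, wal_positions, choose_restart_lsn() and should_log_standby_snapshot() are hypothetical names; the three positions stand in for the values returned by GetRedoRecPtr(), GetXLogReplayRecPtr() and GetXLogInsertRecPtr().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum { SLOT_PHYSICAL, SLOT_LOGICAL } slot_kind;

typedef struct
{
    uint64_t redo_ptr;      /* last checkpoint's redo position */
    uint64_t replay_ptr;    /* how far a standby has replayed */
    uint64_t insert_ptr;    /* where a primary would insert next */
} wal_positions;

/* Mirror of the if / else if / else chain in the second hunk. */
uint64_t
choose_restart_lsn(slot_kind kind, bool in_recovery, const wal_positions *pos)
{
    if (kind == SLOT_PHYSICAL)
        return pos->redo_ptr;
    if (in_recovery)
        return pos->replay_ptr;     /* logical slot on a standby */
    return pos->insert_ptr;         /* logical slot on a primary */
}

/* Only a primary can write the running-xacts record itself. */
bool
should_log_standby_snapshot(slot_kind kind, bool in_recovery)
{
    return !in_recovery && kind == SLOT_LOGICAL;
}
```

For a logical slot on a standby the second function returns false, which is why slot creation there depends on a record logged independently by the primary.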

‎src/backend/replication/walsender.c

Lines changed: 32 additions & 16 deletions
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
     int         count;
     WALReadError errinfo;
     XLogSegNo   segno;
-    TimeLineID  currTLI = GetWALInsertionTimeLine();
+    TimeLineID  currTLI;
+
+    /*
+     * Make sure we have enough WAL available before retrieving the current
+     * timeline. This is needed to determine am_cascading_walsender accurately
+     * which is needed to determine the current timeline.
+     */
+    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

     /*
-     * Since logical decoding is only permitted on a primary server, we know
-     * that the current timeline ID can't be changing any more. If we did this
-     * on a standby, we'd have to worry about the values we compute here
-     * becoming invalid due to a promotion or timeline change.
+     * Since logical decoding is also permitted on a standby server, we need
+     * to check if the server is in recovery to decide how to get the current
+     * timeline ID (so that it also cover the promotion or timeline change
+     * cases).
      */
+    am_cascading_walsender = RecoveryInProgress();
+
+    if (am_cascading_walsender)
+        GetXLogReplayRecPtr(&currTLI);
+    else
+        currTLI = GetWALInsertionTimeLine();
+
     XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
     sendTimeLineIsHistoric = (state->currTLI != currTLI);
     sendTimeLine = state->currTLI;
     sendTimeLineValidUpto = state->currTLIValidUntil;
     sendTimeLineNextTLI = state->nextTLI;

-    /* make sure we have enough WAL available */
-    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
     /* fail if not (implies we are going to shut down) */
     if (flushptr < targetPagePtr + reqLen)
         return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                  cur_page,
                  targetPagePtr,
                  XLOG_BLCKSZ,
-                 state->seg.ws_tli, /* Pass the current TLI because only
-                                     * WalSndSegmentOpen controls whether new
-                                     * TLI is needed. */
+                 currTLI,       /* Pass the current TLI because only
+                                 * WalSndSegmentOpen controls whether new TLI
+                                 * is needed. */
                  &errinfo))
         WALReadRaiseError(&errinfo);

@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
      * If first time through in this session, initialize flushPtr. Otherwise,
      * we only need to update flushPtr if EndRecPtr is past it.
      */
-    if (flushPtr == InvalidXLogRecPtr)
-        flushPtr = GetFlushRecPtr(NULL);
-    else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-        flushPtr = GetFlushRecPtr(NULL);
+    if (flushPtr == InvalidXLogRecPtr ||
+        logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+    {
+        if (am_cascading_walsender)
+            flushPtr = GetStandbyFlushRecPtr(NULL);
+        else
+            flushPtr = GetFlushRecPtr(NULL);
+    }

     /* If EndRecPtr is still past our flushPtr, it means we caught up. */
     if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
     receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
     replayPtr = GetXLogReplayRecPtr(&replayTLI);

-    *tli = replayTLI;
+    if (tli)
+        *tli = replayTLI;

     result = replayPtr;
     if (receiveTLI == replayTLI && receivePtr > replayPtr)
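
The last hunk makes the tli output parameter of GetStandbyFlushRecPtr() optional, so that XLogSendLogical() can pass NULL. A standalone model of that function's shape is sketched below with hypothetical names (wal_pos, standby_flush_ptr). The diff context cuts off the body of the final if; the sketch assumes it selects the receive pointer, which is what the condition suggests.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct
{
    uint64_t ptr;   /* WAL position */
    uint32_t tli;   /* timeline the position belongs to */
} wal_pos;

/*
 * Return how far WAL may be sent from a standby. The replay timeline is
 * reported through *tli only when the caller supplied a non-NULL pointer.
 */
uint64_t
standby_flush_ptr(wal_pos received, wal_pos replayed, uint32_t *tli)
{
    uint64_t result;

    if (tli)
        *tli = replayed.tli;

    result = replayed.ptr;
    /* Assumed body: prefer received-and-flushed WAL on the same timeline. */
    if (received.tli == replayed.tli && received.ptr > replayed.ptr)
        result = received.ptr;

    return result;
}
```

Guarding the write with a NULL check lets callers that only need the position skip declaring a throwaway timeline variable.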

‎src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
