Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite58e13e

Browse files
committed
Fix consistency issues with replication slot copy
Commit9f06d79's replication slot copying failed toproperly reserve the WAL that the slot is expecting to seeduring DecodingContextFindStartpoint (to set the confirmed_flushLSN), so concurrent activity could remove that WAL and cause thecopy process to error out. But it doesn't actually *need* thatWAL anyway: instead of running decode to find confirmed_flush, itcan be copied from the source slot. Fix this by rearranging thingsto avoid DecodingContextFindStartpoint() (leaving the target slot'sconfirmed_flush_lsn to invalid), and set that up afterwards by copyingfrom the target slot's value.Also ensure the source slot's confirmed_flush_lsn is valid.Reported-by: Arseny SherAuthor: Masahiko Sawada, Arseny SherDiscussion:https://postgr.es/m/871rr3ohbo.fsf@ars-thinkpad
1 parentbcd4600 commite58e13e

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ StartupDecodingContext(List *output_plugin_options,
211211
*
212212
* plugin -- contains the name of the output plugin
213213
* output_plugin_options -- contains options passed to the output plugin
214+
* need_full_snapshot -- if true, must obtain a snapshot able to read all
215+
*tables; if false, one that can read only catalogs is acceptable.
214216
* restart_lsn -- if given as invalid, it's this routine's responsibility to
215217
*mark WAL as reserved by setting a convenient restart_lsn for the slot.
216218
*Otherwise, we set for decoding to start from the given LSN without

‎src/backend/replication/slotfuncs.c

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
118118
* Helper function for creating a new logical replication slot with
119119
* given arguments. Note that this function doesn't release the created
120120
* slot.
121+
*
122+
* When find_startpoint is false, the slot's confirmed_flush is not set; it's
123+
* caller's responsibility to ensure it's set to something sensible.
121124
*/
122125
staticvoid
123126
create_logical_replication_slot(char*name,char*plugin,
124-
booltemporary,XLogRecPtrrestart_lsn)
127+
booltemporary,XLogRecPtrrestart_lsn,
128+
boolfind_startpoint)
125129
{
126130
LogicalDecodingContext*ctx=NULL;
127131

@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
139143
temporary ?RS_TEMPORARY :RS_EPHEMERAL);
140144

141145
/*
142-
* Create logical decoding context, to build the initial snapshot.
146+
* Create logical decoding context to find start point or, if we don't
147+
* need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148+
*
149+
* Note: when !find_startpoint this is still important, because it's at
150+
* this point that the output plugin is validated.
143151
*/
144152
ctx=CreateInitDecodingContext(plugin,NIL,
145-
false,/*do not build snapshot */
153+
false,/*just catalogs is OK */
146154
restart_lsn,
147155
logical_read_local_xlog_page,NULL,NULL,
148156
NULL);
149157

150-
/* build initial snapshot, might take a while */
151-
DecodingContextFindStartpoint(ctx);
158+
/*
159+
* If caller needs us to determine the decoding start point, do so now.
160+
* This might take a while.
161+
*/
162+
if (find_startpoint)
163+
DecodingContextFindStartpoint(ctx);
152164

153165
/* don't need the decoding context anymore */
154166
FreeDecodingContext(ctx);
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
179191
create_logical_replication_slot(NameStr(*name),
180192
NameStr(*plugin),
181193
temporary,
182-
InvalidXLogRecPtr);
194+
InvalidXLogRecPtr,
195+
true);
183196

184197
values[0]=NameGetDatum(&MyReplicationSlot->data.name);
185198
values[1]=LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -691,10 +704,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
691704

692705
/* Create new slot and acquire it */
693706
if (logical_slot)
707+
{
708+
/*
709+
* We must not try to read WAL, since we haven't reserved it yet --
710+
* hence pass find_startpoint false. confirmed_flush will be set
711+
* below, by copying from the source slot.
712+
*/
694713
create_logical_replication_slot(NameStr(*dst_name),
695714
plugin,
696715
temporary,
697-
src_restart_lsn);
716+
src_restart_lsn,
717+
false);
718+
}
698719
else
699720
create_physical_replication_slot(NameStr(*dst_name),
700721
true,
@@ -711,6 +732,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
711732
TransactionIdcopy_xmin;
712733
TransactionIdcopy_catalog_xmin;
713734
XLogRecPtrcopy_restart_lsn;
735+
XLogRecPtrcopy_confirmed_flush;
714736
boolcopy_islogical;
715737
char*copy_name;
716738

@@ -722,6 +744,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
722744
copy_xmin=src->data.xmin;
723745
copy_catalog_xmin=src->data.catalog_xmin;
724746
copy_restart_lsn=src->data.restart_lsn;
747+
copy_confirmed_flush=src->data.confirmed_flush;
725748

726749
/* for existence check */
727750
copy_name=pstrdup(NameStr(src->data.name));
@@ -746,6 +769,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
746769
NameStr(*src_name)),
747770
errdetail("The source replication slot was modified incompatibly during the copy operation.")));
748771

772+
/* The source slot must have a consistent snapshot */
773+
if (src_islogical&&XLogRecPtrIsInvalid(copy_confirmed_flush))
774+
ereport(ERROR,
775+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
776+
errmsg("cannot copy unfinished logical replication slot \"%s\"",
777+
NameStr(*src_name)),
778+
errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
779+
749780
/* Install copied values again */
750781
SpinLockAcquire(&MyReplicationSlot->mutex);
751782
MyReplicationSlot->effective_xmin=copy_effective_xmin;
@@ -754,6 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
754785
MyReplicationSlot->data.xmin=copy_xmin;
755786
MyReplicationSlot->data.catalog_xmin=copy_catalog_xmin;
756787
MyReplicationSlot->data.restart_lsn=copy_restart_lsn;
788+
MyReplicationSlot->data.confirmed_flush=copy_confirmed_flush;
757789
SpinLockRelease(&MyReplicationSlot->mutex);
758790

759791
ReplicationSlotMarkDirty();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp