Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2ec005b

Browse files
author
Amit Kapila
committed
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on thestandby. Now, it is possible that at some particular restart_lsn there aresome running xacts, which means if we start reading the WAL from thatlocation after promotion, we won't reach a consistent snapshot state atthat point. However, on the primary, we would have already been in aconsistent snapshot state at that restart_lsn so we would have justserialized the existing snapshot.To avoid this problem we will use the advance_slot functionality unlessthe snapshot already exists at the synced restart_lsn location. This willhelp us to ensure that snapbuilder/slot statuses are updated properlywithout generating any changes. Note that the synced slot will remain asRS_TEMPORARY till the decoding from corresponding restart_lsn can reach aconsistent snapshot state after which they will be marked asRS_PERSISTENT.Per buildfarmAuthor: Hou ZhijieReviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit KapilaDiscussion:https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parente37662f commit2ec005b

File tree

7 files changed

+351
-171
lines changed

7 files changed

+351
-171
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 143 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include"replication/decode.h"
3737
#include"replication/logical.h"
3838
#include"replication/reorderbuffer.h"
39+
#include"replication/slotsync.h"
3940
#include"replication/snapbuild.h"
4041
#include"storage/proc.h"
4142
#include"storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
516517
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
517518
errmsg("cannot use physical replication slot for logical decoding")));
518519

519-
if (slot->data.database!=MyDatabaseId)
520+
/*
521+
* We need to access the system tables during decoding to build the
522+
* logical changes unless we are in fast_forward mode where no changes are
523+
* generated.
524+
*/
525+
if (slot->data.database!=MyDatabaseId&& !fast_forward)
520526
ereport(ERROR,
521527
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
522528
errmsg("replication slot \"%s\" was not created in this database",
523529
NameStr(slot->data.name))));
524530

525531
/*
526-
* Do not allow consumption of a "synchronized" slot until the standby
527-
* gets promoted.
532+
* The slots being synced from the primary can't be used for decoding as
533+
* they are used after failover. However, we do allow advancing the LSNs
534+
* during the synchronization of slots. See update_local_synced_slot.
528535
*/
529-
if (RecoveryInProgress()&&slot->data.synced)
536+
if (RecoveryInProgress()&&slot->data.synced&& !IsSyncingReplicationSlots())
530537
ereport(ERROR,
531538
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
532539
errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
20342041

20352042
returnhas_pending_wal;
20362043
}
2044+
2045+
/*
2046+
* Helper function for advancing our logical replication slot forward.
2047+
*
2048+
* The slot's restart_lsn is used as start point for reading records, while
2049+
* confirmed_flush is used as base point for the decoding context.
2050+
*
2051+
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2052+
* because we need to digest WAL to advance restart_lsn allowing to recycle
2053+
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
2054+
* mode, no changes are generated anyway.
2055+
*
2056+
* *found_consistent_snapshot will be true if the initial decoding snapshot has
2057+
* been built; Otherwise, it will be false.
2058+
*/
2059+
XLogRecPtr
2060+
LogicalSlotAdvanceAndCheckSnapState(XLogRecPtrmoveto,
2061+
bool*found_consistent_snapshot)
2062+
{
2063+
LogicalDecodingContext*ctx;
2064+
ResourceOwnerold_resowner=CurrentResourceOwner;
2065+
XLogRecPtrretlsn;
2066+
2067+
Assert(moveto!=InvalidXLogRecPtr);
2068+
2069+
if (found_consistent_snapshot)
2070+
*found_consistent_snapshot= false;
2071+
2072+
PG_TRY();
2073+
{
2074+
/*
2075+
* Create our decoding context in fast_forward mode, passing start_lsn
2076+
* as InvalidXLogRecPtr, so that we start processing from my slot's
2077+
* confirmed_flush.
2078+
*/
2079+
ctx=CreateDecodingContext(InvalidXLogRecPtr,
2080+
NIL,
2081+
true,/* fast_forward */
2082+
XL_ROUTINE(.page_read=read_local_xlog_page,
2083+
.segment_open=wal_segment_open,
2084+
.segment_close=wal_segment_close),
2085+
NULL,NULL,NULL);
2086+
2087+
/*
2088+
* Wait for specified streaming replication standby servers (if any)
2089+
* to confirm receipt of WAL up to moveto lsn.
2090+
*/
2091+
WaitForStandbyConfirmation(moveto);
2092+
2093+
/*
2094+
* Start reading at the slot's restart_lsn, which we know to point to
2095+
* a valid record.
2096+
*/
2097+
XLogBeginRead(ctx->reader,MyReplicationSlot->data.restart_lsn);
2098+
2099+
/* invalidate non-timetravel entries */
2100+
InvalidateSystemCaches();
2101+
2102+
/* Decode records until we reach the requested target */
2103+
while (ctx->reader->EndRecPtr<moveto)
2104+
{
2105+
char*errm=NULL;
2106+
XLogRecord*record;
2107+
2108+
/*
2109+
* Read records. No changes are generated in fast_forward mode,
2110+
* but snapbuilder/slot statuses are updated properly.
2111+
*/
2112+
record=XLogReadRecord(ctx->reader,&errm);
2113+
if (errm)
2114+
elog(ERROR,"could not find record while advancing replication slot: %s",
2115+
errm);
2116+
2117+
/*
2118+
* Process the record. Storage-level changes are ignored in
2119+
* fast_forward mode, but other modules (such as snapbuilder)
2120+
* might still have critical updates to do.
2121+
*/
2122+
if (record)
2123+
LogicalDecodingProcessRecord(ctx,ctx->reader);
2124+
2125+
CHECK_FOR_INTERRUPTS();
2126+
}
2127+
2128+
if (found_consistent_snapshot&&DecodingContextReady(ctx))
2129+
*found_consistent_snapshot= true;
2130+
2131+
/*
2132+
* Logical decoding could have clobbered CurrentResourceOwner during
2133+
* transaction management, so restore the executor's value. (This is
2134+
* a kluge, but it's not worth cleaning up right now.)
2135+
*/
2136+
CurrentResourceOwner=old_resowner;
2137+
2138+
if (ctx->reader->EndRecPtr!=InvalidXLogRecPtr)
2139+
{
2140+
LogicalConfirmReceivedLocation(moveto);
2141+
2142+
/*
2143+
* If only the confirmed_flush LSN has changed the slot won't get
2144+
* marked as dirty by the above. Callers on the walsender
2145+
* interface are expected to keep track of their own progress and
2146+
* don't need it written out. But SQL-interface users cannot
2147+
* specify their own start positions and it's harder for them to
2148+
* keep track of their progress, so we should make more of an
2149+
* effort to save it for them.
2150+
*
2151+
* Dirty the slot so it is written out at the next checkpoint. The
2152+
* LSN position advanced to may still be lost on a crash but this
2153+
* makes the data consistent after a clean shutdown.
2154+
*/
2155+
ReplicationSlotMarkDirty();
2156+
}
2157+
2158+
retlsn=MyReplicationSlot->data.confirmed_flush;
2159+
2160+
/* free context, call shutdown callback */
2161+
FreeDecodingContext(ctx);
2162+
2163+
InvalidateSystemCaches();
2164+
}
2165+
PG_CATCH();
2166+
{
2167+
/* clear all timetravel entries */
2168+
InvalidateSystemCaches();
2169+
2170+
PG_RE_THROW();
2171+
}
2172+
PG_END_TRY();
2173+
2174+
returnretlsn;
2175+
}

‎src/backend/replication/logical/slotsync.c

Lines changed: 96 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@
2525
* which slot sync worker can perform the sync periodically or user can call
2626
* pg_sync_replication_slots() periodically to perform the syncs.
2727
*
28+
* If synchronized slots fail to build a consistent snapshot from the
29+
* restart_lsn before reaching confirmed_flush_lsn, they would become
30+
* unreliable after promotion due to potential data loss from changes
31+
* before reaching a consistent point. This can happen because the slots can
32+
* be synced at some random time and we may not reach the consistent point
33+
* at the same WAL location as the primary. So, we mark such slots as
34+
* RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
35+
* consistent point, they will be marked as RS_PERSISTENT.
36+
*
2837
* The slot sync worker waits for some time before the next synchronization,
2938
* with the duration varying based on whether any slots were updated during
3039
* the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
4958
#include"postmaster/fork_process.h"
5059
#include"postmaster/interrupt.h"
5160
#include"postmaster/postmaster.h"
52-
#include"replication/slot.h"
61+
#include"replication/logical.h"
5362
#include"replication/slotsync.h"
63+
#include"replication/snapbuild.h"
5464
#include"storage/ipc.h"
5565
#include"storage/lmgr.h"
5666
#include"storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
147157
*
148158
* If no update was needed (the data of the remote slot is the same as the
149159
* local slot) return false, otherwise true.
160+
*
161+
* *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
162+
* modified, and decoding from the corresponding LSN's can reach a
163+
* consistent snapshot.
150164
*/
151165
staticbool
152-
update_local_synced_slot(RemoteSlot*remote_slot,Oidremote_dbid)
166+
update_local_synced_slot(RemoteSlot*remote_slot,Oidremote_dbid,
167+
bool*found_consistent_snapshot)
153168
{
154169
ReplicationSlot*slot=MyReplicationSlot;
155-
boolxmin_changed;
156-
boolrestart_lsn_changed;
157-
NameDataplugin_name;
170+
boolslot_updated= false;
158171

159172
Assert(slot->data.invalidated==RS_INVAL_NONE);
160173

161-
xmin_changed= (remote_slot->catalog_xmin!=slot->data.catalog_xmin);
162-
restart_lsn_changed=(remote_slot->restart_lsn!=slot->data.restart_lsn);
174+
if (found_consistent_snapshot)
175+
*found_consistent_snapshot=false;
163176

164-
if (!xmin_changed&&
165-
!restart_lsn_changed&&
166-
remote_dbid==slot->data.database&&
167-
remote_slot->two_phase==slot->data.two_phase&&
168-
remote_slot->failover==slot->data.failover&&
169-
remote_slot->confirmed_lsn==slot->data.confirmed_flush&&
170-
strcmp(remote_slot->plugin,NameStr(slot->data.plugin))==0)
171-
return false;
177+
if (remote_slot->confirmed_lsn!=slot->data.confirmed_flush||
178+
remote_slot->restart_lsn!=slot->data.restart_lsn||
179+
remote_slot->catalog_xmin!=slot->data.catalog_xmin)
180+
{
181+
/*
182+
* We can't directly copy the remote slot's LSN or xmin unless there
183+
* exists a consistent snapshot at that point. Otherwise, after
184+
* promotion, the slots may not reach a consistent point before the
185+
* confirmed_flush_lsn which can lead to a data loss. To avoid data
186+
* loss, we let slot machinery advance the slot which ensures that
187+
* snapbuilder/slot statuses are updated properly.
188+
*/
189+
if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
190+
{
191+
/*
192+
* Update the slot info directly if there is a serialized snapshot
193+
* at the restart_lsn, as the slot can quickly reach consistency
194+
* at restart_lsn by restoring the snapshot.
195+
*/
196+
SpinLockAcquire(&slot->mutex);
197+
slot->data.restart_lsn=remote_slot->restart_lsn;
198+
slot->data.confirmed_flush=remote_slot->confirmed_lsn;
199+
slot->data.catalog_xmin=remote_slot->catalog_xmin;
200+
slot->effective_catalog_xmin=remote_slot->catalog_xmin;
201+
SpinLockRelease(&slot->mutex);
172202

173-
/* Avoid expensive operations while holding a spinlock. */
174-
namestrcpy(&plugin_name,remote_slot->plugin);
175-
176-
SpinLockAcquire(&slot->mutex);
177-
slot->data.plugin=plugin_name;
178-
slot->data.database=remote_dbid;
179-
slot->data.two_phase=remote_slot->two_phase;
180-
slot->data.failover=remote_slot->failover;
181-
slot->data.restart_lsn=remote_slot->restart_lsn;
182-
slot->data.confirmed_flush=remote_slot->confirmed_lsn;
183-
slot->data.catalog_xmin=remote_slot->catalog_xmin;
184-
slot->effective_catalog_xmin=remote_slot->catalog_xmin;
185-
SpinLockRelease(&slot->mutex);
186-
187-
if (xmin_changed)
188-
ReplicationSlotsComputeRequiredXmin(false);
203+
if (found_consistent_snapshot)
204+
*found_consistent_snapshot= true;
205+
}
206+
else
207+
{
208+
LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
209+
found_consistent_snapshot);
210+
}
189211

190-
if (restart_lsn_changed)
212+
ReplicationSlotsComputeRequiredXmin(false);
191213
ReplicationSlotsComputeRequiredLSN();
192214

193-
return true;
215+
slot_updated= true;
216+
}
217+
218+
if (remote_dbid!=slot->data.database||
219+
remote_slot->two_phase!=slot->data.two_phase||
220+
remote_slot->failover!=slot->data.failover||
221+
strcmp(remote_slot->plugin,NameStr(slot->data.plugin))!=0)
222+
{
223+
NameDataplugin_name;
224+
225+
/* Avoid expensive operations while holding a spinlock. */
226+
namestrcpy(&plugin_name,remote_slot->plugin);
227+
228+
SpinLockAcquire(&slot->mutex);
229+
slot->data.plugin=plugin_name;
230+
slot->data.database=remote_dbid;
231+
slot->data.two_phase=remote_slot->two_phase;
232+
slot->data.failover=remote_slot->failover;
233+
SpinLockRelease(&slot->mutex);
234+
235+
slot_updated= true;
236+
}
237+
238+
returnslot_updated;
194239
}
195240

196241
/*
@@ -413,6 +458,7 @@ static bool
413458
update_and_persist_local_synced_slot(RemoteSlot*remote_slot,Oidremote_dbid)
414459
{
415460
ReplicationSlot*slot=MyReplicationSlot;
461+
boolfound_consistent_snapshot= false;
416462

417463
/*
418464
* Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
443489
return false;
444490
}
445491

446-
/* First time slot update, the function must return true */
447-
if (!update_local_synced_slot(remote_slot,remote_dbid))
448-
elog(ERROR,"failed to update slot");
492+
(void)update_local_synced_slot(remote_slot,remote_dbid,
493+
&found_consistent_snapshot);
494+
495+
/*
496+
* Don't persist the slot if it cannot reach the consistent point from the
497+
* restart_lsn. See comments atop this file.
498+
*/
499+
if (!found_consistent_snapshot)
500+
{
501+
ereport(LOG,
502+
errmsg("could not sync slot \"%s\"",remote_slot->name),
503+
errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
504+
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
505+
506+
return false;
507+
}
449508

450509
ReplicationSlotPersist();
451510

@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
578637
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
579638

580639
/* Make sure the slot changes persist across server restart */
581-
if (update_local_synced_slot(remote_slot,remote_dbid))
640+
if (update_local_synced_slot(remote_slot,remote_dbid,NULL))
582641
{
583642
ReplicationSlotMarkDirty();
584643
ReplicationSlotSave();

‎src/backend/replication/logical/snapbuild.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
21342134
}
21352135
FreeDir(snap_dir);
21362136
}
2137+
2138+
/*
2139+
* Check if a logical snapshot at the specified point has been serialized.
2140+
*/
2141+
bool
2142+
SnapBuildSnapshotExists(XLogRecPtrlsn)
2143+
{
2144+
charpath[MAXPGPATH];
2145+
intret;
2146+
structstatstat_buf;
2147+
2148+
sprintf(path,"pg_logical/snapshots/%X-%X.snap",
2149+
LSN_FORMAT_ARGS(lsn));
2150+
2151+
ret=stat(path,&stat_buf);
2152+
2153+
if (ret!=0&&errno!=ENOENT)
2154+
ereport(ERROR,
2155+
(errcode_for_file_access(),
2156+
errmsg("could not stat file \"%s\": %m",path)));
2157+
2158+
returnret==0;
2159+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp