Commit f731cfa

Fix a couple of bugs with replication slot advancing feature

A review of the code has shown up a couple of issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as a start comparison point which is always 0/0, instead use the restart LSN position (logical slots need to use the confirmed LSN position, which was correct).
- The actual slot update was incorrect for both physical and logical slots. Physical slots need to use their restart_lsn as base comparison point (confirmed_flush was used because of previous point), and logical slots need to begin reading WAL from restart_lsn (confirmed_flush was used as well), while confirmed_flush is compiled depending on the decoding context and record read, and is the LSN position returned back to the caller.
- Never return 0/0 if a slot cannot be advanced. This way, if a slot is advanced while the activity is idle, then the same position is returned to the caller over and over without raising an error. Instead return the LSN the slot has been advanced to. With repetitive calls, the same position is returned hence caller can directly monitor the difference in progress in bytes by doing simply LSN difference calculations, which should be monotonic.

Note that as the slot is owned by the backend advancing it, then the read of those fields is fine lock-less, while updates need to happen while the slot mutex is held, so fix that on the way as well. Other locks for in-memory data of replication slots have been already fixed previously.

Some of those issues have been pointed out by Petr and Simon during the patch, while I noticed some of them after looking at the code. This also visibly takes care of a recently-discovered bug causing assertion failures which can be triggered by a two-step slot forwarding which first advanced the slot to a WAL page boundary and secondly advanced it to the latest position, say 'FF/FFFFFFF' to make sure that the newest LSN is used as forward point. It would have been nice to drop a test for that, but the set of operators working on pg_lsn limits it, so this is left for a future exercise.

Author: Michael Paquier
Reviewed-by: Petr Jelinek, Simon Riggs
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru

1 parent 321f648, commit f731cfa
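
The third point of the commit message says that repeated calls return the same position, so a caller can track progress with plain LSN subtraction. The following is a minimal standalone sketch of that arithmetic, assuming an LSN is a 64-bit integer written as two 32-bit hexadecimal halves (the `%X/%X` form used in the error message of this patch). The names `Lsn`, `parse_lsn` and `lsn_diff_bytes` are hypothetical helpers for illustration, not PostgreSQL functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for PostgreSQL's XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t Lsn;

/*
 * Hypothetical helper: parse the textual "hi/lo" form (two 32-bit hex
 * halves, e.g. "0/3000060") into a 64-bit position.  Returns false and
 * leaves *out untouched if the text is malformed.
 */
static bool
parse_lsn(const char *text, Lsn *out)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	if (sscanf(text, "%8X/%8X%n", &hi, &lo, &consumed) != 2)
		return false;
	if (text[consumed] != '\0')
		return false;			/* trailing garbage after the LSN */

	*out = ((Lsn) hi << 32) | lo;
	return true;
}

/*
 * Hypothetical helper: number of WAL bytes between two positions
 * returned by successive advance calls.  Since the returned position
 * never goes backwards, newer >= older is expected.
 */
static uint64_t
lsn_diff_bytes(Lsn newer, Lsn older)
{
	assert(newer >= older);
	return newer - older;
}
```

With the patched behavior, calling the advance function twice on an idle system would yield the same position both times, so `lsn_diff_bytes` on the two results would be zero rather than an error or a jump to 0/0.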

1 file changed: 36 additions & 14 deletions

src/backend/replication/slotfuncs.c
@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ * The LSN position to move to is compared simply to the slot's
+ * restart_lsn, knowing that any position older than that would be
+ * removed by successive checkpoints.
  */
 static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
-	if (MyReplicationSlot->data.restart_lsn < moveto)
+	if (startlsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
 
 /*
  * Helper function for advancing logical replication slot forward.
+ * The slot's restart_lsn is used as start point for reading records,
+ * while confirmed_lsn is used as base point for the decoding context.
+ * The LSN position to move to is checked by doing a per-record scan and
+ * logical decoding which makes sure that confirmed_lsn is updated to a
+ * LSN which allows the future slot consumer to get consistent logical
+ * changes.
  */
 static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = MyReplicationSlot->data.confirmed_flush;
 
 	PG_TRY();
 	{
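
The locking pattern of the patched physical helper (the owner reads restart_lsn without the lock, takes the spinlock only around the write, and returns the current position when nothing moves) can be tried outside the server. This is a rough standalone model, assuming a C11 `atomic_flag` as a stand-in for the slot spinlock. `ToySlot`, `toy_slot_init`, `toy_lock`, `toy_unlock` and `toy_physical_advance` are hypothetical names, not PostgreSQL code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr. */
typedef uint64_t Lsn;

/* Simplified stand-in for a replication slot; not the real struct. */
typedef struct
{
	atomic_flag mutex;			/* models the slot spinlock */
	Lsn			restart_lsn;	/* oldest WAL position still needed */
} ToySlot;

static void
toy_slot_init(ToySlot *slot, Lsn restart_lsn)
{
	assert(slot != NULL);
	atomic_flag_clear(&slot->mutex);
	slot->restart_lsn = restart_lsn;
}

static void
toy_lock(ToySlot *slot)
{
	while (atomic_flag_test_and_set_explicit(&slot->mutex,
											 memory_order_acquire))
		;						/* spin until the flag is released */
}

static void
toy_unlock(ToySlot *slot)
{
	atomic_flag_clear_explicit(&slot->mutex, memory_order_release);
}

/*
 * Same shape as the patched helper: the owning caller reads restart_lsn
 * without the lock, locks only to update it, and returns the current
 * position (never zero) when there is nothing to advance.
 */
static Lsn
toy_physical_advance(ToySlot *slot, Lsn moveto)
{
	Lsn			startlsn = slot->restart_lsn;
	Lsn			retlsn = startlsn;

	if (startlsn < moveto)
	{
		toy_lock(slot);
		slot->restart_lsn = moveto;
		toy_unlock(slot);
		retlsn = moveto;
	}

	return retlsn;
}
```

In the real function a backwards request is rejected with an error by the caller before the helper runs; the model simply returns the unchanged position in that case.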
@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-			/* check limits */
+			/* Stop once the moving point wanted by caller has been reached */
 			if (moveto <= ctx->reader->EndRecPtr)
 				break;
 
@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	Name		slotname = PG_GETARG_NAME(0);
 	XLogRecPtr	moveto = PG_GETARG_LSN(1);
 	XLogRecPtr	endlsn;
-	XLogRecPtr	startlsn;
+	XLogRecPtr	minlsn;
 	TupleDesc	tupdesc;
 	Datum		values[2];
 	bool		nulls[2];
@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
-	if (moveto < startlsn)
+	/*
+	 * Check if the slot is not moving backwards. Physical slots rely simply
+	 * on restart_lsn as a minimum point, while logical slots have confirmed
+	 * consumption up to confirmed_lsn, meaning that in both cases data older
+	 * than that is not available anymore.
+	 */
+	if (OidIsValid(MyReplicationSlot->data.database))
+		minlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		minlsn = MyReplicationSlot->data.restart_lsn;
+
+	if (moveto < minlsn)
 	{
 		ReplicationSlotRelease();
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot move slot to %X/%X, minimum is %X/%X",
 						(uint32) (moveto >> 32), (uint32) moveto,
-						(uint32) (startlsn >> 32), (uint32) startlsn)));
+						(uint32) (minlsn >> 32), (uint32) minlsn)));
 	}
 
+	/* Do the actual slot update, depending on the slot type */
 	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_logical_replication_slot_advance(moveto);
 	else
-		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_physical_replication_slot_advance(moveto);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
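
The minimum-position check in the last hunk picks a different field depending on the slot type. The sketch below models that choice in standalone C, assuming a boolean flag in place of `OidIsValid(MyReplicationSlot->data.database)` and a plain buffer in place of `ereport`. `ToySlotData`, `toy_min_lsn` and `toy_check_moveto` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Stand-in for PostgreSQL's XLogRecPtr. */
typedef uint64_t Lsn;

/* Simplified stand-in for the slot fields the check looks at. */
typedef struct
{
	bool		is_logical;		/* models OidIsValid(data.database) */
	Lsn			restart_lsn;
	Lsn			confirmed_flush;
} ToySlotData;

/* Logical slots compare against confirmed_flush, physical ones against
 * restart_lsn, as in the patched code. */
static Lsn
toy_min_lsn(const ToySlotData *slot)
{
	assert(slot != NULL);
	return slot->is_logical ? slot->confirmed_flush : slot->restart_lsn;
}

/*
 * Returns true if moveto is acceptable.  Otherwise writes a message
 * shaped like the one in the diff into buf and returns false.
 */
static bool
toy_check_moveto(const ToySlotData *slot, Lsn moveto,
				 char *buf, size_t buflen)
{
	Lsn			minlsn = toy_min_lsn(slot);

	if (moveto < minlsn)
	{
		snprintf(buf, buflen,
				 "cannot move slot to %X/%X, minimum is %X/%X",
				 (unsigned int) (moveto >> 32), (unsigned int) moveto,
				 (unsigned int) (minlsn >> 32), (unsigned int) minlsn);
		return false;
	}

	return true;
}
```

A physical slot in this model keeps `confirmed_flush` at zero, which shows why the pre-patch comparison against that field could never reject a backwards move: no `moveto` is smaller than 0/0.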
