Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1573995

Browse files
alvherreArseny Shermichaelpq
committed
Rewrite comments in replication slot advance implementation
The code added by9c7d06d was a bit obscure; clarify that byrewriting the comments. Lack of clarity has already caused bugs, soit's a worthy goal.Co-authored-by: Arseny Sher <a.sher@postgrespro.ru>Co-authored-by: Michaël Paquier <michael@paquier.xyz>Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>Reviewed-by: Petr Jelínek <petr.jelinek@2ndquadrant.com>Discussion:https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad
1 parent309765f commit1573995

File tree

2 files changed

+50
-27
lines changed

2 files changed

+50
-27
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
338338
*that, see below).
339339
*
340340
* output_plugin_options
341-
*contains options passed to the output plugin.
341+
*options passed to the output plugin.
342+
*
343+
* fast_forward
344+
*bypass the generation of logical changes.
342345
*
343346
* read_page, prepare_write, do_write, update_progress
344347
*callbacks that have to be filled to perform the use-case dependent,

‎src/backend/replication/slotfuncs.c

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
317317
}
318318

319319
/*
320-
* Helper function for advancing physical replication slot forward.
321-
* The LSN position to move to is compared simply to the slot's
322-
* restart_lsn, knowing that any position older than that would be
323-
* removed by successive checkpoints.
320+
* Helper function for advancing our physical replication slot forward.
321+
*
322+
* The LSN position to move to is compared simply to the slot's restart_lsn,
323+
* knowing that any position older than that would be removed by successive
324+
* checkpoints.
324325
*/
325326
staticXLogRecPtr
326327
pg_physical_replication_slot_advance(XLogRecPtrmoveto)
@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
340341
}
341342

342343
/*
343-
* Helper function for advancing logical replication slot forward.
344+
* Helper function for advancing our logical replication slot forward.
345+
*
344346
* The slot's restart_lsn is used as start point for reading records,
345347
* while confirmed_lsn is used as base point for the decoding context.
346-
* The LSN position to move to is checked by doing a per-record scan and
347-
* logical decoding which makes sure that confirmed_lsn is updated to a
348-
* LSN which allows the future slot consumer to get consistent logical
349-
* changes.
348+
*
349+
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
350+
* because we need to digest WAL to advance restart_lsn allowing to recycle
351+
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
352+
* mode, no changes are generated anyway.
350353
*/
351354
staticXLogRecPtr
352355
pg_logical_replication_slot_advance(XLogRecPtrmoveto)
353356
{
354357
LogicalDecodingContext*ctx;
355358
ResourceOwnerold_resowner=CurrentResourceOwner;
356-
XLogRecPtrstartlsn=MyReplicationSlot->data.restart_lsn;
357-
XLogRecPtrretlsn=MyReplicationSlot->data.confirmed_flush;
359+
XLogRecPtrstartlsn;
360+
XLogRecPtrretlsn;
358361

359362
PG_TRY();
360363
{
361-
/* restart at slot's confirmed_flush */
364+
/*
365+
* Create our decoding context in fast_forward mode, passing start_lsn
366+
* as InvalidXLogRecPtr, so that we start processing from my slot's
367+
* confirmed_flush.
368+
*/
362369
ctx=CreateDecodingContext(InvalidXLogRecPtr,
363370
NIL,
364-
true,
371+
true,/* fast_forward */
365372
logical_read_local_xlog_page,
366373
NULL,NULL,NULL);
367374

375+
/*
376+
* Start reading at the slot's restart_lsn, which we know to point to
377+
* a valid record.
378+
*/
379+
startlsn=MyReplicationSlot->data.restart_lsn;
380+
381+
/* Initialize our return value in case we don't do anything */
382+
retlsn=MyReplicationSlot->data.confirmed_flush;
383+
368384
/* invalidate non-timetravel entries */
369385
InvalidateSystemCaches();
370386

371-
/* Decode until we run out of records */
372-
while ((startlsn!=InvalidXLogRecPtr&&startlsn<moveto)||
373-
(ctx->reader->EndRecPtr!=InvalidXLogRecPtr&&ctx->reader->EndRecPtr<moveto))
387+
/* Decode at least one record, until we run out of records */
388+
while ((!XLogRecPtrIsInvalid(startlsn)&&
389+
startlsn<moveto)||
390+
(!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr)&&
391+
ctx->reader->EndRecPtr<moveto))
374392
{
375-
XLogRecord*record;
376393
char*errm=NULL;
394+
XLogRecord*record;
377395

396+
/*
397+
* Read records. No changes are generated in fast_forward mode,
398+
* but snapbuilder/slot statuses are updated properly.
399+
*/
378400
record=XLogReadRecord(ctx->reader,startlsn,&errm);
379401
if (errm)
380402
elog(ERROR,"%s",errm);
381403

382-
/*
383-
* Now that we've set up the xlog reader state, subsequent calls
384-
* pass InvalidXLogRecPtr to say "continue from last record"
385-
*/
404+
/* Read sequentially from now on */
386405
startlsn=InvalidXLogRecPtr;
387406

388407
/*
389-
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
390-
* store the description into our tuplestore.
408+
* Process the record. Storage-level changes are ignored in
409+
* fast_forward mode, but other modules (such as snapbuilder)
410+
* might still have critical updates to do.
391411
*/
392-
if (record!=NULL)
412+
if (record)
393413
LogicalDecodingProcessRecord(ctx,ctx->reader);
394414

395-
/* Stop once themoving point wanted by caller has been reached */
415+
/* Stop once therequested target has been reached */
396416
if (moveto <=ctx->reader->EndRecPtr)
397417
break;
398418

@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
411431
LogicalConfirmReceivedLocation(moveto);
412432

413433
/*
414-
* If only theconfirmed_flush_lsn has changed the slot won't get
434+
* If only theconfirmed_flush LSN has changed the slot won't get
415435
* marked as dirty by the above. Callers on the walsender
416436
* interface are expected to keep track of their own progress and
417437
* don't need it written out. But SQL-interface users cannot

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp