Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1f63b36

Browse files
author
Amit Kapila
committed
Fix xmin advancement during fast_forward decoding.
During logical decoding, we advance catalog_xmin of logical too early infast_forward mode, resulting in required catalog data being removed byvacuum. This mode is normally used to advance the slot without processingthe changes, but we still can't let the slot's xmin to advance to anincorrect value.Commitf49a80c fixed a similar issue where the logical slot'scatalog_xmin was getting advanced prematurely during non-fast-forwardmode. During xl_running_xacts processing, instead of directly advancingthe slot's xmin to the oldest running xid in the record, it allowed thexmin to be held back for snapshots that can be used fornot-yet-replayed transactions, as those might consider older txns asrunning too. However, it missed the fact that the same problem can happenduring fast_forward mode decoding, as we won't build a base snapshot inthat mode, and the future call to get_changes from the same slot can missseeing the required catalog changes leading to incorrect reslts.This commit allows building the base snapshot even in fast_forward mode toprevent the early advancement of xmin.Reported-by: Amit Kapila <amit.kapila16@gmail.com>Author: Zhijie Hou <houzj.fnst@fujitsu.com>Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>Reviewed-by: shveta malik <shveta.malik@gmail.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Backpatch-through: 13Discussion:https://postgr.es/m/CAA4eK1LqWncUOqKijiafe+Ypt1gQAQRjctKLMY953J79xDBgAg@mail.gmail.comDiscussion:https://postgr.es/m/OS0PR01MB57163087F86621D44D9A72BF94BB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parentbb1bc9f commit1f63b36

File tree

3 files changed

+71
-13
lines changed

3 files changed

+71
-13
lines changed

‎contrib/test_decoding/expected/oldest_xmin.out

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,44 @@ COMMIT
3838
stop
3939
(1 row)
4040

41+
42+
starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes
43+
step s0_begin: BEGIN;
44+
step s0_getxid: SELECT pg_current_xact_id() IS NULL;
45+
?column?
46+
--------
47+
f
48+
(1 row)
49+
50+
step s1_begin: BEGIN;
51+
step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
52+
step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
53+
step s0_commit: COMMIT;
54+
step s0_checkpoint: CHECKPOINT;
55+
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
56+
slot_name
57+
--------------
58+
isolation_slot
59+
(1 row)
60+
61+
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
62+
slot_name
63+
--------------
64+
isolation_slot
65+
(1 row)
66+
67+
step s1_commit: COMMIT;
68+
step s0_vacuum: VACUUM pg_attribute;
69+
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
70+
data
71+
------------------------------------------------------
72+
BEGIN
73+
table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
74+
COMMIT
75+
(3 rows)
76+
77+
?column?
78+
--------
79+
stop
80+
(1 row)
81+

‎contrib/test_decoding/specs/oldest_xmin.spec

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; }
2525
step"s0_checkpoint" {CHECKPOINT; }
2626
step"s0_vacuum" {VACUUMpg_attribute; }
2727
step"s0_get_changes" {SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1'); }
28+
step"s0_advance_slot" {SELECTslot_nameFROMpg_replication_slot_advance('isolation_slot',pg_current_wal_lsn()); }
2829

2930
session"s1"
3031
setup {SETsynchronous_commit=on; }
@@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; }
4041
# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
4142
# forbid modifying catalog after someone read it (and didn't commit yet).
4243
permutation"s0_begin""s0_getxid""s1_begin""s1_insert""s0_alter""s0_commit""s0_checkpoint""s0_get_changes""s0_get_changes""s1_commit""s0_vacuum""s0_get_changes"
44+
45+
# Perform the same testing process as described above, but use advance_slot to
46+
# forces xmin advancement during fast forward decoding.
47+
permutation"s0_begin""s0_getxid""s1_begin""s1_insert""s0_alter""s0_commit""s0_checkpoint""s0_advance_slot""s0_advance_slot""s1_commit""s0_vacuum""s0_get_changes"

‎src/backend/replication/logical/decode.c

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -449,20 +449,24 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
449449

450450
/*
451451
* If we don't have snapshot or we are just fast-forwarding, there is no
452-
* point in decoding changes.
452+
* point in decoding data changes. However, it's crucial to build the base
453+
* snapshot during fast-forward mode (as is done in
454+
* SnapBuildProcessChange()) because we require the snapshot's xmin when
455+
* determining the candidate catalog_xmin for the replication slot. See
456+
* SnapBuildProcessRunningXacts().
453457
*/
454-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT||
455-
ctx->fast_forward)
458+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
456459
return;
457460

458461
switch (info)
459462
{
460463
caseXLOG_HEAP2_MULTI_INSERT:
461-
if (!ctx->fast_forward&&
462-
SnapBuildProcessChange(builder,xid,buf->origptr))
464+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
465+
!ctx->fast_forward)
463466
DecodeMultiInsert(ctx,buf);
464467
break;
465468
caseXLOG_HEAP2_NEW_CID:
469+
if (!ctx->fast_forward)
466470
{
467471
xl_heap_new_cid*xlrec;
468472

@@ -509,16 +513,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
509513

510514
/*
511515
* If we don't have snapshot or we are just fast-forwarding, there is no
512-
* point in decoding data changes.
516+
* point in decoding data changes. However, it's crucial to build the base
517+
* snapshot during fast-forward mode (as is done in
518+
* SnapBuildProcessChange()) because we require the snapshot's xmin when
519+
* determining the candidate catalog_xmin for the replication slot. See
520+
* SnapBuildProcessRunningXacts().
513521
*/
514-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT||
515-
ctx->fast_forward)
522+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
516523
return;
517524

518525
switch (info)
519526
{
520527
caseXLOG_HEAP_INSERT:
521-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
528+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
529+
!ctx->fast_forward)
522530
DecodeInsert(ctx,buf);
523531
break;
524532

@@ -529,17 +537,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
529537
*/
530538
caseXLOG_HEAP_HOT_UPDATE:
531539
caseXLOG_HEAP_UPDATE:
532-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
540+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
541+
!ctx->fast_forward)
533542
DecodeUpdate(ctx,buf);
534543
break;
535544

536545
caseXLOG_HEAP_DELETE:
537-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
546+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
547+
!ctx->fast_forward)
538548
DecodeDelete(ctx,buf);
539549
break;
540550

541551
caseXLOG_HEAP_TRUNCATE:
542-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
552+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
553+
!ctx->fast_forward)
543554
DecodeTruncate(ctx,buf);
544555
break;
545556

@@ -567,7 +578,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
567578
break;
568579

569580
caseXLOG_HEAP_CONFIRM:
570-
if (SnapBuildProcessChange(builder,xid,buf->origptr))
581+
if (SnapBuildProcessChange(builder,xid,buf->origptr)&&
582+
!ctx->fast_forward)
571583
DecodeSpecConfirm(ctx,buf);
572584
break;
573585

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp