Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit068674f

Browse files
Fix possibility of logical decoding partial transaction changes.
When creating and initializing a logical slot, the restart_lsn is setto the latest WAL insertion point (or the latest replay point onstandbys). Subsequently, WAL records are decoded from that point tofind the start point for extracting changes in theDecodingContextFindStartpoint() function. Since the initialrestart_lsn could be in the middle of a transaction, the start pointmust be a consistent point where we won't see the data for partialtransactions.Previously, when not building a full snapshot, serialized snapshotswere restored, and the SnapBuild jumps to the consistent state evenwhile finding the start point. Consequently, the slot's restart_lsnand confirmed_flush could be set to the middle of a transaction. Thiscould lead to various unexpected consequences. Specifically, therewere reports of logical decoding decoding partial transactions, andassertion failures occurred because only subtransactions were decodedwithout decoding their top-level transaction until decoding the commitrecord.To resolve this issue, the changes prevent restoring the serializedsnapshot and jumping to the consistent state while finding the startpoint.On v17 and HEAD, a flag indicating whether snapshot restores should beskipped has been added to the SnapBuild struct, and SNAPBUILD_VERSIONhas been bumpded.On backbranches, the flag is stored in the LogicalDecodingContextinstead, preserving on-disk compatibility.Backpatch to all supported versions.Reported-by: Drew CallahanReviewed-by: Amit Kapila, Hayato KurodaDiscussion:https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.comBackpatch-through: 12
1 parenta9747be commit068674f

File tree

7 files changed

+122
-10
lines changed

7 files changed

+122
-10
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
88
spill slot truncate stream stats twophase twophase_stream
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml\
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream\
11-
twophase_snapshot slot_creation_error catalog_change_snapshot
11+
twophase_snapshot slot_creation_error catalog_change_snapshot\
12+
skip_snapshot_restore
1213

1314
REGRESS_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
1415
ISOLATION_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
Parsed test spec with 3 sessions
2+
3+
starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1
4+
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding');
5+
?column?
6+
--------
7+
init
8+
(1 row)
9+
10+
step s0_begin: BEGIN;
11+
step s0_insert1: INSERT INTO tbl VALUES (1);
12+
step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); <waiting ...>
13+
step s2_checkpoint: CHECKPOINT;
14+
step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
15+
data
16+
----
17+
(0 rows)
18+
19+
step s0_insert2: INSERT INTO tbl VALUES (2);
20+
step s0_commit: COMMIT;
21+
step s1_init: <... completed>
22+
?column?
23+
--------
24+
init
25+
(1 row)
26+
27+
step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
28+
data
29+
-----------------------------------------
30+
BEGIN
31+
table public.tbl: INSERT: val1[integer]:1
32+
table public.tbl: INSERT: val1[integer]:2
33+
COMMIT
34+
(4 rows)
35+
36+
step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
37+
data
38+
----
39+
(0 rows)
40+
41+
?column?
42+
--------
43+
stop
44+
(1 row)
45+

‎contrib/test_decoding/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ tests += {
6262
'concurrent_stream',
6363
'twophase_snapshot',
6464
'slot_creation_error',
65+
'skip_snapshot_restore',
6566
],
6667
'regress_args': [
6768
'--temp-config',files('logical.conf'),
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Test that a slot creation skips to restore serialized snapshot to reach
2+
# the consistent state.
3+
4+
setup
5+
{
6+
DROPTABLEIFEXISTStbl;
7+
CREATETABLEtbl (val1integer);
8+
}
9+
10+
teardown
11+
{
12+
DROPTABLEtbl;
13+
SELECT'stop'FROMpg_drop_replication_slot('slot0');
14+
SELECT'stop'FROMpg_drop_replication_slot('slot1');
15+
}
16+
17+
session"s0"
18+
setup {SETsynchronous_commit=on; }
19+
step"s0_init" {SELECT'init'FROMpg_create_logical_replication_slot('slot0','test_decoding'); }
20+
step"s0_begin" {BEGIN; }
21+
step"s0_insert1" {INSERTINTOtblVALUES (1); }
22+
step"s0_insert2" {INSERTINTOtblVALUES (2); }
23+
step"s0_commit" {COMMIT; }
24+
25+
session"s1"
26+
setup {SETsynchronous_commit=on; }
27+
step"s1_init" {SELECT'init'FROMpg_create_logical_replication_slot('slot1','test_decoding'); }
28+
step"s1_get_changes_slot0" {SELECTdataFROMpg_logical_slot_get_changes('slot0',NULL,NULL,'skip-empty-xacts','1','include-xids','0'); }
29+
step"s1_get_changes_slot1" {SELECTdataFROMpg_logical_slot_get_changes('slot1',NULL,NULL,'skip-empty-xacts','1','include-xids','0'); }
30+
31+
session"s2"
32+
setup {SETsynchronous_commit=on ;}
33+
step"s2_checkpoint" {CHECKPOINT; }
34+
step"s2_get_changes_slot0" {SELECTdataFROMpg_logical_slot_get_changes('slot0',NULL,NULL,'skip-empty-xacts','1','include-xids','0'); }
35+
36+
37+
# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the
38+
# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot1"
39+
# serializes consistent snapshots to the disk at LSNs where are before
40+
# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but
41+
# must not restore any serialized snapshots and will reach the consistent state
42+
# when decoding a RUNNING_XACT record generated after s0-transaction's commit.
43+
# We check if the get_changes on 'slot1' will not return any s0-transaction's
44+
# changes as its confirmed_flush_lsn will be after the s0-transaction's commit
45+
# record.
46+
permutation"s0_init""s0_begin""s0_insert1""s1_init""s2_checkpoint""s2_get_changes_slot0""s0_insert2""s0_commit""s1_get_changes_slot0""s1_get_changes_slot1"

‎src/backend/replication/logical/logical.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ StartupDecodingContext(List *output_plugin_options,
152152
TransactionIdxmin_horizon,
153153
boolneed_full_snapshot,
154154
boolfast_forward,
155+
boolin_create,
155156
XLogReaderRoutine*xl_routine,
156157
LogicalOutputPluginWriterPrepareWriteprepare_write,
157158
LogicalOutputPluginWriterWritedo_write,
@@ -212,7 +213,7 @@ StartupDecodingContext(List *output_plugin_options,
212213
ctx->reorder=ReorderBufferAllocate();
213214
ctx->snapshot_builder=
214215
AllocateSnapshotBuilder(ctx->reorder,xmin_horizon,start_lsn,
215-
need_full_snapshot,slot->data.two_phase_at);
216+
need_full_snapshot,in_create,slot->data.two_phase_at);
216217

217218
ctx->reorder->private_data=ctx;
218219

@@ -438,7 +439,7 @@ CreateInitDecodingContext(const char *plugin,
438439
ReplicationSlotSave();
439440

440441
ctx=StartupDecodingContext(NIL,restart_lsn,xmin_horizon,
441-
need_full_snapshot, false,
442+
need_full_snapshot, false, true,
442443
xl_routine,prepare_write,do_write,
443444
update_progress);
444445

@@ -592,7 +593,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
592593

593594
ctx=StartupDecodingContext(output_plugin_options,
594595
start_lsn,InvalidTransactionId, false,
595-
fast_forward,xl_routine,prepare_write,
596+
fast_forward,false,xl_routine,prepare_write,
596597
do_write,update_progress);
597598

598599
/* call output plugin initialization callback */

‎src/backend/replication/logical/snapbuild.c

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ struct SnapBuild
189189
/* Indicates if we are building full snapshot or just catalog one. */
190190
boolbuilding_full_snapshot;
191191

192+
/*
193+
* Indicates if we are using the snapshot builder for the creation of a
194+
* logical replication slot. If it's true, the start point for decoding
195+
* changes is not determined yet. So we skip snapshot restores to properly
196+
* find the start point. See SnapBuildFindSnapshot() for details.
197+
*/
198+
boolin_slot_creation;
199+
192200
/*
193201
* Snapshot that's valid to see the catalog state seen at this moment.
194202
*/
@@ -317,6 +325,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
317325
TransactionIdxmin_horizon,
318326
XLogRecPtrstart_lsn,
319327
boolneed_full_snapshot,
328+
boolin_slot_creation,
320329
XLogRecPtrtwo_phase_at)
321330
{
322331
MemoryContextcontext;
@@ -347,6 +356,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
347356

348357
builder->initial_xmin_horizon=xmin_horizon;
349358
builder->start_decoding_at=start_lsn;
359+
builder->in_slot_creation=in_slot_creation;
350360
builder->building_full_snapshot=need_full_snapshot;
351361
builder->two_phase_at=two_phase_at;
352362

@@ -1327,10 +1337,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
13271337
* state while waiting on c)'s sub-states.
13281338
*
13291339
* b) This (in a previous run) or another decoding slot serialized a
1330-
* snapshot to disk that we can use. Can't use this method for the
1331-
* initial snapshot when slot is being created and needs full snapshot
1332-
* for export or direct use, as that snapshot will only contain catalog
1333-
* modifying transactions.
1340+
* snapshot to disk that we can use. Can't use this method while finding
1341+
* the start point for decoding changes as the restart LSN would be an
1342+
* arbitrary LSN but we need to find the start point to extract changes
1343+
* where we won't see the data for partial transactions. Also, we cannot
1344+
* use this method when a slot needs a full snapshot for export or direct
1345+
* use, as that snapshot will only contain catalog modifying transactions.
13341346
*
13351347
* c) First incrementally build a snapshot for catalog tuples
13361348
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1395,8 +1407,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
13951407

13961408
return false;
13971409
}
1398-
/* b) valid on disk state and not building full snapshot */
1410+
1411+
/*
1412+
* b) valid on disk state and while neither building full snapshot nor
1413+
* creating a slot.
1414+
*/
13991415
elseif (!builder->building_full_snapshot&&
1416+
!builder->in_slot_creation&&
14001417
SnapBuildRestore(builder,lsn))
14011418
{
14021419
/* there won't be any state to cleanup */
@@ -1580,7 +1597,7 @@ typedef struct SnapBuildOnDisk
15801597
offsetof(SnapBuildOnDisk, version)
15811598

15821599
#defineSNAPBUILD_MAGIC 0x51A1E001
1583-
#defineSNAPBUILD_VERSION5
1600+
#defineSNAPBUILD_VERSION6
15841601

15851602
/*
15861603
* Store/Load a snapshot from disk, depending on the snapshot builder's state.

‎src/include/replication/snapbuild.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ extern void CheckPointSnapBuild(void);
6262
externSnapBuild*AllocateSnapshotBuilder(structReorderBuffer*reorder,
6363
TransactionIdxmin_horizon,XLogRecPtrstart_lsn,
6464
boolneed_full_snapshot,
65+
boolin_slot_creation,
6566
XLogRecPtrtwo_phase_at);
6667
externvoidFreeSnapshotBuilder(SnapBuild*builder);
6768

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp