Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ec5896a

Browse files
committed
Fix several weaknesses in slot and logical replication on-disk serialization.
Heikki noticed in 544E23C0.8090605@vmware.com that slot.c and snapbuild.c were missing the FIN_CRC32 call when computing/checking checksums of on disk files. That doesn't lower the error detection capabilities of the checksum, but is inconsistent with other usages. In a followup mail Heikki also noticed that, contrary to a comment, the 'version' and 'length' struct fields of replication slot's on disk data were not covered by the checksum. That's not likely to lead to actually missed corruption as those fields are cross checked with the expected version and the actual file length. But it's wrong nonetheless. As fixing these issues makes existing on disk files unreadable, bump the expected versions of on disk files for both slots and logical decoding historic catalog snapshots. This means that loading old files will fail with ERROR: "replication slot file ... has unsupported version 1" and ERROR: "snapbuild state file ... has unsupported version 1 instead of 2" respectively. Given the low likelihood of anybody already using these new features in a production setup that seems acceptable. Fixing these issues made me notice that there's no regression test covering the loading of historic snapshot from disk - so add one. Backpatch to 9.4 where these features were introduced.
1 parent bd4ae0f, commit ec5896a

File tree

5 files changed

+116
-13
lines changed

5 files changed

+116
-13
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding
5353
--extra-install=contrib/test_decoding\
5454
$(REGRESSCHECKS)
5555

56-
ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml
56+
ISOLATIONCHECKS=mxact delayed_startupondisk_startupconcurrent_ddl_dml
5757

5858
isolationcheck: all | submake-isolation submake-test_decoding
5959
$(MKDIR_P) isolation_output
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
Parsed test spec with 3 sessions
2+
3+
starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
4+
step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
5+
?column?
6+
7+
f
8+
step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
9+
step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
10+
?column?
11+
12+
f
13+
step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
14+
step s2c: COMMIT;
15+
step s1init: <... completed>
16+
?column?
17+
18+
init
19+
step s1insert: INSERT INTO do_write DEFAULT VALUES;
20+
step s1checkpoint: CHECKPOINT;
21+
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
22+
data
23+
24+
BEGIN
25+
table public.do_write: INSERT: id[integer]:1 addedbys2[integer]:null
26+
COMMIT
27+
step s1insert: INSERT INTO do_write DEFAULT VALUES;
28+
step s1alter: ALTER TABLE do_write ADD COLUMN addedbys1 int;
29+
step s1insert: INSERT INTO do_write DEFAULT VALUES;
30+
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
31+
data
32+
33+
BEGIN
34+
table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null
35+
COMMIT
36+
BEGIN
37+
COMMIT
38+
BEGIN
39+
table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null
40+
COMMIT
41+
?column?
42+
43+
stop
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Force usage of ondisk decoding snapshots to test that code path.
2+
setup
3+
{
4+
DROPTABLEIFEXISTSdo_write;
5+
CREATETABLEdo_write(idserialprimarykey);
6+
}
7+
8+
teardown
9+
{
10+
DROPTABLEdo_write;
11+
SELECT'stop'FROMpg_drop_replication_slot('isolation_slot');
12+
}
13+
14+
15+
session"s1"
16+
setup {SETsynchronous_commit=on; }
17+
18+
step"s1init" {SELECT'init'FROMpg_create_logical_replication_slot('isolation_slot','test_decoding');}
19+
step"s1start" {SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','false');}
20+
step"s1insert" {INSERTINTOdo_writeDEFAULTVALUES; }
21+
step"s1checkpoint" {CHECKPOINT; }
22+
step"s1alter" {ALTERTABLEdo_writeADDCOLUMNaddedbys1int; }
23+
24+
session"s2"
25+
setup {SETsynchronous_commit=on; }
26+
27+
step"s2txid" {BEGINISOLATIONLEVELREPEATABLEREAD;SELECTtxid_current()ISNULL; }
28+
step"s2alter" {ALTERTABLEdo_writeADDCOLUMNaddedbys2int; }
29+
step"s2c" {COMMIT; }
30+
31+
32+
session"s3"
33+
setup {SETsynchronous_commit=on; }
34+
35+
step"s3txid" {BEGINISOLATIONLEVELREPEATABLEREAD;SELECTtxid_current()ISNULL; }
36+
step"s3c" {COMMIT; }
37+
38+
# Force usage of ondisk snapshot by starting and not finishing a
39+
# transaction with a assigned xid after consistency has been
40+
# reached. In combination with a checkpoint forcing a snapshot to be
41+
# written and a new restart point computed that'll lead to the usage
42+
# of the snapshot.
43+
permutation"s2txid""s1init""s3txid""s2alter""s2c""s1insert""s1checkpoint""s1start""s1insert""s1alter""s1insert""s1start"

‎src/backend/replication/logical/snapbuild.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk
14061406
offsetof(SnapBuildOnDisk, version)
14071407

14081408
#defineSNAPBUILD_MAGIC 0x51A1E001
1409-
#defineSNAPBUILD_VERSION1
1409+
#defineSNAPBUILD_VERSION2
14101410

14111411
/*
14121412
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
15521552
COMP_CRC32C(ondisk->checksum,ondisk_c,sz);
15531553
ondisk_c+=sz;
15541554

1555+
FIN_CRC32C(ondisk->checksum);
1556+
15551557
/* we have valid data now, open tempfile and write it there */
15561558
fd=OpenTransientFile(tmppath,
15571559
O_CREAT |O_EXCL |O_WRONLY |PG_BINARY,
@@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
17241726

17251727
CloseTransientFile(fd);
17261728

1729+
FIN_CRC32C(checksum);
1730+
17271731
/* verify checksum of what we've read */
17281732
if (!EQ_CRC32C(checksum,ondisk.checksum))
17291733
ereport(ERROR,

‎src/backend/replication/slot.c

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk
6161
uint32version;
6262
uint32length;
6363

64+
/*
65+
* The actual data in the slot that follows can differ based on the above
66+
* 'version'.
67+
*/
68+
6469
ReplicationSlotPersistentDataslotdata;
6570
}ReplicationSlotOnDisk;
6671

67-
/* size ofthe part of the slot that isversion independent */
72+
/* size of version independent data */
6873
#defineReplicationSlotOnDiskConstantSize \
6974
offsetof(ReplicationSlotOnDisk, slotdata)
70-
/* size of the slots that is not version indepenent */
71-
#defineReplicationSlotOnDiskDynamicSize \
75+
/* size of the part of the slot not covered by the checksum */
76+
#defineSnapBuildOnDiskNotChecksummedSize \
77+
offsetof(ReplicationSlotOnDisk, version)
78+
/* size of the part covered by the checksum */
79+
#defineSnapBuildOnDiskChecksummedSize \
80+
sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
81+
/* size of the slot data that is version dependant */
82+
#defineReplicationSlotOnDiskV2Size \
7283
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
7384

7485
#defineSLOT_MAGIC0x1051CA1/* format identifier */
75-
#defineSLOT_VERSION1/* version for new files */
86+
#defineSLOT_VERSION2/* version for new files */
7687

7788
/* Control array for replication slot management */
7889
ReplicationSlotCtlData*ReplicationSlotCtl=NULL;
@@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
9921003

9931004
cp.magic=SLOT_MAGIC;
9941005
INIT_CRC32C(cp.checksum);
995-
cp.version=1;
996-
cp.length=ReplicationSlotOnDiskDynamicSize;
1006+
cp.version=SLOT_VERSION;
1007+
cp.length=ReplicationSlotOnDiskV2Size;
9971008

9981009
SpinLockAcquire(&slot->mutex);
9991010

@@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
10021013
SpinLockRelease(&slot->mutex);
10031014

10041015
COMP_CRC32C(cp.checksum,
1005-
(char*) (&cp)+ReplicationSlotOnDiskConstantSize,
1006-
ReplicationSlotOnDiskDynamicSize);
1016+
(char*) (&cp)+SnapBuildOnDiskNotChecksummedSize,
1017+
SnapBuildOnDiskChecksummedSize);
1018+
FIN_CRC32C(cp.checksum);
10071019

10081020
if ((write(fd,&cp,sizeof(cp)))!=sizeof(cp))
10091021
{
@@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name)
11551167
path,cp.version)));
11561168

11571169
/* boundary check on length */
1158-
if (cp.length!=ReplicationSlotOnDiskDynamicSize)
1170+
if (cp.length!=ReplicationSlotOnDiskV2Size)
11591171
ereport(PANIC,
11601172
(errcode_for_file_access(),
11611173
errmsg("replication slot file \"%s\" has corrupted length %u",
@@ -1182,8 +1194,9 @@ RestoreSlotFromDisk(const char *name)
11821194
/* now verify the CRC */
11831195
INIT_CRC32C(checksum);
11841196
COMP_CRC32C(checksum,
1185-
(char*)&cp+ReplicationSlotOnDiskConstantSize,
1186-
ReplicationSlotOnDiskDynamicSize);
1197+
(char*)&cp+SnapBuildOnDiskNotChecksummedSize,
1198+
SnapBuildOnDiskChecksummedSize);
1199+
FIN_CRC32C(checksum);
11871200

11881201
if (!EQ_CRC32C(checksum,cp.checksum))
11891202
ereport(PANIC,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp