Commit 4909b38

Amit Kapila committed

Fix data loss in logical replication.

Data loss can happen when the DDLs like ALTER PUBLICATION ... ADD TABLE ...
or ALTER TYPE ... that don't take a strong lock on table happens
concurrently to DMLs on the tables involved in the DDL. This happens
because logical decoding doesn't distribute invalidations to concurrent
transactions and those transactions use stale cache data to decode the
changes. The problem becomes bigger because we keep using the stale cache
even after those in-progress transactions are finished and skip the
changes required to be sent to the client.

This commit fixes the issue by distributing invalidation messages from
catalog-modifying transactions to all concurrent in-progress transactions.
This allows the necessary rebuild of the catalog cache when decoding new
changes after concurrent DDL.

We observed performance regression primarily during frequent execution of
*publication DDL* statements that modify the published tables. The
regression is minor or nearly nonexistent for DDLs that do not affect the
published tables or occur infrequently, making this a worthwhile cost to
resolve a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each
affected table during publication modification. However, this would only
address issues related to publication DDLs (but not the ALTER TYPE ...)
and require locking every relation in the database for publications
created as FOR ALL TABLES, which is impractical.

The bug exists in all supported branches, but we are backpatching till 14.
The fix for 13 requires somewhat bigger changes than this fix, so the fix
for that branch is still under discussion.

Reported-by: hubert depesz lubaczewski <depesz@depesz.com>
Reported-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Tested-by: Benoit Lobréau <benoit.lobreau@dalibo.com>
Backpatch-through: 14
Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com
Discussion: https://postgr.es/m/CAD21AoAenVqiMjpN-PvGHL1N9DWnHSq673bfgr6phmBUzx=kLQ@mail.gmail.com

1 parent 9ad1929, commit 4909b38
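
The failure mode described in the commit message can be pictured with a small toy model. The sketch below is not PostgreSQL code: `ToyRelCache`, `toy_decode_insert`, and `toy_replay` are hypothetical names, and the single boolean cache entry is an assumed stand-in for the cached "is this table published?" decision. It replays the transactions of the new isolation test in commit order, under the simplifying assumption that a cache entry is rebuilt only when it has been invalidated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model only: none of these names exist in PostgreSQL. */
typedef struct ToyRelCache
{
	bool		valid;		/* is the cached entry usable? */
	bool		published;	/* cached answer to "is tbl1 published?" */
} ToyRelCache;

/*
 * Decode one INSERT. The cache entry is rebuilt only when it is invalid,
 * using the catalog state visible to the decoding snapshot. Returns 1 if
 * the change would be sent, 0 if it would be skipped.
 */
int
toy_decode_insert(ToyRelCache *cache, bool published_in_snapshot)
{
	assert(cache != NULL);

	if (!cache->valid)
	{
		cache->published = published_in_snapshot;
		cache->valid = true;
	}
	return cache->published ? 1 : 0;
}

/* Replay the test's transactions in commit order; return INSERTs sent. */
int
toy_replay(bool distribute_invalidations)
{
	ToyRelCache cache = {false, false};
	int			sent = 0;

	/* First autocommit INSERT: tbl1 is not published yet. */
	sent += toy_decode_insert(&cache, false);

	/* ALTER PUBLICATION commits: its own invalidations are executed. */
	cache.valid = false;

	/* s1 commits: its INSERT predates the ALTER, so the old catalog applies. */
	sent += toy_decode_insert(&cache, false);

	/* The fix: s1 also replays the invalidations that were handed to it. */
	if (distribute_invalidations)
		cache.valid = false;

	/* Final autocommit INSERT: tbl1 is published in the current catalog. */
	sent += toy_decode_insert(&cache, true);

	return sent;
}
```

In this model `toy_replay(false)` returns 0, because the entry rebuilt while decoding s1 is never invalidated, and `toy_replay(true)` returns 1, which is the count the new test expects.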

7 files changed, +135 -15 lines changed

‎contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore
+	skip_snapshot_restore invalidation_distrubution

 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop
+(1 row)
+

‎contrib/test_decoding/meson.build

Lines changed: 1 addition & 0 deletions
@@ -63,6 +63,7 @@ tests += {
       'twophase_snapshot',
       'slot_creation_error',
       'skip_snapshot_restore',
+      'invalidation_distrubution',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
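
The `get_byte(data, 0) = 73` filter in the spec keeps only messages whose first byte is 73, which is 'I' in ASCII and, per the spec's own comment, the value of LOGICAL_REP_MSG_INSERT. The same filter can be sketched in C as follows. `ToyMessage` and `toy_count_inserts` are hypothetical names introduced for illustration; they are not part of PostgreSQL or of this commit.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical container for one decoded message; not a PostgreSQL type. */
typedef struct ToyMessage
{
	const unsigned char *data;	/* raw message bytes */
	size_t		len;			/* number of bytes in data */
} ToyMessage;

/*
 * Count the messages whose first byte is 73 ('I' in ASCII), mirroring the
 * WHERE get_byte(data, 0) = 73 filter used by the test.
 */
size_t
toy_count_inserts(const ToyMessage *msgs, size_t nmsgs)
{
	size_t		count = 0;

	assert(msgs != NULL || nmsgs == 0);

	for (size_t i = 0; i < nmsgs; i++)
	{
		/* Skip empty messages so data[0] is never read out of bounds. */
		if (msgs[i].len > 0 && msgs[i].data[0] == 73)
			count++;
	}
	return count;
}
```

Counting by message type rather than comparing full message contents keeps the expected output independent of values such as transaction IDs and LSNs that vary between runs.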

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 0 deletions
@@ -5460,3 +5460,26 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
 	*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+							  SharedInvalidationMessage **msgs)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	if (txn == NULL)
+		return 0;
+
+	*msgs = txn->invalidations;
+
+	return txn->ninvalidations;
+}

‎src/backend/replication/logical/snapbuild.c

Lines changed: 54 additions & 14 deletions
@@ -161,7 +161,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);

 static void SnapBuildSnapIncRefcount(Snapshot snap);

-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);

 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
 												 uint32 xinfo);
@@ -720,23 +720,24 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }

 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;

 	/*
 	 * Iterate through all toplevel transactions. This can include
 	 * subtransactions which we just don't yet know to be that, but that's
-	 * fine, they will just get an unnecessary snapshot queued.
+	 * fine, they will just get an unnecessary snapshot and invalidations
+	 * queued.
 	 */
 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
 	{
@@ -749,6 +750,15 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		 * transaction which in turn implies we don't yet need a snapshot at
 		 * all. We'll add a snapshot when the first change gets queued.
 		 *
+		 * Similarly, we don't need to add invalidations to a transaction
+		 * whose base snapshot is not yet set. Once a base snapshot is built,
+		 * it will include the xids of committed transactions that have
+		 * modified the catalog, thus reflecting the new catalog contents. The
+		 * existing catalog cache will have already been invalidated after
+		 * processing the invalidations in the transaction that modified
+		 * catalogs, ensuring that a fresh cache is constructed during
+		 * decoding.
+		 *
 		 * NB: This works correctly even for subtransactions because
 		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
 		 * to the top-level transaction, and while iterating the changequeue
@@ -758,13 +768,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 			continue;

 		/*
-		 * We don't need to add snapshot to prepared transactions as they
-		 * should not see the new catalog contents.
+		 * We don't need to add snapshot or invalidations to prepared
+		 * transactions as they should not see the new catalog contents.
 		 */
 		if (rbtxn_is_prepared(txn))
 			continue;

-		elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+		elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
 			 txn->xid, LSN_FORMAT_ARGS(lsn));

 		/*
@@ -774,6 +784,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		SnapBuildSnapIncRefcount(builder->snapshot);
 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
 								 builder->snapshot);
+
+		/*
+		 * Add invalidation messages to the reorder buffer of in-progress
+		 * transactions except the current committed transaction, for which we
+		 * will execute invalidations at the end.
+		 *
+		 * It is required, otherwise, we will end up using the stale catcache
+		 * contents built by the current transaction even after its decoding,
+		 * which should have been invalidated due to concurrent catalog
+		 * changing transaction.
+		 */
+		if (txn->xid != xid)
+		{
+			uint32		ninvalidations;
+			SharedInvalidationMessage *msgs = NULL;
+
+			ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+														   xid, &msgs);
+
+			if (ninvalidations > 0)
+			{
+				Assert(msgs != NULL);
+
+				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+											  ninvalidations, msgs);
+			}
+		}
 	}
 }

@@ -1045,8 +1082,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);

-		/* add a new catalog snapshot to all currently running transactions */
-		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+		/*
+		 * Add a new catalog snapshot and invalidations messages to all
+		 * currently running transactions.
+		 */
+		SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
 	}
 }
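
The skip rules spelled out in the snapbuild.c comments can be exercised in isolation with a toy model. This is only a sketch: `ToyTxn` and `toy_distribute` are hypothetical names, the counters stand in for entries queued in a reorder buffer, and nothing here is PostgreSQL's actual data structure. It assumes, as the comments describe, that transactions without a base snapshot and prepared transactions receive nothing, and that the committing transaction receives the snapshot but not its own invalidations.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for an in-progress transaction; not a PostgreSQL type. */
typedef struct ToyTxn
{
	unsigned	xid;
	bool		has_base_snapshot;	/* has any change been queued yet? */
	bool		is_prepared;		/* prepared (two-phase) transaction? */
	int			queued_snapshots;	/* snapshots queued so far */
	int			queued_invals;		/* invalidation messages queued so far */
} ToyTxn;

/*
 * Model the distribution done when transaction committed_xid, carrying
 * ninvals invalidation messages, commits.
 */
void
toy_distribute(ToyTxn *txns, size_t ntxns, unsigned committed_xid, int ninvals)
{
	assert(txns != NULL || ntxns == 0);

	for (size_t i = 0; i < ntxns; i++)
	{
		ToyTxn	   *txn = &txns[i];

		/* No base snapshot yet: one will be built when a change is queued. */
		if (!txn->has_base_snapshot)
			continue;

		/* Prepared transactions should not see the new catalog contents. */
		if (txn->is_prepared)
			continue;

		txn->queued_snapshots++;

		/* The committing transaction runs its own invalidations at the end. */
		if (txn->xid != committed_xid && ninvals > 0)
			txn->queued_invals += ninvals;
	}
}
```

For example, with four transactions of which one has no base snapshot, one is prepared, and one is the committer itself, only the remaining transaction ends up with queued invalidations.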

‎src/include/replication/reorderbuffer.h

Lines changed: 4 additions & 0 deletions
@@ -758,6 +758,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);

 extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);

+extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+											TransactionId xid,
+											SharedInvalidationMessage **msgs);
+
 extern void StartupReorderBuffer(void);

 #endif
