Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7944607

Browse files
author
Amit Kapila
committed
Fix catalog lookup with the wrong snapshot during logical decoding.
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATIONrecords to know if the transaction has modified the catalog, and thatinformation is not serialized to snapshot. Therefore, after the restart,if the logical decoding decodes only the commit record of the transactionthat has actually modified a catalog, we will miss adding its XID to thesnapshot. Thus, we will end up looking at catalogs with the wrongsnapshot.To fix this problem, this changes the snapshot builder so that itremembers the last-running-xacts list of the decoded RUNNING_XACTS recordafter restoring the previously serialized snapshot. Then, we mark thetransaction as containing catalog changes if it's in the list of initialrunning transactions and its commit record has XACT_XINFO_HAS_INVALS. Toavoid ABI breakage, we store the array of the initial running transactionsin the static variables InitialRunningXacts and NInitialRunningXacts,instead of storing those in SnapBuild or ReorderBuffer.This approach has a false positive; we could end up adding the transactionthat didn't change catalog to the snapshot since we cannot distinguishwhether the transaction has catalog changes only by checking the COMMITrecord. It doesn't have the information on which (sub) transaction hascatalog changes, and XACT_XINFO_HAS_INVALS doesn't necessarily indicatethat the transaction has catalog change. But that won't be a problem sincewe use snapshot built during decoding only to read system catalogs.On the master branch, we took a more future-proof approach by writingcatalog modifying transactions to the serialized snapshot which avoids theabove false positive. But we cannot backpatch it because of a change inthe SnapBuild.Reported-by: Mike OhAuthor: Masahiko SawadaReviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan HadiBackpatch-through: 10Discussion:https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
1 parent5b948b5 commit7944607

File tree

6 files changed

+227
-9
lines changed

6 files changed

+227
-9
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
77
decoding_into_rel binary prepared replorigin time messages\
88
spill slot truncate
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml\
10-
oldest_xmin snapshot_transfer subxact_without_top
10+
oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot
1111

1212
REGRESS_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
1313
ISOLATION_OPTS = --temp-config$(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
4+
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
5+
?column?
6+
--------
7+
init
8+
(1 row)
9+
10+
step s0_begin: BEGIN;
11+
step s0_savepoint: SAVEPOINT sp1;
12+
step s0_truncate: TRUNCATE tbl1;
13+
step s1_checkpoint: CHECKPOINT;
14+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
15+
data
16+
----
17+
(0 rows)
18+
19+
step s0_commit: COMMIT;
20+
step s0_begin: BEGIN;
21+
step s0_insert: INSERT INTO tbl1 VALUES (1);
22+
step s1_checkpoint: CHECKPOINT;
23+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
24+
data
25+
---------------------------------------
26+
BEGIN
27+
table public.tbl1: TRUNCATE: (no-flags)
28+
COMMIT
29+
(3 rows)
30+
31+
step s0_commit: COMMIT;
32+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
33+
data
34+
-------------------------------------------------------------
35+
BEGIN
36+
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
37+
COMMIT
38+
(3 rows)
39+
40+
?column?
41+
--------
42+
stop
43+
(1 row)
44+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Test decoding only the commit record of the transaction that have
2+
# modified catalogs.
3+
setup
4+
{
5+
DROPTABLEIFEXISTStbl1;
6+
CREATETABLEtbl1(val1integer,val2integer);
7+
}
8+
9+
teardown
10+
{
11+
DROPTABLEtbl1;
12+
SELECT'stop'FROMpg_drop_replication_slot('isolation_slot');
13+
}
14+
15+
session"s0"
16+
setup{SETsynchronous_commit=on;}
17+
step"s0_init"{SELECT'init'FROMpg_create_logical_replication_slot('isolation_slot','test_decoding');}
18+
step"s0_begin"{ BEGIN;}
19+
step"s0_savepoint"{SAVEPOINTsp1;}
20+
step"s0_truncate"{TRUNCATEtbl1;}
21+
step"s0_insert"{INSERTINTOtbl1VALUES(1);}
22+
step"s0_commit"{COMMIT;}
23+
24+
session"s1"
25+
setup{SETsynchronous_commit=on;}
26+
step"s1_checkpoint"{CHECKPOINT;}
27+
step"s1_get_changes"{SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'skip-empty-xacts','1','include-xids','0');}
28+
29+
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
30+
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
31+
# during the first checkpoint execution. This transaction must be marked as
32+
# containing catalog changes while decoding the COMMIT record and the decoding
33+
# of the INSERT record must read the pg_class with the correct historic snapshot.
34+
#
35+
# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
36+
# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
37+
# record written by bgwriter. One might think we can either stop the bgwriter or
38+
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
39+
permutation"s0_init""s0_begin""s0_savepoint""s0_truncate""s1_checkpoint""s1_get_changes""s0_commit""s0_begin""s0_insert""s1_checkpoint""s1_get_changes""s0_commit""s1_get_changes"

‎src/backend/replication/logical/decode.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
585585
if (!ctx->fast_forward)
586586
ReorderBufferAddInvalidations(ctx->reorder,xid,buf->origptr,
587587
parsed->nmsgs,parsed->msgs);
588-
ReorderBufferXidSetCatalogChanges(ctx->reorder,xid,buf->origptr);
588+
/*
589+
* If the COMMIT record has invalidation messages, it could have catalog
590+
* changes. It is possible that we didn't mark this transaction and
591+
* its subtransactions as containing catalog changes when the decoding
592+
* starts from a commit record without decoding the transaction's other
593+
* changes. Therefore, we ensure to mark such transactions as containing
594+
* catalog change.
595+
*
596+
* This must be done before SnapBuildCommitTxn() so that we can include
597+
* these transactions in the historic snapshot.
598+
*/
599+
SnapBuildXidSetCatalogChanges(ctx->snapshot_builder,xid,
600+
parsed->nsubxacts,parsed->subxacts,
601+
buf->origptr);
589602
}
590603

591604
SnapBuildCommitTxn(ctx->snapshot_builder,buf->origptr,xid,

‎src/backend/replication/logical/snapbuild.c

Lines changed: 126 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,38 @@ struct SnapBuild
257257
staticResourceOwnerSavedResourceOwnerDuringExport=NULL;
258258
staticboolExportInProgress= false;
259259

260-
/* ->committed manipulation */
261-
staticvoidSnapBuildPurgeCommittedTxn(SnapBuild*builder);
260+
/*
261+
* Array of transactions and subtransactions that were running when
262+
* the xl_running_xacts record that we decoded was written. The array is
263+
* sorted in xidComparator order. We remove xids from this array when
264+
* they become old enough to matter, and then it eventually becomes empty.
265+
* This array is allocated in builder->context so its lifetime is the same
266+
* as the snapshot builder.
267+
*
268+
* We normally rely on some WAL record types such as HEAP2_NEW_CID to know
269+
* if the transaction has changed the catalog. But it could happen that the
270+
* logical decoding decodes only the commit record of the transaction after
271+
* restoring the previously serialized snapshot in which case we will miss
272+
* adding the xid to the snapshot and end up looking at the catalogs with the
273+
* wrong snapshot.
274+
*
275+
* Now to avoid the above problem, if the COMMIT record of the xid listed in
276+
* InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top
277+
* transaction and its substransactions as containing catalog changes.
278+
*
279+
* We could end up adding the transaction that didn't change catalog
280+
* to the snapshot since we cannot distinguish whether the transaction
281+
* has catalog changes only by checking the COMMIT record. It doesn't
282+
* have the information on which (sub) transaction has catalog changes,
283+
* and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
284+
* transaction has catalog change. But that won't be a problem since we
285+
* use snapshot built during decoding only for reading system catalogs.
286+
*/
287+
staticTransactionId*InitialRunningXacts=NULL;
288+
staticintNInitialRunningXacts=0;
289+
290+
/* ->committed and InitailRunningXacts manipulation */
291+
staticvoidSnapBuildPurgeOlderTxn(SnapBuild*builder);
262292

263293
/* snapshot building/manipulation/distribution functions */
264294
staticSnapshotSnapBuildBuildSnapshot(SnapBuild*builder);
@@ -895,12 +925,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
895925
}
896926

897927
/*
898-
* Remove knowledge about transactions we treat as committed that are smaller
899-
* than ->xmin. Those won't ever get checked via the ->committed array but via
900-
* the clog machinery, so we don't need to waste memory on them.
928+
* Remove knowledge about transactions we treat as committed and the initial
929+
* running transactions that are smaller than ->xmin. Those won't ever get
930+
* checked via the ->committed or InitialRunningXacts array, respectively.
931+
* The committed xids will get checked via the clog machinery.
932+
*
933+
* We can ideally remove the transaction from InitialRunningXacts array
934+
* once it is finished (committed/aborted) but that could be costly as we need
935+
* to maintain the xids order in the array.
901936
*/
902937
staticvoid
903-
SnapBuildPurgeCommittedTxn(SnapBuild*builder)
938+
SnapBuildPurgeOlderTxn(SnapBuild*builder)
904939
{
905940
intoff;
906941
TransactionId*workspace;
@@ -935,6 +970,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
935970
builder->committed.xcnt=surviving_xids;
936971

937972
pfree(workspace);
973+
974+
/* Quick exit if there is no initial running transactions */
975+
if (NInitialRunningXacts==0)
976+
return;
977+
978+
/* bound check if there is at least one transaction to remove */
979+
if (!NormalTransactionIdPrecedes(InitialRunningXacts[0],
980+
builder->xmin))
981+
return;
982+
983+
/*
984+
* purge xids in InitialRunningXacts as well. The purged array must also
985+
* be sorted in xidComparator order.
986+
*/
987+
workspace=
988+
MemoryContextAlloc(builder->context,
989+
NInitialRunningXacts*sizeof(TransactionId));
990+
surviving_xids=0;
991+
for (off=0;off<NInitialRunningXacts;off++)
992+
{
993+
if (NormalTransactionIdPrecedes(InitialRunningXacts[off],
994+
builder->xmin))
995+
;/* remove */
996+
else
997+
workspace[surviving_xids++]=InitialRunningXacts[off];
998+
}
999+
1000+
if (surviving_xids>0)
1001+
memcpy(InitialRunningXacts,workspace,
1002+
sizeof(TransactionId)*surviving_xids);
1003+
else
1004+
{
1005+
pfree(InitialRunningXacts);
1006+
InitialRunningXacts=NULL;
1007+
}
1008+
1009+
elog(DEBUG3,"purged initial running transactions from %u to %u, oldest running xid %u",
1010+
(uint32)NInitialRunningXacts,
1011+
(uint32)surviving_xids,
1012+
builder->xmin);
1013+
1014+
NInitialRunningXacts=surviving_xids;
1015+
pfree(workspace);
9381016
}
9391017

9401018
/*
@@ -1142,7 +1220,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
11421220
builder->xmin=running->oldestRunningXid;
11431221

11441222
/* Remove transactions we don't need to keep track off anymore */
1145-
SnapBuildPurgeCommittedTxn(builder);
1223+
SnapBuildPurgeOlderTxn(builder);
11461224

11471225
/*
11481226
* Advance the xmin limit for the current replication slot, to allow
@@ -1293,6 +1371,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12931371
elseif (!builder->building_full_snapshot&&
12941372
SnapBuildRestore(builder,lsn))
12951373
{
1374+
intnxacts=running->subxcnt+running->xcnt;
1375+
Sizesz=sizeof(TransactionId)*nxacts;
1376+
1377+
/*
1378+
* Remember the transactions and subtransactions that were running
1379+
* when xl_running_xacts record that we decoded was written. We use
1380+
* this later to identify the transactions have performed catalog
1381+
* changes. See SnapBuildXidSetCatalogChanges.
1382+
*/
1383+
NInitialRunningXacts=nxacts;
1384+
InitialRunningXacts=MemoryContextAlloc(builder->context,sz);
1385+
memcpy(InitialRunningXacts,running->xids,sz);
1386+
qsort(InitialRunningXacts,nxacts,sizeof(TransactionId),xidComparator);
1387+
12961388
/* there won't be any state to cleanup */
12971389
return false;
12981390
}
@@ -2035,3 +2127,30 @@ CheckPointSnapBuild(void)
20352127
}
20362128
FreeDir(snap_dir);
20372129
}
2130+
2131+
/*
2132+
* Mark the transaction as containing catalog changes. In addition, if the
2133+
* given xid is in the list of the initial running xacts, we mark its
2134+
* subtransactions as well. See comments for NInitialRunningXacts and
2135+
* InitialRunningXacts for additional info.
2136+
*/
2137+
void
2138+
SnapBuildXidSetCatalogChanges(SnapBuild*builder,TransactionIdxid,intsubxcnt,
2139+
TransactionId*subxacts,XLogRecPtrlsn)
2140+
{
2141+
ReorderBufferXidSetCatalogChanges(builder->reorder,xid,lsn);
2142+
2143+
/* Skip if there is no initial running xacts information */
2144+
if (NInitialRunningXacts==0)
2145+
return;
2146+
2147+
if (bsearch(&xid,InitialRunningXacts,NInitialRunningXacts,
2148+
sizeof(TransactionId),xidComparator)!=NULL)
2149+
{
2150+
for (inti=0;i<subxcnt;i++)
2151+
{
2152+
ReorderBufferAssignChild(builder->reorder,xid,subxacts[i],lsn);
2153+
ReorderBufferXidSetCatalogChanges(builder->reorder,subxacts[i],lsn);
2154+
}
2155+
}
2156+
}

‎src/include/replication/snapbuild.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
8888
structxl_running_xacts*running);
8989
externvoidSnapBuildSerializationPoint(SnapBuild*builder,XLogRecPtrlsn);
9090

91+
externvoidSnapBuildXidSetCatalogChanges(SnapBuild*builder,TransactionIdxid,
92+
intsubxcnt,TransactionId*subxacts,
93+
XLogRecPtrlsn);
9194
#endif/* SNAPBUILD_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp