Commit 45c357e

Fix re-distributing previously distributed invalidation messages during logical decoding.

Commit 4909b38 introduced logic to distribute invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions. However, since each transaction distributes not only its original invalidation messages but also previously distributed messages to other transactions, this leads to an exponential increase in allocation request size for invalidation messages, ultimately causing memory allocation failure.

This commit fixes this issue by tracking distributed invalidation messages separately per decoded transaction and not redistributing these messages to other in-progress transactions. The maximum size of distributed invalidation messages that one transaction can store is limited to MAX_DISTR_INVAL_MSG_PER_TXN (8MB). Once the size of the distributed invalidation messages exceeds this threshold, we invalidate all caches in locations where distributed invalidation messages need to be executed.

Back-patch to all supported versions where we introduced the fix by commit 4909b38.

Note that this commit adds two new fields to ReorderBufferTXN to store the distributed transactions. This change breaks ABI compatibility in back branches, affecting third-party extensions that depend on the size of the ReorderBufferTXN struct, though this scenario seems unlikely.

Additionally, it adds a new flag to the txn_flags field of ReorderBufferTXN to indicate distributed invalidation message overflow. This should not affect existing implementations, as it is unlikely that third-party extensions use unused bits in the txn_flags field.

Bug: #18938 #18942
Author: vignesh C <vignesh21@gmail.com>
Reported-by: Duncan Sands <duncan.sands@deepbluecap.com>
Reported-by: John Hutchins <john.hutchins@wicourts.gov>
Reported-by: Laurence Parry <greenreaper@hotmail.com>
Reported-by: Max Madden <maxmmadden@gmail.com>
Reported-by: Braulio Fdo Gonzalez <brauliofg@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Discussion: https://postgr.es/m/680bdaf6-f7d1-4536-b580-05c2760c67c6@deepbluecap.com
Discussion: https://postgr.es/m/18942-0ab1e5ae156613ad@postgresql.org
Discussion: https://postgr.es/m/18938-57c9a1c463b68ce0@postgresql.org
Discussion: https://postgr.es/m/CAD1FGCT2sYrP_70RTuo56QTizyc+J3wJdtn2gtO3VttQFpdMZg@mail.gmail.com
Discussion: https://postgr.es/m/CANO2=B=2BT1hSYCE=nuuTnVTnjidMg0+-FfnRnqM6kd23qoygg@mail.gmail.com
Backpatch-through: 13

1 parent 32ab0fd, commit 45c357e
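
The exponential growth described in the commit message can be illustrated with a small toy model. This is a hedged sketch, not PostgreSQL code: `toy_last_txn_msgs` and `TOY_MAX_TXNS` are hypothetical names, and the model assumes that all transactions are in progress at the same time, that each generates exactly one invalidation message of its own, and that they commit one after another. With `redistribute` set, a committing transaction forwards everything it holds (its own messages plus those it received), as happened before this fix; with it cleared, only its own messages are forwarded.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_TXNS 64			/* hypothetical bound, keeps counts in 64 bits */

/*
 * Toy model (hypothetical, not PostgreSQL code): ntxns catalog-modifying
 * transactions are concurrently in progress, each owns one invalidation
 * message, and they commit in the order 0, 1, 2, ...
 *
 * Returns how many messages the last transaction holds when it commits.
 */
uint64_t
toy_last_txn_msgs(size_t ntxns, int redistribute)
{
	uint64_t	own[TOY_MAX_TXNS];
	uint64_t	received[TOY_MAX_TXNS];
	size_t		i;
	size_t		j;

	assert(ntxns >= 1 && ntxns <= TOY_MAX_TXNS);

	for (i = 0; i < ntxns; i++)
	{
		own[i] = 1;
		received[i] = 0;
	}

	for (i = 0; i < ntxns; i++)
	{
		/* Messages that transaction i sends out when it commits. */
		uint64_t	sent = own[i];

		if (redistribute)
			sent += received[i];	/* old behaviour: forward received ones too */

		/* Every transaction that is still in progress receives them. */
		for (j = i + 1; j < ntxns; j++)
			received[j] += sent;
	}

	return own[ntxns - 1] + received[ntxns - 1];
}
```

Under these assumptions, 20 concurrent transactions leave the last one holding 524288 messages when received messages are forwarded again, compared with 20 when each transaction forwards only its own.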

5 files changed: 231 additions, 45 deletions

‎contrib/test_decoding/expected/invalidation_distribution.out‎

Lines changed: 22 additions & 1 deletion
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
 step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
@@ -18,3 +18,24 @@ count
 stop
 (1 row)
 
+
+starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s3_begin: BEGIN;
+step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s3_commit: COMMIT;
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop
+(1 row)
+
‎contrib/test_decoding/specs/invalidation_distribution.spec‎

Lines changed: 11 additions & 0 deletions
@@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
 step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
 step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
 
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_begin" { BEGIN; }
+step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
+step "s3_commit" { COMMIT; }
+
 # Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
 permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
+
+# Expect to get one insert change with LOGICAL_REP_MSG_INSERT = 'I' from
+# the second "s1_insert_tbl1" executed after adding the table tbl1 to the
+# publication in "s2_alter_pub_add_tbl".
+permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"

‎src/backend/replication/logical/reorderbuffer.c‎

Lines changed: 163 additions & 33 deletions
@@ -108,10 +108,22 @@
 #include "storage/fd.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+	((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
 {
@@ -469,6 +481,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	if (txn->invalidations_distributed)
+	{
+		pfree(txn->invalidations_distributed);
+		txn->invalidations_distributed = NULL;
+	}
+
 	/* Reset the toast hash */
 	ReorderBufferToastReset(rb, txn);
 
@@ -2574,7 +2592,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2620,8 +2648,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(txn->ninvalidations,
-										  txn->invalidations);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2951,7 +2988,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	 * We might have decoded changes for this transaction that could load
 	 * the cache as per the current transaction's view (consider DDL's
 	 * happened in this transaction). We don't want the decoding of future
-	 * transactions to use those cache entries so execute invalidations.
+	 * transactions to use those cache entries so execute only the inval
+	 * messages in this transaction.
 	 */
 	if (txn->ninvalidations > 0)
 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3038,9 +3076,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	txn->final_lsn = lsn;
 
 	/*
-	 * Process cache invalidation messages if there are any. Even if we're not
-	 * interested in the transaction's contents, it could have manipulated the
-	 * catalog and we need to update the caches according to that.
+	 * Process only cache invalidation messages in this transaction if there
+	 * are any. Even if we're not interested in the transaction's contents, it
+	 * could have manipulated the catalog and we need to update the caches
+	 * according to that.
 	 */
 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3312,6 +3351,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	txn->ntuplecids++;
 }
 
+/*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+								XLogRecPtr lsn, Size nmsgs,
+								SharedInvalidationMessage *msgs)
+{
+	ReorderBufferChange *change;
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+	change->data.inval.ninvalidations = nmsgs;
+	change->data.inval.invalidations = (SharedInvalidationMessage *)
+		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+	memcpy(change->data.inval.invalidations, msgs,
+		   sizeof(SharedInvalidationMessage) * nmsgs);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages to the **invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+									 uint32 *ninvals_out,
+									 SharedInvalidationMessage *msgs_new,
+									 Size nmsgs_new)
+{
+	if (*ninvals_out == 0)
+	{
+		*ninvals_out = nmsgs_new;
+		*invals_out = (SharedInvalidationMessage *)
+			palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+		memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+	}
+	else
+	{
+		/* Enlarge the array of inval messages */
+		*invals_out = (SharedInvalidationMessage *)
+			repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+					 (*ninvals_out + nmsgs_new));
+		memcpy(*invals_out + *ninvals_out, msgs_new,
+			   nmsgs_new * sizeof(SharedInvalidationMessage));
+		*ninvals_out += nmsgs_new;
+	}
+}
+
 /*
  * Accumulate the invalidations for executing them later.
  *
@@ -3332,7 +3422,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
 	ReorderBufferTXN *txn;
 	MemoryContext oldcontext;
-	ReorderBufferChange *change;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3347,35 +3436,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
 	Assert(nmsgs > 0);
 
-	/* Accumulate invalidations. */
-	if (txn->ninvalidations == 0)
-	{
-		txn->ninvalidations = nmsgs;
-		txn->invalidations = (SharedInvalidationMessage *)
-			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-		memcpy(txn->invalidations, msgs,
-			   sizeof(SharedInvalidationMessage) * nmsgs);
-	}
-	else
+	ReorderBufferAccumulateInvalidations(&txn->invalidations,
+										 &txn->ninvalidations,
+										 msgs, nmsgs);
+
+	ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+										 XLogRecPtr lsn, Size nmsgs,
+										 SharedInvalidationMessage *msgs)
+{
+	ReorderBufferTXN *txn;
+	MemoryContext oldcontext;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	/*
+	 * Collect all the invalidations under the top transaction, if available,
+	 * so that we can execute them all together. See comments
+	 * ReorderBufferAddInvalidations.
+	 */
+	txn = rbtxn_get_toptxn(txn);
+
+	Assert(nmsgs > 0);
+
+	if (!rbtxn_distr_inval_overflowed(txn))
 	{
-		txn->invalidations = (SharedInvalidationMessage *)
-			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
-					 (txn->ninvalidations + nmsgs));
+		/*
+		 * Check the transaction has enough space for storing distributed
+		 * invalidation messages.
+		 */
+		if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+		{
+			/*
+			 * Mark the invalidation message as overflowed and free up the
+			 * messages accumulated so far.
+			 */
+			txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
 
-		memcpy(txn->invalidations + txn->ninvalidations, msgs,
-			   nmsgs * sizeof(SharedInvalidationMessage));
-		txn->ninvalidations += nmsgs;
+			if (txn->invalidations_distributed)
+			{
+				pfree(txn->invalidations_distributed);
+				txn->invalidations_distributed = NULL;
+				txn->ninvalidations_distributed = 0;
+			}
+		}
+		else
+			ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+												 &txn->ninvalidations_distributed,
+												 msgs, nmsgs);
 	}
 
-	change = ReorderBufferGetChange(rb);
-	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
-	change->data.inval.ninvalidations = nmsgs;
-	change->data.inval.invalidations = (SharedInvalidationMessage *)
-		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-	memcpy(change->data.inval.invalidations, msgs,
-		   sizeof(SharedInvalidationMessage) * nmsgs);
-
-	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+	/* Queue the invalidation messages into the transaction */
+	ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
 
 	MemoryContextSwitchTo(oldcontext);
 }
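
The overflow handling in ReorderBufferAddDistributedInvalidations() can be tried outside the server with a standalone sketch. Everything below is a hypothetical stand-in rather than PostgreSQL code: `ToyMsg` is assumed to be 16 bytes in place of SharedInvalidationMessage, `ToyTxn` keeps only the fields relevant here, and plain `realloc`/`free` replace the memory-context allocator. The sketch also leaves out the step where the messages are queued as a change, which the real function does whether or not the overflow flag is set.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for SharedInvalidationMessage (assumed 16 bytes). */
typedef struct ToyMsg
{
	unsigned char bytes[16];
} ToyMsg;

/* Same shape as MAX_DISTR_INVAL_MSG_PER_TXN: an 8MB budget in messages. */
#define TOY_MAX_DISTR_MSGS ((8 * 1024 * 1024) / sizeof(ToyMsg))

/* Hypothetical, reduced stand-in for ReorderBufferTXN. */
typedef struct ToyTxn
{
	ToyMsg	   *distributed;	/* messages received from other txns */
	size_t		ndistributed;
	int			overflowed;		/* plays the role of the new txn_flags bit */
} ToyTxn;

/*
 * Append nmsgs received messages unless that would reach the cap. On
 * overflow, set the flag and drop what was accumulated; later calls are
 * then no-ops, and the caller is expected to reset all caches instead of
 * replaying individual messages.
 */
void
toy_add_distributed(ToyTxn *txn, const ToyMsg *msgs, size_t nmsgs)
{
	ToyMsg	   *grown;

	assert(nmsgs > 0);

	if (txn->overflowed)
		return;

	if (txn->ndistributed + nmsgs >= TOY_MAX_DISTR_MSGS)
	{
		txn->overflowed = 1;
		free(txn->distributed);
		txn->distributed = NULL;
		txn->ndistributed = 0;
		return;
	}

	/* realloc(NULL, ...) behaves like malloc, covering the first batch. */
	grown = realloc(txn->distributed,
					(txn->ndistributed + nmsgs) * sizeof(ToyMsg));
	if (grown == NULL)
		abort();

	memcpy(grown + txn->ndistributed, msgs, nmsgs * sizeof(ToyMsg));
	txn->distributed = grown;
	txn->ndistributed += nmsgs;
}
```

With 16-byte messages the cap works out to 524288 entries. Because the comparison uses `>=`, a batch that would land exactly on the cap already sets the flag, which matches the condition in the diff above.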

‎src/backend/replication/logical/snapbuild.c‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
932932
* contents built by the current transaction even after its decoding,
933933
* which should have been invalidated due to concurrent catalog
934934
* changing transaction.
935+
*
936+
* Distribute only the invalidation messages generated by the current
937+
* committed transaction. Invalidation messages received from other
938+
* transactions would have already been propagated to the relevant
939+
* in-progress transactions. This transaction would have processed
940+
* those invalidations, ensuring that subsequent transactions observe
941+
* a consistent cache state.
935942
*/
936943
if (txn->xid!=xid)
937944
{
@@ -945,8 +952,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
945952
{
946953
Assert(msgs!=NULL);
947954

948-
ReorderBufferAddInvalidations(builder->reorder,txn->xid,lsn,
949-
ninvalidations,msgs);
955+
ReorderBufferAddDistributedInvalidations(builder->reorder,
956+
txn->xid,lsn,
957+
ninvalidations,msgs);
950958
}
951959
}
952960
}
