Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd7eb52d

Browse files
author
Amit Kapila
committed
Execute invalidation messages for each XLOG_XACT_INVALIDATIONS message
during logical decoding.Prior to commitc55040c we have no way of knowing the invalidationsbefore commit. So, while decoding we use to execute all the invalidationsat each command end as we had no way of knowing which invalidationshappened before that command. Due to this, transactions involving largeamounts of DDLs use to take more time and also lead to high CPU usage. Butnow we know specific invalidations at each command end so we execute onlyrequired invalidations.It has been observed that decoding of a transaction containing truncationof a table with 1000 partitions would be finished in 1s whereas beforethis patch it used to take 4-5 minutes.Author: Dilip KumarReviewed-by: Amit Kapila and Keisuke KurodaDiscussion:https://postgr.es/m/CANDwggKYveEtXjXjqHA6RL3AKSHMsQyfRY6bK+NqhAWJyw8psQ@mail.gmail.com
1 parent564a410 commitd7eb52d

File tree

2 files changed

+92
-23
lines changed

2 files changed

+92
-23
lines changed

‎src/backend/replication/logical/reorderbuffer.c‎

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
235235
staticReorderBufferChange*ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
236236
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
237237
ReorderBufferIterTXNState*state);
238-
staticvoidReorderBufferExecuteInvalidations(ReorderBuffer*rb,ReorderBufferTXN*txn);
238+
staticvoidReorderBufferExecuteInvalidations(uint32nmsgs,SharedInvalidationMessage*msgs);
239239

240240
/*
241241
* ---------------------------------------
@@ -486,6 +486,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
486486
pfree(change->data.msg.message);
487487
change->data.msg.message=NULL;
488488
break;
489+
caseREORDER_BUFFER_CHANGE_INVALIDATION:
490+
if (change->data.inval.invalidations)
491+
pfree(change->data.inval.invalidations);
492+
change->data.inval.invalidations=NULL;
493+
break;
489494
caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
490495
if (change->data.snapshot)
491496
{
@@ -2194,6 +2199,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
21942199
ReorderBufferApplyMessage(rb,txn,change,streaming);
21952200
break;
21962201

2202+
caseREORDER_BUFFER_CHANGE_INVALIDATION:
2203+
/* Execute the invalidation messages locally */
2204+
ReorderBufferExecuteInvalidations(
2205+
change->data.inval.ninvalidations,
2206+
change->data.inval.invalidations);
2207+
break;
2208+
21972209
caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
21982210
/* get rid of the old */
21992211
TeardownHistoricSnapshot(false);
@@ -2244,13 +2256,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
22442256

22452257
TeardownHistoricSnapshot(false);
22462258
SetupHistoricSnapshot(snapshot_now,txn->tuplecid_hash);
2247-
2248-
/*
2249-
* Every time the CommandId is incremented, we could
2250-
* see new catalog contents, so execute all
2251-
* invalidations.
2252-
*/
2253-
ReorderBufferExecuteInvalidations(rb,txn);
22542259
}
22552260

22562261
break;
@@ -2317,7 +2322,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
23172322
AbortCurrentTransaction();
23182323

23192324
/* make sure there's no cache pollution */
2320-
ReorderBufferExecuteInvalidations(rb,txn);
2325+
ReorderBufferExecuteInvalidations(txn->ninvalidations,txn->invalidations);
23212326

23222327
if (using_subtxn)
23232328
RollbackAndReleaseCurrentSubTransaction();
@@ -2356,7 +2361,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
23562361
AbortCurrentTransaction();
23572362

23582363
/* make sure there's no cache pollution */
2359-
ReorderBufferExecuteInvalidations(rb,txn);
2364+
ReorderBufferExecuteInvalidations(txn->ninvalidations,
2365+
txn->invalidations);
23602366

23612367
if (using_subtxn)
23622368
RollbackAndReleaseCurrentSubTransaction();
@@ -2813,23 +2819,30 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
28132819
* Setup the invalidation of the toplevel transaction.
28142820
*
28152821
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2816-
* accumulates all the invalidation messages in the toplevel transaction.
2817-
* This is required because in some cases where we skip processing the
2818-
* transaction (see ReorderBufferForget), we need to execute all the
2819-
* invalidations together.
2822+
* accumulates all the invalidation messages in the toplevel transaction as
2823+
* well as in the form of change in reorder buffer. We require to record it in
2824+
* form of the change so that we can execute only the required invalidations
2825+
* instead of executing all the invalidations on each CommandId increment. We
2826+
* also need to accumulate these in the toplevel transaction because in some
2827+
* cases we skip processing the transaction (see ReorderBufferForget), we need
2828+
* to execute all the invalidations together.
28202829
*/
28212830
void
28222831
ReorderBufferAddInvalidations(ReorderBuffer*rb,TransactionIdxid,
28232832
XLogRecPtrlsn,Sizenmsgs,
28242833
SharedInvalidationMessage*msgs)
28252834
{
28262835
ReorderBufferTXN*txn;
2836+
MemoryContextoldcontext;
2837+
ReorderBufferChange*change;
28272838

28282839
txn=ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
28292840

2841+
oldcontext=MemoryContextSwitchTo(rb->context);
2842+
28302843
/*
2831-
*We collectall the invalidations under the top transaction so that we
2832-
*canexecute them all together.
2844+
*Collectall the invalidations under the top transaction so that we can
2845+
* execute them all together. See comment atop this function
28332846
*/
28342847
if (txn->toptxn)
28352848
txn=txn->toptxn;
@@ -2841,8 +2854,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
28412854
{
28422855
txn->ninvalidations=nmsgs;
28432856
txn->invalidations= (SharedInvalidationMessage*)
2844-
MemoryContextAlloc(rb->context,
2845-
sizeof(SharedInvalidationMessage)*nmsgs);
2857+
palloc(sizeof(SharedInvalidationMessage)*nmsgs);
28462858
memcpy(txn->invalidations,msgs,
28472859
sizeof(SharedInvalidationMessage)*nmsgs);
28482860
}
@@ -2856,19 +2868,31 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
28562868
nmsgs*sizeof(SharedInvalidationMessage));
28572869
txn->ninvalidations+=nmsgs;
28582870
}
2871+
2872+
change=ReorderBufferGetChange(rb);
2873+
change->action=REORDER_BUFFER_CHANGE_INVALIDATION;
2874+
change->data.inval.ninvalidations=nmsgs;
2875+
change->data.inval.invalidations= (SharedInvalidationMessage*)
2876+
palloc(sizeof(SharedInvalidationMessage)*nmsgs);
2877+
memcpy(change->data.inval.invalidations,msgs,
2878+
sizeof(SharedInvalidationMessage)*nmsgs);
2879+
2880+
ReorderBufferQueueChange(rb,xid,lsn,change, false);
2881+
2882+
MemoryContextSwitchTo(oldcontext);
28592883
}
28602884

28612885
/*
28622886
* Apply all invalidations we know. Possibly we only need parts at this point
28632887
* in the changestream but we don't know which those are.
28642888
*/
28652889
staticvoid
2866-
ReorderBufferExecuteInvalidations(ReorderBuffer*rb,ReorderBufferTXN*txn)
2890+
ReorderBufferExecuteInvalidations(uint32nmsgs,SharedInvalidationMessage*msgs)
28672891
{
28682892
inti;
28692893

2870-
for (i=0;i<txn->ninvalidations;i++)
2871-
LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2894+
for (i=0;i<nmsgs;i++)
2895+
LocalExecuteInvalidationMessage(&msgs[i]);
28722896
}
28732897

28742898
/*
@@ -3301,6 +3325,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
33013325
change->data.msg.message_size);
33023326
data+=change->data.msg.message_size;
33033327

3328+
break;
3329+
}
3330+
caseREORDER_BUFFER_CHANGE_INVALIDATION:
3331+
{
3332+
char*data;
3333+
Sizeinval_size=sizeof(SharedInvalidationMessage)*
3334+
change->data.inval.ninvalidations;
3335+
3336+
sz+=inval_size;
3337+
3338+
ReorderBufferSerializeReserve(rb,sz);
3339+
data= ((char*)rb->outbuf)+sizeof(ReorderBufferDiskChange);
3340+
3341+
/* might have been reallocated above */
3342+
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
3343+
memcpy(data,change->data.inval.invalidations,inval_size);
3344+
data+=inval_size;
3345+
33043346
break;
33053347
}
33063348
caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -3578,6 +3620,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
35783620

35793621
break;
35803622
}
3623+
caseREORDER_BUFFER_CHANGE_INVALIDATION:
3624+
{
3625+
sz+=sizeof(SharedInvalidationMessage)*
3626+
change->data.inval.ninvalidations;
3627+
break;
3628+
}
35813629
caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
35823630
{
35833631
Snapshotsnap;
@@ -3844,6 +3892,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
38443892
change->data.msg.message_size);
38453893
data+=change->data.msg.message_size;
38463894

3895+
break;
3896+
}
3897+
caseREORDER_BUFFER_CHANGE_INVALIDATION:
3898+
{
3899+
Sizeinval_size=sizeof(SharedInvalidationMessage)*
3900+
change->data.inval.ninvalidations;
3901+
3902+
change->data.inval.invalidations=
3903+
MemoryContextAlloc(rb->context,inval_size);
3904+
3905+
/* read the message */
3906+
memcpy(change->data.inval.invalidations,data,inval_size);
3907+
38473908
break;
38483909
}
38493910
caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:

‎src/include/replication/reorderbuffer.h‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ enum ReorderBufferChangeType
5757
REORDER_BUFFER_CHANGE_UPDATE,
5858
REORDER_BUFFER_CHANGE_DELETE,
5959
REORDER_BUFFER_CHANGE_MESSAGE,
60+
REORDER_BUFFER_CHANGE_INVALIDATION,
6061
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
6162
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
6263
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -149,6 +150,13 @@ typedef struct ReorderBufferChange
149150
CommandIdcmax;
150151
CommandIdcombocid;
151152
}tuplecid;
153+
154+
/* Invalidation. */
155+
struct
156+
{
157+
uint32ninvalidations;/* Number of messages */
158+
SharedInvalidationMessage*invalidations;/* invalidation message */
159+
}inval;
152160
}data;
153161

154162
/*
@@ -313,8 +321,8 @@ typedef struct ReorderBufferTXN
313321
uint64nentries_mem;
314322

315323
/*
316-
* List of ReorderBufferChange structs, including new Snapshots and new
317-
* CommandIds
324+
* List of ReorderBufferChange structs, including new Snapshots, new
325+
* CommandIds and command invalidation messages.
318326
*/
319327
dlist_headchanges;
320328

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp