Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbf07ab4

Browse files
author
Amit Kapila
committed
Avoid unnecessary streaming of transactions during logical replication.
After restart, we don't perform streaming of an in-progress transaction ifit was previously decoded and confirmed by the client. To achieve that wewere comparing the END location of the WAL record being decoded with theWAL location we have already decoded and confirmed by the client. Whiledecoding the commit record, to decide whether to process and send thecomplete transaction, we compare its START location with the WAL locationwe have already decoded and confirmed by the client. Now, if we need toqueue some change in the transaction while decoding the commit record(e.g. snapshot), it is possible that we decide to stream the transactionbut later commit processing decides to skip it. In such a case, we wouldneedlessly send the changes and later when we decide to skip it, we willsend stream abort.We also sometimes decide to stream the changes when we actually just needto process them locally like a change for invalidations. This will lead usto send empty streams. To avoid this, while queuing each change fordecoding, we remember whether the transaction has any change that actuallyneeds to be sent downstream and use that information later to decidewhether to stream the transaction or not.Note, we can't avoid all cases where we have to send empty streams likethe case where the plugin later decides that the change is notpublishable. However, we will no longer need to send stream_abort when weskip sending a particular transaction.Author: Dilip KumarReviewed-by: Hou Zhijie, Ashutosh Bapat, Shi yu, Amit KapilaDiscussion:https://postgr.es/m/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
1 parent3f0e786 commitbf07ab4

File tree

2 files changed

+53
-21
lines changed

2 files changed

+53
-21
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
695695
* Record the partial change for the streaming of in-progress transactions. We
696696
* can stream only complete changes so if we have a partial change like toast
697697
* table insert or speculative insert then we mark such a 'txn' so that it
698-
* can't be streamed. We also ensure that if the changes in such a 'txn'are
699-
* above logical_decoding_work_mem threshold then we stream them as soon as we
700-
* have a complete change.
698+
* can't be streamed. We also ensure that if the changes in such a 'txn'can
699+
*be streamed and areabove logical_decoding_work_mem threshold then we stream
700+
*them as soon as wehave a complete change.
701701
*/
702702
staticvoid
703703
ReorderBufferProcessPartialChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
762762
*/
763763
if (ReorderBufferCanStartStreaming(rb)&&
764764
!(rbtxn_has_partial_change(toptxn))&&
765-
rbtxn_is_serialized(txn))
765+
rbtxn_is_serialized(txn)&&
766+
rbtxn_has_streamable_change(toptxn))
766767
ReorderBufferStreamTXN(rb,toptxn);
767768
}
768769

@@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
793794
return;
794795
}
795796

797+
/*
798+
* The changes that are sent downstream are considered streamable. We
799+
* remember such transactions so that only those will later be considered
800+
* for streaming.
801+
*/
802+
if (change->action==REORDER_BUFFER_CHANGE_INSERT||
803+
change->action==REORDER_BUFFER_CHANGE_UPDATE||
804+
change->action==REORDER_BUFFER_CHANGE_DELETE||
805+
change->action==REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT||
806+
change->action==REORDER_BUFFER_CHANGE_TRUNCATE||
807+
change->action==REORDER_BUFFER_CHANGE_MESSAGE)
808+
{
809+
ReorderBufferTXN*toptxn;
810+
811+
/* get the top transaction */
812+
if (txn->toptxn!=NULL)
813+
toptxn=txn->toptxn;
814+
else
815+
toptxn=txn;
816+
817+
toptxn->txn_flags |=RBTXN_HAS_STREAMABLE_CHANGE;
818+
}
819+
796820
change->lsn=lsn;
797821
change->txn=txn;
798822

@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
29422966
if (txn==NULL)
29432967
return;
29442968

2945-
/* For streamed transactions notify the remote node about the abort. */
2946-
if (rbtxn_is_streamed(txn))
2947-
rb->stream_abort(rb,txn,lsn);
2969+
/* this transaction mustn't be streamed */
2970+
Assert(!rbtxn_is_streamed(txn));
29482971

29492972
/* cosmetic... */
29502973
txn->final_lsn=lsn;
@@ -3460,14 +3483,15 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
34603483
}
34613484

34623485
/*
3463-
* Find the largest toplevel transaction to evict (by streaming).
3486+
* Find the largeststreamabletoplevel transaction to evict (by streaming).
34643487
*
34653488
* This can be seen as an optimized version of ReorderBufferLargestTXN, which
34663489
* should give us the same transaction (because we don't update memory account
34673490
* for subtransaction with streaming, so it's always 0). But we can simply
34683491
* iterate over the limited number of toplevel transactions that have a base
34693492
* snapshot. There is no use of selecting a transaction that doesn't have base
3470-
* snapshot because we don't decode such transactions.
3493+
* snapshot because we don't decode such transactions. Also, we do not select
3494+
* the transaction which doesn't have any streamable change.
34713495
*
34723496
* Note that, we skip transactions that contains incomplete changes. There
34733497
* is a scope of optimization here such that we can select the largest
@@ -3483,7 +3507,7 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
34833507
* the subxact from where we streamed the last change.
34843508
*/
34853509
staticReorderBufferTXN*
3486-
ReorderBufferLargestTopTXN(ReorderBuffer*rb)
3510+
ReorderBufferLargestStreamableTopTXN(ReorderBuffer*rb)
34873511
{
34883512
dlist_iteriter;
34893513
Sizelargest_size=0;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
35023526
Assert(txn->base_snapshot!=NULL);
35033527

35043528
if ((largest==NULL||txn->total_size>largest_size)&&
3505-
(txn->total_size>0)&& !(rbtxn_has_partial_change(txn)))
3529+
(txn->total_size>0)&& !(rbtxn_has_partial_change(txn))&&
3530+
rbtxn_has_streamable_change(txn))
35063531
{
35073532
largest=txn;
35083533
largest_size=txn->total_size;
@@ -3547,7 +3572,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
35473572
* memory by streaming, if possible. Otherwise, spill to disk.
35483573
*/
35493574
if (ReorderBufferCanStartStreaming(rb)&&
3550-
(txn=ReorderBufferLargestTopTXN(rb))!=NULL)
3575+
(txn=ReorderBufferLargestStreamableTopTXN(rb))!=NULL)
35513576
{
35523577
/* we know there has to be one, because the size is not zero */
35533578
Assert(txn&& !txn->toptxn);
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
39193944
* restarting.
39203945
*/
39213946
if (ReorderBufferCanStream(rb)&&
3922-
!SnapBuildXactNeedsSkip(builder,ctx->reader->EndRecPtr))
3947+
!SnapBuildXactNeedsSkip(builder,ctx->reader->ReadRecPtr))
39233948
return true;
39243949

39253950
return false;

‎src/include/replication/reorderbuffer.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
168168
}ReorderBufferChange;
169169

170170
/* ReorderBufferTXN txn_flags */
171-
#defineRBTXN_HAS_CATALOG_CHANGES 0x0001
172-
#defineRBTXN_IS_SUBXACT 0x0002
173-
#defineRBTXN_IS_SERIALIZED 0x0004
174-
#defineRBTXN_IS_SERIALIZED_CLEAR 0x0008
175-
#defineRBTXN_IS_STREAMED 0x0010
176-
#defineRBTXN_HAS_PARTIAL_CHANGE 0x0020
177-
#defineRBTXN_PREPARE 0x0040
178-
#defineRBTXN_SKIPPED_PREPARE 0x0080
171+
#defineRBTXN_HAS_CATALOG_CHANGES 0x0001
172+
#defineRBTXN_IS_SUBXACT 0x0002
173+
#defineRBTXN_IS_SERIALIZED 0x0004
174+
#defineRBTXN_IS_SERIALIZED_CLEAR 0x0008
175+
#defineRBTXN_IS_STREAMED 0x0010
176+
#defineRBTXN_HAS_PARTIAL_CHANGE 0x0020
177+
#defineRBTXN_PREPARE 0x0040
178+
#defineRBTXN_SKIPPED_PREPARE 0x0080
179+
#defineRBTXN_HAS_STREAMABLE_CHANGE0x0100
179180

180181
/* Does the transaction have catalog changes? */
181182
#definerbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
207208
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
208209
)
209210

211+
/* Does this transaction contain streamable changes? */
212+
#definerbtxn_has_streamable_change(txn) \
213+
( \
214+
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
215+
)
216+
210217
/*
211218
* Has this transaction been streamed to downstream?
212219
*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp