Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit55558df

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long timewhile processing large transactions during logical replication where wedon't send any data of such transactions. This can happen when the tablemodified in the transaction is not published or because all the changesgot filtered. We do try to send the keep_alive if necessary at the end ofthe transaction (via WalSndWriteData()) but by that time thesubscriber-side can timeout and exit.To fix this we try to send the keepalive message if required afterprocessing certain threshold of changes.Reported-by: Fabrice ChapuisAuthor: Wang wei and Amit KapilaReviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato KurodaBackpatch-through: 10Discussion:https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parentb9d70ef commit55558df

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
622622

623623
/* set output state */
624624
ctx->accept_writes= false;
625+
ctx->end_xact= false;
625626

626627
/* do the actual work: call callback */
627628
ctx->callbacks.startup_cb(ctx,opt,is_init);
@@ -649,6 +650,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
649650

650651
/* set output state */
651652
ctx->accept_writes= false;
653+
ctx->end_xact= false;
652654

653655
/* do the actual work: call callback */
654656
ctx->callbacks.shutdown_cb(ctx);
@@ -684,6 +686,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
684686
ctx->accept_writes= true;
685687
ctx->write_xid=txn->xid;
686688
ctx->write_location=txn->first_lsn;
689+
ctx->end_xact= false;
687690

688691
/* do the actual work: call callback */
689692
ctx->callbacks.begin_cb(ctx,txn);
@@ -715,6 +718,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
715718
ctx->accept_writes= true;
716719
ctx->write_xid=txn->xid;
717720
ctx->write_location=txn->end_lsn;/* points to the end of the record */
721+
ctx->end_xact= true;
718722

719723
/* do the actual work: call callback */
720724
ctx->callbacks.commit_cb(ctx,txn,commit_lsn);
@@ -754,6 +758,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
754758
*/
755759
ctx->write_location=change->lsn;
756760

761+
ctx->end_xact= false;
762+
757763
ctx->callbacks.change_cb(ctx,txn,relation,change);
758764

759765
/* Pop the error context stack */
@@ -794,6 +800,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
794800
*/
795801
ctx->write_location=change->lsn;
796802

803+
ctx->end_xact= false;
804+
797805
ctx->callbacks.truncate_cb(ctx,txn,nrelations,relations,change);
798806

799807
/* Pop the error context stack */
@@ -820,6 +828,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
820828

821829
/* set output state */
822830
ctx->accept_writes= false;
831+
ctx->end_xact= false;
823832

824833
/* do the actual work: call callback */
825834
ret=ctx->callbacks.filter_by_origin_cb(ctx,origin_id);
@@ -857,6 +866,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
857866
ctx->accept_writes= true;
858867
ctx->write_xid=txn!=NULL ?txn->xid :InvalidTransactionId;
859868
ctx->write_location=message_lsn;
869+
ctx->end_xact= false;
860870

861871
/* do the actual work: call callback */
862872
ctx->callbacks.message_cb(ctx,txn,message_lsn,transactional,prefix,

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static List *LoadPublications(List *pubnames);
5353
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
5454
uint32hashvalue);
5555
staticvoidsend_relation_and_attrs(Relationrelation,LogicalDecodingContext*ctx);
56+
staticvoidupdate_replication_progress(LogicalDecodingContext*ctx);
5657

5758
/*
5859
* Entry in the map used to remember which relation schemas we sent.
@@ -277,7 +278,7 @@ static void
277278
pgoutput_commit_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
278279
XLogRecPtrcommit_lsn)
279280
{
280-
OutputPluginUpdateProgress(ctx);
281+
update_replication_progress(ctx);
281282

282283
OutputPluginPrepareWrite(ctx, true);
283284
logicalrep_write_commit(ctx->out,txn,commit_lsn);
@@ -385,6 +386,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
385386
RelationSyncEntry*relentry;
386387
Relationancestor=NULL;
387388

389+
update_replication_progress(ctx);
390+
388391
if (!is_publishable_relation(relation))
389392
return;
390393

@@ -514,6 +517,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
514517
intnrelids;
515518
Oid*relids;
516519

520+
update_replication_progress(ctx);
521+
517522
old=MemoryContextSwitchTo(data->context);
518523

519524
relids=palloc0(nrelations*sizeof(Oid));
@@ -912,3 +917,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
912917
while ((entry= (RelationSyncEntry*)hash_seq_search(&status))!=NULL)
913918
entry->replicate_valid= false;
914919
}
920+
921+
/*
922+
* Try to update progress and send a keepalive message if too many changes were
923+
* processed.
924+
*
925+
* For a large transaction, if we don't send any change to the downstream for a
926+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
927+
* This can happen when all or most of the changes are not published.
928+
*/
929+
staticvoid
930+
update_replication_progress(LogicalDecodingContext*ctx)
931+
{
932+
staticintchanges_count=0;
933+
934+
/*
935+
* We don't want to try sending a keepalive message after processing each
936+
* change as that can have overhead. Tests revealed that there is no
937+
* noticeable overhead in doing it after continuously processing 100 or so
938+
* changes.
939+
*/
940+
#defineCHANGES_THRESHOLD 100
941+
942+
/*
943+
* If we are at the end of transaction LSN, update progress tracking.
944+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
945+
* try to send a keepalive message if required.
946+
*/
947+
if (ctx->end_xact||++changes_count >=CHANGES_THRESHOLD)
948+
{
949+
OutputPluginUpdateProgress(ctx);
950+
changes_count=0;
951+
}
952+
}

‎src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ static void ProcessStandbyMessage(void);
240240
staticvoidProcessStandbyReplyMessage(void);
241241
staticvoidProcessStandbyHSFeedbackMessage(void);
242242
staticvoidProcessRepliesIfAny(void);
243+
staticvoidProcessPendingWrites(void);
243244
staticvoidWalSndKeepalive(boolrequestReply);
244245
staticvoidWalSndKeepaliveIfNecessary(void);
245246
staticvoidWalSndCheckTimeOut(void);
@@ -1295,6 +1296,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12951296
}
12961297

12971298
/* If we have pending write here, go to slow path */
1299+
ProcessPendingWrites();
1300+
}
1301+
1302+
/*
1303+
* Wait until there is no pending write. Also process replies from the other
1304+
* side and check timeouts during that.
1305+
*/
1306+
staticvoid
1307+
ProcessPendingWrites(void)
1308+
{
12981309
for (;;)
12991310
{
13001311
intwakeEvents;
@@ -1354,18 +1365,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
13541365
{
13551366
staticTimestampTzsendTime=0;
13561367
TimestampTznow=GetCurrentTimestamp();
1368+
boolend_xact=ctx->end_xact;
13571369

13581370
/*
13591371
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
13601372
* avoid flooding the lag tracker when we commit frequently.
1373+
*
1374+
* We don't have a mechanism to get the ack for any LSN other than end
1375+
* xact LSN from the downstream. So, we track lag only for end of
1376+
* transaction LSN.
13611377
*/
13621378
#defineWALSND_LOGICAL_LAG_TRACK_INTERVAL_MS1000
1363-
if (!TimestampDifferenceExceeds(sendTime,now,
1364-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1365-
return;
1379+
if (end_xact&&TimestampDifferenceExceeds(sendTime,now,
1380+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1381+
{
1382+
LagTrackerWrite(lsn,now);
1383+
sendTime=now;
1384+
}
13661385

1367-
LagTrackerWrite(lsn,now);
1368-
sendTime=now;
1386+
/*
1387+
* Try to send a keepalive if required. We don't need to try sending keep
1388+
* alive messages at the transaction end as that will be done at a later
1389+
* point in time. This is required only for large transactions where we
1390+
* don't send any changes to the downstream and the receiver can timeout
1391+
* due to that.
1392+
*/
1393+
if (!end_xact&&
1394+
now >=TimestampTzPlusMilliseconds(last_reply_timestamp,
1395+
wal_sender_timeout /2))
1396+
ProcessPendingWrites();
13691397
}
13701398

13711399
/*

‎src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ typedef struct LogicalDecodingContext
4949
*/
5050
boolfast_forward;
5151

52+
/* Are we processing the end LSN of a transaction? */
53+
boolend_xact;
54+
5255
OutputPluginCallbackscallbacks;
5356
OutputPluginOptionsoptions;
5457

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp