Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf832b50

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long timewhile processing large transactions during logical replication where wedon't send any data of such transactions. This can happen when the tablemodified in the transaction is not published or because all the changesgot filtered. We do try to send the keep_alive if necessary at the end ofthe transaction (via WalSndWriteData()) but by that time thesubscriber-side can timeout and exit.To fix this we try to send the keepalive message if required afterprocessing certain threshold of changes.Reported-by: Fabrice ChapuisAuthor: Wang wei and Amit KapilaReviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato KurodaBackpatch-through: 10Discussion:https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parentde6eec1 commitf832b50

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
624624

625625
/* set output state */
626626
ctx->accept_writes= false;
627+
ctx->end_xact= false;
627628

628629
/* do the actual work: call callback */
629630
ctx->callbacks.startup_cb(ctx,opt,is_init);
@@ -651,6 +652,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
651652

652653
/* set output state */
653654
ctx->accept_writes= false;
655+
ctx->end_xact= false;
654656

655657
/* do the actual work: call callback */
656658
ctx->callbacks.shutdown_cb(ctx);
@@ -686,6 +688,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
686688
ctx->accept_writes= true;
687689
ctx->write_xid=txn->xid;
688690
ctx->write_location=txn->first_lsn;
691+
ctx->end_xact= false;
689692

690693
/* do the actual work: call callback */
691694
ctx->callbacks.begin_cb(ctx,txn);
@@ -717,6 +720,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
717720
ctx->accept_writes= true;
718721
ctx->write_xid=txn->xid;
719722
ctx->write_location=txn->end_lsn;/* points to the end of the record */
723+
ctx->end_xact= true;
720724

721725
/* do the actual work: call callback */
722726
ctx->callbacks.commit_cb(ctx,txn,commit_lsn);
@@ -756,6 +760,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
756760
*/
757761
ctx->write_location=change->lsn;
758762

763+
ctx->end_xact= false;
764+
759765
ctx->callbacks.change_cb(ctx,txn,relation,change);
760766

761767
/* Pop the error context stack */
@@ -796,6 +802,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
796802
*/
797803
ctx->write_location=change->lsn;
798804

805+
ctx->end_xact= false;
806+
799807
ctx->callbacks.truncate_cb(ctx,txn,nrelations,relations,change);
800808

801809
/* Pop the error context stack */
@@ -822,6 +830,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
822830

823831
/* set output state */
824832
ctx->accept_writes= false;
833+
ctx->end_xact= false;
825834

826835
/* do the actual work: call callback */
827836
ret=ctx->callbacks.filter_by_origin_cb(ctx,origin_id);
@@ -859,6 +868,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
859868
ctx->accept_writes= true;
860869
ctx->write_xid=txn!=NULL ?txn->xid :InvalidTransactionId;
861870
ctx->write_location=message_lsn;
871+
ctx->end_xact= false;
862872

863873
/* do the actual work: call callback */
864874
ctx->callbacks.message_cb(ctx,txn,message_lsn,transactional,prefix,

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ static bool publications_valid;
5050
staticList*LoadPublications(List*pubnames);
5151
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
5252
uint32hashvalue);
53+
staticvoidupdate_replication_progress(LogicalDecodingContext*ctx);
5354

5455
/* Entry in the map used to remember which relation schemas we sent. */
5556
typedefstructRelationSyncEntry
@@ -247,7 +248,7 @@ static void
247248
pgoutput_commit_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
248249
XLogRecPtrcommit_lsn)
249250
{
250-
OutputPluginUpdateProgress(ctx);
251+
update_replication_progress(ctx);
251252

252253
OutputPluginPrepareWrite(ctx, true);
253254
logicalrep_write_commit(ctx->out,txn,commit_lsn);
@@ -309,6 +310,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
309310
MemoryContextold;
310311
RelationSyncEntry*relentry;
311312

313+
update_replication_progress(ctx);
314+
312315
if (!is_publishable_relation(relation))
313316
return;
314317

@@ -389,6 +392,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
389392
intnrelids;
390393
Oid*relids;
391394

395+
update_replication_progress(ctx);
396+
392397
old=MemoryContextSwitchTo(data->context);
393398

394399
relids=palloc0(nrelations*sizeof(Oid));
@@ -660,3 +665,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
660665
while ((entry= (RelationSyncEntry*)hash_seq_search(&status))!=NULL)
661666
entry->replicate_valid= false;
662667
}
668+
669+
/*
670+
* Try to update progress and send a keepalive message if too many changes were
671+
* processed.
672+
*
673+
* For a large transaction, if we don't send any change to the downstream for a
674+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
675+
* This can happen when all or most of the changes are not published.
676+
*/
677+
staticvoid
678+
update_replication_progress(LogicalDecodingContext*ctx)
679+
{
680+
staticintchanges_count=0;
681+
682+
/*
683+
* We don't want to try sending a keepalive message after processing each
684+
* change as that can have overhead. Tests revealed that there is no
685+
* noticeable overhead in doing it after continuously processing 100 or so
686+
* changes.
687+
*/
688+
#defineCHANGES_THRESHOLD 100
689+
690+
/*
691+
* If we are at the end of transaction LSN, update progress tracking.
692+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
693+
* try to send a keepalive message if required.
694+
*/
695+
if (ctx->end_xact||++changes_count >=CHANGES_THRESHOLD)
696+
{
697+
OutputPluginUpdateProgress(ctx);
698+
changes_count=0;
699+
}
700+
}

‎src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ static void ProcessStandbyMessage(void);
244244
staticvoidProcessStandbyReplyMessage(void);
245245
staticvoidProcessStandbyHSFeedbackMessage(void);
246246
staticvoidProcessRepliesIfAny(void);
247+
staticvoidProcessPendingWrites(void);
247248
staticvoidWalSndKeepalive(boolrequestReply);
248249
staticvoidWalSndKeepaliveIfNecessary(void);
249250
staticvoidWalSndCheckTimeOut(void);
@@ -1214,6 +1215,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12141215
}
12151216

12161217
/* If we have pending write here, go to slow path */
1218+
ProcessPendingWrites();
1219+
}
1220+
1221+
/*
1222+
* Wait until there is no pending write. Also process replies from the other
1223+
* side and check timeouts during that.
1224+
*/
1225+
staticvoid
1226+
ProcessPendingWrites(void)
1227+
{
12171228
for (;;)
12181229
{
12191230
intwakeEvents;
@@ -1273,18 +1284,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
12731284
{
12741285
staticTimestampTzsendTime=0;
12751286
TimestampTznow=GetCurrentTimestamp();
1287+
boolend_xact=ctx->end_xact;
12761288

12771289
/*
12781290
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
12791291
* avoid flooding the lag tracker when we commit frequently.
1292+
*
1293+
* We don't have a mechanism to get the ack for any LSN other than end
1294+
* xact LSN from the downstream. So, we track lag only for end of
1295+
* transaction LSN.
12801296
*/
12811297
#defineWALSND_LOGICAL_LAG_TRACK_INTERVAL_MS1000
1282-
if (!TimestampDifferenceExceeds(sendTime,now,
1283-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1284-
return;
1298+
if (end_xact&&TimestampDifferenceExceeds(sendTime,now,
1299+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1300+
{
1301+
LagTrackerWrite(lsn,now);
1302+
sendTime=now;
1303+
}
12851304

1286-
LagTrackerWrite(lsn,now);
1287-
sendTime=now;
1305+
/*
1306+
* Try to send a keepalive if required. We don't need to try sending keep
1307+
* alive messages at the transaction end as that will be done at a later
1308+
* point in time. This is required only for large transactions where we
1309+
* don't send any changes to the downstream and the receiver can timeout
1310+
* due to that.
1311+
*/
1312+
if (!end_xact&&
1313+
now >=TimestampTzPlusMilliseconds(last_reply_timestamp,
1314+
wal_sender_timeout /2))
1315+
ProcessPendingWrites();
12881316
}
12891317

12901318
/*

‎src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ typedef struct LogicalDecodingContext
5050
*/
5151
boolfast_forward;
5252

53+
/* Are we processing the end LSN of a transaction? */
54+
boolend_xact;
55+
5356
OutputPluginCallbackscallbacks;
5457
OutputPluginOptionsoptions;
5558

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp