Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d6da71f

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after processing certain threshold of changes.

Reported-by: Fabrice Chapuis
Author: Wang wei and Amit Kapila
Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda
Backpatch-through: 10
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent ca9e9b0, commit d6da71f

File tree

4 files changed

+105
-7
lines changed

4 files changed

+105
-7
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
724724

725725
/* set output state */
726726
ctx->accept_writes= false;
727+
ctx->end_xact= false;
727728

728729
/* do the actual work: call callback */
729730
ctx->callbacks.startup_cb(ctx,opt,is_init);
@@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
751752

752753
/* set output state */
753754
ctx->accept_writes= false;
755+
ctx->end_xact= false;
754756

755757
/* do the actual work: call callback */
756758
ctx->callbacks.shutdown_cb(ctx);
@@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
786788
ctx->accept_writes= true;
787789
ctx->write_xid=txn->xid;
788790
ctx->write_location=txn->first_lsn;
791+
ctx->end_xact= false;
789792

790793
/* do the actual work: call callback */
791794
ctx->callbacks.begin_cb(ctx,txn);
@@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
817820
ctx->accept_writes= true;
818821
ctx->write_xid=txn->xid;
819822
ctx->write_location=txn->end_lsn;/* points to the end of the record */
823+
ctx->end_xact= true;
820824

821825
/* do the actual work: call callback */
822826
ctx->callbacks.commit_cb(ctx,txn,commit_lsn);
@@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
857861
ctx->accept_writes= true;
858862
ctx->write_xid=txn->xid;
859863
ctx->write_location=txn->first_lsn;
864+
ctx->end_xact= false;
860865

861866
/*
862867
* If the plugin supports two-phase commits then begin prepare callback is
@@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
901906
ctx->accept_writes= true;
902907
ctx->write_xid=txn->xid;
903908
ctx->write_location=txn->end_lsn;/* points to the end of the record */
909+
ctx->end_xact= true;
904910

905911
/*
906912
* If the plugin supports two-phase commits then prepare callback is
@@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
945951
ctx->accept_writes= true;
946952
ctx->write_xid=txn->xid;
947953
ctx->write_location=txn->end_lsn;/* points to the end of the record */
954+
ctx->end_xact= true;
948955

949956
/*
950957
* If the plugin support two-phase commits then commit prepared callback
@@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
990997
ctx->accept_writes= true;
991998
ctx->write_xid=txn->xid;
992999
ctx->write_location=txn->end_lsn;/* points to the end of the record */
1000+
ctx->end_xact= true;
9931001

9941002
/*
9951003
* If the plugin support two-phase commits then rollback prepared callback
@@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10401048
*/
10411049
ctx->write_location=change->lsn;
10421050

1051+
ctx->end_xact= false;
1052+
10431053
ctx->callbacks.change_cb(ctx,txn,relation,change);
10441054

10451055
/* Pop the error context stack */
@@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10801090
*/
10811091
ctx->write_location=change->lsn;
10821092

1093+
ctx->end_xact= false;
1094+
10831095
ctx->callbacks.truncate_cb(ctx,txn,nrelations,relations,change);
10841096

10851097
/* Pop the error context stack */
@@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
11071119

11081120
/* set output state */
11091121
ctx->accept_writes= false;
1122+
ctx->end_xact= false;
11101123

11111124
/* do the actual work: call callback */
11121125
ret=ctx->callbacks.filter_prepare_cb(ctx,xid,gid);
@@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
11371150

11381151
/* set output state */
11391152
ctx->accept_writes= false;
1153+
ctx->end_xact= false;
11401154

11411155
/* do the actual work: call callback */
11421156
ret=ctx->callbacks.filter_by_origin_cb(ctx,origin_id);
@@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
11741188
ctx->accept_writes= true;
11751189
ctx->write_xid=txn!=NULL ?txn->xid :InvalidTransactionId;
11761190
ctx->write_location=message_lsn;
1191+
ctx->end_xact= false;
11771192

11781193
/* do the actual work: call callback */
11791194
ctx->callbacks.message_cb(ctx,txn,message_lsn,transactional,prefix,
@@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12171232
*/
12181233
ctx->write_location=first_lsn;
12191234

1235+
ctx->end_xact= false;
1236+
12201237
/* in streaming mode, stream_start_cb is required */
12211238
if (ctx->callbacks.stream_start_cb==NULL)
12221239
ereport(ERROR,
@@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12641281
*/
12651282
ctx->write_location=last_lsn;
12661283

1284+
ctx->end_xact= false;
1285+
12671286
/* in streaming mode, stream_stop_cb is required */
12681287
if (ctx->callbacks.stream_stop_cb==NULL)
12691288
ereport(ERROR,
@@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13031322
ctx->accept_writes= true;
13041323
ctx->write_xid=txn->xid;
13051324
ctx->write_location=abort_lsn;
1325+
ctx->end_xact= true;
13061326

13071327
/* in streaming mode, stream_abort_cb is required */
13081328
if (ctx->callbacks.stream_abort_cb==NULL)
@@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13471367
ctx->accept_writes= true;
13481368
ctx->write_xid=txn->xid;
13491369
ctx->write_location=txn->end_lsn;
1370+
ctx->end_xact= true;
13501371

13511372
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
13521373
if (ctx->callbacks.stream_prepare_cb==NULL)
@@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13871408
ctx->accept_writes= true;
13881409
ctx->write_xid=txn->xid;
13891410
ctx->write_location=txn->end_lsn;
1411+
ctx->end_xact= true;
13901412

13911413
/* in streaming mode, stream_commit_cb is required */
13921414
if (ctx->callbacks.stream_commit_cb==NULL)
@@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14351457
*/
14361458
ctx->write_location=change->lsn;
14371459

1460+
ctx->end_xact= false;
1461+
14381462
/* in streaming mode, stream_change_cb is required */
14391463
if (ctx->callbacks.stream_change_cb==NULL)
14401464
ereport(ERROR,
@@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14791503
ctx->accept_writes= true;
14801504
ctx->write_xid=txn!=NULL ?txn->xid :InvalidTransactionId;
14811505
ctx->write_location=message_lsn;
1506+
ctx->end_xact= false;
14821507

14831508
/* do the actual work: call callback */
14841509
ctx->callbacks.stream_message_cb(ctx,txn,message_lsn,transactional,prefix,
@@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
15271552
*/
15281553
ctx->write_location=change->lsn;
15291554

1555+
ctx->end_xact= false;
1556+
15301557
ctx->callbacks.stream_truncate_cb(ctx,txn,nrelations,relations,change);
15311558

15321559
/* Pop the error context stack */

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
7070
uint32hashvalue);
7171
staticvoidsend_relation_and_attrs(Relationrelation,TransactionIdxid,
7272
LogicalDecodingContext*ctx);
73+
staticvoidupdate_replication_progress(LogicalDecodingContext*ctx);
7374

7475
/*
7576
* Entry in the map used to remember which relation schemas we sent.
@@ -381,7 +382,7 @@ static void
381382
pgoutput_commit_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
382383
XLogRecPtrcommit_lsn)
383384
{
384-
OutputPluginUpdateProgress(ctx);
385+
update_replication_progress(ctx);
385386

386387
OutputPluginPrepareWrite(ctx, true);
387388
logicalrep_write_commit(ctx->out,txn,commit_lsn);
@@ -535,6 +536,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
535536
TransactionIdxid=InvalidTransactionId;
536537
Relationancestor=NULL;
537538

539+
update_replication_progress(ctx);
540+
538541
if (!is_publishable_relation(relation))
539542
return;
540543

@@ -677,6 +680,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
677680
Oid*relids;
678681
TransactionIdxid=InvalidTransactionId;
679682

683+
update_replication_progress(ctx);
684+
680685
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
681686
if (in_streaming)
682687
xid=change->txn->xid;
@@ -735,6 +740,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
735740
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
736741
TransactionIdxid=InvalidTransactionId;
737742

743+
update_replication_progress(ctx);
744+
738745
if (!data->messages)
739746
return;
740747

@@ -921,7 +928,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
921928
Assert(!in_streaming);
922929
Assert(rbtxn_is_streamed(txn));
923930

924-
OutputPluginUpdateProgress(ctx);
931+
update_replication_progress(ctx);
925932

926933
OutputPluginPrepareWrite(ctx, true);
927934
logicalrep_write_stream_commit(ctx->out,txn,commit_lsn);
@@ -1304,3 +1311,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
13041311
entry->pubactions.pubtruncate= false;
13051312
}
13061313
}
1314+
1315+
/*
1316+
* Try to update progress and send a keepalive message if too many changes were
1317+
* processed.
1318+
*
1319+
* For a large transaction, if we don't send any change to the downstream for a
1320+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
1321+
* This can happen when all or most of the changes are not published.
1322+
*/
1323+
staticvoid
1324+
update_replication_progress(LogicalDecodingContext*ctx)
1325+
{
1326+
staticintchanges_count=0;
1327+
1328+
/*
1329+
* We don't want to try sending a keepalive message after processing each
1330+
* change as that can have overhead. Tests revealed that there is no
1331+
* noticeable overhead in doing it after continuously processing 100 or so
1332+
* changes.
1333+
*/
1334+
#defineCHANGES_THRESHOLD 100
1335+
1336+
/*
1337+
* If we are at the end of transaction LSN, update progress tracking.
1338+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
1339+
* try to send a keepalive message if required.
1340+
*/
1341+
if (ctx->end_xact||++changes_count >=CHANGES_THRESHOLD)
1342+
{
1343+
OutputPluginUpdateProgress(ctx);
1344+
changes_count=0;
1345+
}
1346+
}

‎src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ static void ProcessStandbyMessage(void);
240240
staticvoidProcessStandbyReplyMessage(void);
241241
staticvoidProcessStandbyHSFeedbackMessage(void);
242242
staticvoidProcessRepliesIfAny(void);
243+
staticvoidProcessPendingWrites(void);
243244
staticvoidWalSndKeepalive(boolrequestReply);
244245
staticvoidWalSndKeepaliveIfNecessary(void);
245246
staticvoidWalSndCheckTimeOut(void);
@@ -1288,6 +1289,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12881289
}
12891290

12901291
/* If we have pending write here, go to slow path */
1292+
ProcessPendingWrites();
1293+
}
1294+
1295+
/*
1296+
* Wait until there is no pending write. Also process replies from the other
1297+
* side and check timeouts during that.
1298+
*/
1299+
staticvoid
1300+
ProcessPendingWrites(void)
1301+
{
12911302
for (;;)
12921303
{
12931304
longsleeptime;
@@ -1342,18 +1353,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
13421353
{
13431354
staticTimestampTzsendTime=0;
13441355
TimestampTznow=GetCurrentTimestamp();
1356+
boolend_xact=ctx->end_xact;
13451357

13461358
/*
13471359
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
13481360
* avoid flooding the lag tracker when we commit frequently.
1361+
*
1362+
* We don't have a mechanism to get the ack for any LSN other than end
1363+
* xact LSN from the downstream. So, we track lag only for end of
1364+
* transaction LSN.
13491365
*/
13501366
#defineWALSND_LOGICAL_LAG_TRACK_INTERVAL_MS1000
1351-
if (!TimestampDifferenceExceeds(sendTime,now,
1352-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1353-
return;
1367+
if (end_xact&&TimestampDifferenceExceeds(sendTime,now,
1368+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1369+
{
1370+
LagTrackerWrite(lsn,now);
1371+
sendTime=now;
1372+
}
13541373

1355-
LagTrackerWrite(lsn,now);
1356-
sendTime=now;
1374+
/*
1375+
* Try to send a keepalive if required. We don't need to try sending keep
1376+
* alive messages at the transaction end as that will be done at a later
1377+
* point in time. This is required only for large transactions where we
1378+
* don't send any changes to the downstream and the receiver can timeout
1379+
* due to that.
1380+
*/
1381+
if (!end_xact&&
1382+
now >=TimestampTzPlusMilliseconds(last_reply_timestamp,
1383+
wal_sender_timeout /2))
1384+
ProcessPendingWrites();
13571385
}
13581386

13591387
/*

‎src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ typedef struct LogicalDecodingContext
4949
*/
5050
boolfast_forward;
5151

52+
/* Are we processing the end LSN of a transaction? */
53+
boolend_xact;
54+
5255
OutputPluginCallbackscallbacks;
5356
OutputPluginOptionsoptions;
5457

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp