Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0fedb4e

Browse files
committed
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not toimpose too high a per message overhead. The fast path skips checks forinterrupts and timeouts. However, the existing coding failed to considerthe fact that a transaction with a large number of changes may take avery long time to be processed and sent to the client. This causes thewalsender to ignore interrupts for potentially a long time and moreimportantly it will result in the walsender being killed due totimeout at the end of such a transaction.This commit changes the fast path to also check for interrupts and onlyallows calling the fast path when the last keepalive check happened lessthan half the walsender timeout ago. Otherwise the slower code path willbe taken.Backpatched to 9.4Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, CraigRinger and Robert Haas.Discussion:https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parent1fcd0ad commit0fedb4e

File tree

1 file changed

+37
-29
lines changed

1 file changed

+37
-29
lines changed

‎src/backend/replication/walsender.c

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,8 @@ static void
11511151
WalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,
11521152
boollast_write)
11531153
{
1154+
TimestampTznow;
1155+
11541156
/* output previously gathered data in a CopyData packet */
11551157
pq_putmessage_noblock('d',ctx->out->data,ctx->out->len);
11561158

@@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11601162
* several releases by streaming physical replication.
11611163
*/
11621164
resetStringInfo(&tmpbuf);
1163-
pq_sendint64(&tmpbuf,GetCurrentTimestamp());
1165+
now=GetCurrentTimestamp();
1166+
pq_sendint64(&tmpbuf,now);
11641167
memcpy(&ctx->out->data[1+sizeof(int64)+sizeof(int64)],
11651168
tmpbuf.data,sizeof(int64));
11661169

1167-
/* fast path */
1170+
CHECK_FOR_INTERRUPTS();
1171+
11681172
/* Try to flush pending output to the client */
11691173
if (pq_flush_if_writable()!=0)
11701174
WalSndShutdown();
11711175

1172-
if (!pq_is_send_pending())
1176+
/* Try taking fast path unless we get too close to walsender timeout. */
1177+
if (now<TimestampTzPlusMilliseconds(last_reply_timestamp,
1178+
wal_sender_timeout /2)&&
1179+
!pq_is_send_pending())
1180+
{
11731181
return;
1182+
}
11741183

1184+
/* If we have pending write here, go to slow path */
11751185
for (;;)
11761186
{
11771187
intwakeEvents;
11781188
longsleeptime;
1179-
TimestampTznow;
1189+
1190+
/* Check for input from the client */
1191+
ProcessRepliesIfAny();
1192+
1193+
now=GetCurrentTimestamp();
1194+
1195+
/* die if timeout was reached */
1196+
WalSndCheckTimeOut(now);
1197+
1198+
/* Send keepalive if the time has come */
1199+
WalSndKeepaliveIfNecessary(now);
1200+
1201+
if (!pq_is_send_pending())
1202+
break;
1203+
1204+
sleeptime=WalSndComputeSleeptime(now);
1205+
1206+
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1207+
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1208+
1209+
/* Sleep until something happens or we time out */
1210+
WaitLatchOrSocket(MyLatch,wakeEvents,
1211+
MyProcPort->sock,sleeptime,
1212+
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
11801213

11811214
/*
11821215
* Emergency bailout if postmaster has died. This is to avoid the
@@ -1198,34 +1231,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11981231
SyncRepInitConfig();
11991232
}
12001233

1201-
/* Check for input from the client */
1202-
ProcessRepliesIfAny();
1203-
12041234
/* Try to flush pending output to the client */
12051235
if (pq_flush_if_writable()!=0)
12061236
WalSndShutdown();
1207-
1208-
/* If we finished clearing the buffered data, we're done here. */
1209-
if (!pq_is_send_pending())
1210-
break;
1211-
1212-
now=GetCurrentTimestamp();
1213-
1214-
/* die if timeout was reached */
1215-
WalSndCheckTimeOut(now);
1216-
1217-
/* Send keepalive if the time has come */
1218-
WalSndKeepaliveIfNecessary(now);
1219-
1220-
sleeptime=WalSndComputeSleeptime(now);
1221-
1222-
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1223-
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1224-
1225-
/* Sleep until something happens or we time out */
1226-
WaitLatchOrSocket(MyLatch,wakeEvents,
1227-
MyProcPort->sock,sleeptime,
1228-
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
12291237
}
12301238

12311239
/* reactivate latch so WalSndLoop knows to continue */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp