Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8705626

Browse files
committed
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not toimpose too high a per message overhead. The fast path skips checks forinterrupts and timeouts. However, the existing coding failed to considerthe fact that a transaction with a large number of changes may take avery long time to be processed and sent to the client. This causes thewalsender to ignore interrupts for potentially a long time and moreimportantly it will result in the walsender being killed due totimeout at the end of such a transaction.This commit changes the fast path to also check for interrupts and onlyallows calling the fast path when the last keepalive check happened lessthan half the walsender timeout ago. Otherwise the slower code path willbe taken.Backpatched to 9.4Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, CraigRinger and Robert Haas.Discussion:https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parent05f239e commit8705626

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

‎src/backend/replication/walsender.c‎

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,9 @@ static void
10681068
WalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,
10691069
boollast_write)
10701070
{
1071+
TimestampTznow;
1072+
int64now_int;
1073+
10711074
/* output previously gathered data in a CopyData packet */
10721075
pq_putmessage_noblock('d',ctx->out->data,ctx->out->len);
10731076

@@ -1077,23 +1080,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10771080
* several releases by streaming physical replication.
10781081
*/
10791082
resetStringInfo(&tmpbuf);
1080-
pq_sendint64(&tmpbuf,GetCurrentIntegerTimestamp());
1083+
now_int=GetCurrentIntegerTimestamp();
1084+
now=IntegerTimestampToTimestampTz(now_int);
1085+
pq_sendint64(&tmpbuf,now_int);
10811086
memcpy(&ctx->out->data[1+sizeof(int64)+sizeof(int64)],
10821087
tmpbuf.data,sizeof(int64));
10831088

1084-
/* fast path */
1089+
CHECK_FOR_INTERRUPTS();
1090+
10851091
/* Try to flush pending output to the client */
10861092
if (pq_flush_if_writable()!=0)
10871093
WalSndShutdown();
10881094

1089-
if (!pq_is_send_pending())
1095+
/* Try taking fast path unless we get too close to walsender timeout. */
1096+
if (now<TimestampTzPlusMilliseconds(last_reply_timestamp,
1097+
wal_sender_timeout /2)&&
1098+
!pq_is_send_pending())
1099+
{
10901100
return;
1101+
}
10911102

1103+
/* If we have pending write here, go to slow path */
10921104
for (;;)
10931105
{
10941106
intwakeEvents;
10951107
longsleeptime;
1096-
TimestampTznow;
1108+
1109+
/* Check for input from the client */
1110+
ProcessRepliesIfAny();
1111+
1112+
now=GetCurrentTimestamp();
1113+
1114+
/* die if timeout was reached */
1115+
WalSndCheckTimeOut(now);
1116+
1117+
/* Send keepalive if the time has come */
1118+
WalSndKeepaliveIfNecessary(now);
1119+
1120+
if (!pq_is_send_pending())
1121+
break;
1122+
1123+
sleeptime=WalSndComputeSleeptime(now);
1124+
1125+
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1126+
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1127+
1128+
/* Sleep until something happens or we time out */
1129+
WaitLatchOrSocket(MyLatch,wakeEvents,
1130+
MyProcPort->sock,sleeptime);
10971131

10981132
/*
10991133
* Emergency bailout if postmaster has died. This is to avoid the
@@ -1115,33 +1149,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11151149
SyncRepInitConfig();
11161150
}
11171151

1118-
/* Check for input from the client */
1119-
ProcessRepliesIfAny();
1120-
11211152
/* Try to flush pending output to the client */
11221153
if (pq_flush_if_writable()!=0)
11231154
WalSndShutdown();
1124-
1125-
/* If we finished clearing the buffered data, we're done here. */
1126-
if (!pq_is_send_pending())
1127-
break;
1128-
1129-
now=GetCurrentTimestamp();
1130-
1131-
/* die if timeout was reached */
1132-
WalSndCheckTimeOut(now);
1133-
1134-
/* Send keepalive if the time has come */
1135-
WalSndKeepaliveIfNecessary(now);
1136-
1137-
sleeptime=WalSndComputeSleeptime(now);
1138-
1139-
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1140-
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1141-
1142-
/* Sleep until something happens or we time out */
1143-
WaitLatchOrSocket(MyLatch,wakeEvents,
1144-
MyProcPort->sock,sleeptime);
11451155
}
11461156

11471157
/* reactivate latch so WalSndLoop knows to continue */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp