Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc28e0b1

Browse files
committed
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not toimpose too high a per message overhead. The fast path skips checks forinterrupts and timeouts. However, the existing coding failed to considerthe fact that a transaction with a large number of changes may take avery long time to be processed and sent to the client. This causes thewalsender to ignore interrupts for potentially a long time and moreimportantly it will result in the walsender being killed due totimeout at the end of such a transaction.This commit changes the fast path to also check for interrupts and onlyallows calling the fast path when the last keepalive check happened lessthan half the walsender timeout ago. Otherwise the slower code path willbe taken.Backpatched to 9.4Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, CraigRinger and Robert Haas.Discussion:https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parentc26d46f commitc28e0b1

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

‎src/backend/replication/walsender.c

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,9 @@ static void
10761076
WalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,
10771077
boollast_write)
10781078
{
1079+
TimestampTznow;
1080+
int64now_int;
1081+
10791082
/* output previously gathered data in a CopyData packet */
10801083
pq_putmessage_noblock('d',ctx->out->data,ctx->out->len);
10811084

@@ -1085,23 +1088,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10851088
* several releases by streaming physical replication.
10861089
*/
10871090
resetStringInfo(&tmpbuf);
1088-
pq_sendint64(&tmpbuf,GetCurrentIntegerTimestamp());
1091+
now_int=GetCurrentIntegerTimestamp();
1092+
now=IntegerTimestampToTimestampTz(now_int);
1093+
pq_sendint64(&tmpbuf,now_int);
10891094
memcpy(&ctx->out->data[1+sizeof(int64)+sizeof(int64)],
10901095
tmpbuf.data,sizeof(int64));
10911096

1092-
/* fast path */
1097+
CHECK_FOR_INTERRUPTS();
1098+
10931099
/* Try to flush pending output to the client */
10941100
if (pq_flush_if_writable()!=0)
10951101
WalSndShutdown();
10961102

1097-
if (!pq_is_send_pending())
1103+
/* Try taking fast path unless we get too close to walsender timeout. */
1104+
if (now<TimestampTzPlusMilliseconds(last_reply_timestamp,
1105+
wal_sender_timeout /2)&&
1106+
!pq_is_send_pending())
1107+
{
10981108
return;
1109+
}
10991110

1111+
/* If we have pending write here, go to slow path */
11001112
for (;;)
11011113
{
11021114
intwakeEvents;
11031115
longsleeptime;
1104-
TimestampTznow;
1116+
1117+
/* Check for input from the client */
1118+
ProcessRepliesIfAny();
1119+
1120+
now=GetCurrentTimestamp();
1121+
1122+
/* die if timeout was reached */
1123+
WalSndCheckTimeOut(now);
1124+
1125+
/* Send keepalive if the time has come */
1126+
WalSndKeepaliveIfNecessary(now);
1127+
1128+
if (!pq_is_send_pending())
1129+
break;
1130+
1131+
sleeptime=WalSndComputeSleeptime(now);
1132+
1133+
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1134+
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1135+
1136+
/* Sleep until something happens or we time out */
1137+
WaitLatchOrSocket(MyLatch,wakeEvents,
1138+
MyProcPort->sock,sleeptime);
11051139

11061140
/*
11071141
* Emergency bailout if postmaster has died. This is to avoid the
@@ -1123,33 +1157,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11231157
SyncRepInitConfig();
11241158
}
11251159

1126-
/* Check for input from the client */
1127-
ProcessRepliesIfAny();
1128-
11291160
/* Try to flush pending output to the client */
11301161
if (pq_flush_if_writable()!=0)
11311162
WalSndShutdown();
1132-
1133-
/* If we finished clearing the buffered data, we're done here. */
1134-
if (!pq_is_send_pending())
1135-
break;
1136-
1137-
now=GetCurrentTimestamp();
1138-
1139-
/* die if timeout was reached */
1140-
WalSndCheckTimeOut(now);
1141-
1142-
/* Send keepalive if the time has come */
1143-
WalSndKeepaliveIfNecessary(now);
1144-
1145-
sleeptime=WalSndComputeSleeptime(now);
1146-
1147-
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
1148-
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1149-
1150-
/* Sleep until something happens or we time out */
1151-
WaitLatchOrSocket(MyLatch,wakeEvents,
1152-
MyProcPort->sock,sleeptime);
11531163
}
11541164

11551165
/* reactivate latch so WalSndLoop knows to continue */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp