Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitea55160

Browse files
committed
In walsender, don't sleep if there's outstanding WAL waiting to be sent,
otherwise we effectively rate-limit the streaming as pointed out bySimon Riggs. Also, send the WAL in smaller chunks, to respond to signalsmore promptly.
1 parent4ed4b6c commitea55160

File tree

1 file changed

+96
-85
lines changed

1 file changed

+96
-85
lines changed

‎src/backend/replication/walsender.c

Lines changed: 96 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*
3131
*
3232
* IDENTIFICATION
33-
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
33+
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
3434
*
3535
*-------------------------------------------------------------------------
3636
*/
@@ -100,13 +100,19 @@ static void InitWalSnd(void);
100100
staticvoidWalSndHandshake(void);
101101
staticvoidWalSndKill(intcode,Datumarg);
102102
staticvoidXLogRead(char*buf,XLogRecPtrrecptr,Sizenbytes);
103-
staticboolXLogSend(StringInfooutMsg);
103+
staticboolXLogSend(StringInfooutMsg,bool*caughtup);
104104
staticvoidCheckClosedConnection(void);
105105

106106
/*
107107
* How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
108+
*
109+
* We don't have a good idea of what a good value would be; there's some
110+
* overhead per message in both walsender and walreceiver, but on the other
111+
* hand sending large batches makes walsender less responsive to signals
112+
* because signals are checked only between messages. 128kB seems like
113+
* a reasonable guess for now.
108114
*/
109-
#defineMAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
115+
#defineMAX_SEND_SIZE (128 * 1024)
110116

111117
/* Main entry point for walsender process */
112118
int
@@ -360,6 +366,7 @@ static int
360366
WalSndLoop(void)
361367
{
362368
StringInfoDataoutput_message;
369+
boolcaughtup= false;
363370

364371
initStringInfo(&output_message);
365372

@@ -387,7 +394,7 @@ WalSndLoop(void)
387394
*/
388395
if (ready_to_stop)
389396
{
390-
XLogSend(&output_message);
397+
XLogSend(&output_message,&caughtup);
391398
shutdown_requested= true;
392399
}
393400

@@ -402,31 +409,32 @@ WalSndLoop(void)
402409
}
403410

404411
/*
405-
* Nap for the configured time or until a message arrives.
412+
* If we had sent all accumulated WAL in last round, nap for the
413+
* configured time before retrying.
406414
*
407415
* On some platforms, signals won't interrupt the sleep. To ensure we
408416
* respond reasonably promptly when someone signals us, break down the
409417
* sleep into NAPTIME_PER_CYCLE increments, and check for
410418
* interrupts after each nap.
411419
*/
412-
remain=WalSndDelay*1000L;
413-
while (remain>0)
420+
if (caughtup)
414421
{
415-
if (got_SIGHUP||shutdown_requested||ready_to_stop)
416-
break;
422+
remain=WalSndDelay*1000L;
423+
while (remain>0)
424+
{
425+
/* Check for interrupts */
426+
if (got_SIGHUP||shutdown_requested||ready_to_stop)
427+
break;
417428

418-
/*
419-
* Check to see whether a message from the standby or an interrupt
420-
* from other processes has arrived.
421-
*/
422-
pg_usleep(remain>NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain);
423-
CheckClosedConnection();
429+
/* Sleep and check that the connection is still alive */
430+
pg_usleep(remain>NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain);
431+
CheckClosedConnection();
424432

425-
remain-=NAPTIME_PER_CYCLE;
433+
remain-=NAPTIME_PER_CYCLE;
434+
}
426435
}
427-
428436
/* Attempt to send the log once every loop */
429-
if (!XLogSend(&output_message))
437+
if (!XLogSend(&output_message,&caughtup))
430438
gotoeof;
431439
}
432440

@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
623631
}
624632

625633
/*
626-
* Read all WAL that's been written (and flushed) since last cycle, and send
627-
* it to client.
634+
* Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
635+
* but not yet sent to the client, and send it. If there is no unsent WAL,
636+
* *caughtup is set to true and nothing is sent, otherwise *caughtup is set
637+
* to false.
628638
*
629639
* Returns true if OK, false if trouble.
630640
*/
631641
staticbool
632-
XLogSend(StringInfooutMsg)
642+
XLogSend(StringInfooutMsg,bool*caughtup)
633643
{
634644
XLogRecPtrSendRqstPtr;
645+
XLogRecPtrstartptr;
646+
XLogRecPtrendptr;
647+
Sizenbytes;
635648
charactivitymsg[50];
636649

637650
/* use volatile pointer to prevent code rearrangement */
@@ -642,84 +655,82 @@ XLogSend(StringInfo outMsg)
642655

643656
/* Quick exit if nothing to do */
644657
if (!XLByteLT(sentPtr,SendRqstPtr))
658+
{
659+
*caughtup= true;
645660
return true;
661+
}
662+
/*
663+
* Otherwise let the caller know that we're not fully caught up. Unless
664+
* there's a huge backlog, we'll be caught up to the current WriteRecPtr
665+
* after we've sent everything below, but more WAL could accumulate while
666+
* we're busy sending.
667+
*/
668+
*caughtup= false;
646669

647670
/*
648-
* We gather multiple records together by issuing just one XLogRead() of a
649-
* suitable size, and send them as one CopyData message. Repeat until
650-
* we've sent everything we can.
671+
* Figure out how much to send in one message. If there's less than
672+
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
673+
* MAX_SEND_SIZE bytes, but round to page boundary.
674+
*
675+
* The rounding is not only for performance reasons. Walreceiver
676+
* relies on the fact that we never split a WAL record across two
677+
* messages. Since a long WAL record is split at page boundary into
678+
* continuation records, page boundary is always a safe cut-off point.
679+
* We also assume that SendRqstPtr never points in the middle of a WAL
680+
* record.
651681
*/
652-
while (XLByteLT(sentPtr,SendRqstPtr))
682+
startptr=sentPtr;
683+
if (startptr.xrecoff >=XLogFileSize)
653684
{
654-
XLogRecPtrstartptr;
655-
XLogRecPtrendptr;
656-
Sizenbytes;
657-
658685
/*
659-
* Figure out how much to send in one message. If there's less than
660-
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
661-
* MAX_SEND_SIZE bytes, but round to page boundary.
662-
*
663-
* The rounding is not only for performance reasons. Walreceiver
664-
* relies on the fact that we never split a WAL record across two
665-
* messages. Since a long WAL record is split at page boundary into
666-
* continuation records, page boundary is always a safe cut-off point.
667-
* We also assume that SendRqstPtr never points in the middle of a WAL
668-
* record.
686+
* crossing a logid boundary, skip the non-existent last log
687+
* segment in previous logical log file.
669688
*/
670-
startptr=sentPtr;
671-
if (startptr.xrecoff >=XLogFileSize)
672-
{
673-
/*
674-
* crossing a logid boundary, skip the non-existent last log
675-
* segment in previous logical log file.
676-
*/
677-
startptr.xlogid+=1;
678-
startptr.xrecoff=0;
679-
}
689+
startptr.xlogid+=1;
690+
startptr.xrecoff=0;
691+
}
680692

681-
endptr=startptr;
682-
XLByteAdvance(endptr,MAX_SEND_SIZE);
683-
/* round down to page boundary. */
684-
endptr.xrecoff-= (endptr.xrecoff %XLOG_BLCKSZ);
685-
/* if we went beyond SendRqstPtr, back off */
686-
if (XLByteLT(SendRqstPtr,endptr))
687-
endptr=SendRqstPtr;
693+
endptr=startptr;
694+
XLByteAdvance(endptr,MAX_SEND_SIZE);
695+
/* round down to page boundary. */
696+
endptr.xrecoff-= (endptr.xrecoff %XLOG_BLCKSZ);
697+
/* if we went beyond SendRqstPtr, back off */
698+
if (XLByteLT(SendRqstPtr,endptr))
699+
endptr=SendRqstPtr;
688700

689-
/*
690-
* OK to read and send the slice.
691-
*
692-
* We don't need to convert the xlogid/xrecoff from host byte order to
693-
* network byte order because the both server can be expected to have
694-
* the same byte order. If they have different byte order, we don't
695-
* reach here.
696-
*/
697-
pq_sendbyte(outMsg,'w');
698-
pq_sendbytes(outMsg, (char*)&startptr,sizeof(startptr));
701+
/*
702+
* OK to read and send the slice.
703+
*
704+
* We don't need to convert the xlogid/xrecoff from host byte order to
705+
* network byte order because the both server can be expected to have
706+
* the same byte order. If they have different byte order, we don't
707+
* reach here.
708+
*/
709+
pq_sendbyte(outMsg,'w');
710+
pq_sendbytes(outMsg, (char*)&startptr,sizeof(startptr));
699711

700-
if (endptr.xlogid!=startptr.xlogid)
701-
{
702-
Assert(endptr.xlogid==startptr.xlogid+1);
703-
nbytes=endptr.xrecoff+XLogFileSize-startptr.xrecoff;
704-
}
705-
else
706-
nbytes=endptr.xrecoff-startptr.xrecoff;
712+
if (endptr.xlogid!=startptr.xlogid)
713+
{
714+
Assert(endptr.xlogid==startptr.xlogid+1);
715+
nbytes=endptr.xrecoff+XLogFileSize-startptr.xrecoff;
716+
}
717+
else
718+
nbytes=endptr.xrecoff-startptr.xrecoff;
707719

708-
sentPtr=endptr;
720+
sentPtr=endptr;
709721

710-
/*
711-
* Read the log directly into the output buffer to prevent extra
712-
* memcpy calls.
713-
*/
714-
enlargeStringInfo(outMsg,nbytes);
722+
/*
723+
* Read the log directly into the output buffer to prevent extra
724+
* memcpy calls.
725+
*/
726+
enlargeStringInfo(outMsg,nbytes);
715727

716-
XLogRead(&outMsg->data[outMsg->len],startptr,nbytes);
717-
outMsg->len+=nbytes;
718-
outMsg->data[outMsg->len]='\0';
728+
XLogRead(&outMsg->data[outMsg->len],startptr,nbytes);
729+
outMsg->len+=nbytes;
730+
outMsg->data[outMsg->len]='\0';
719731

720-
pq_putmessage('d',outMsg->data,outMsg->len);
721-
resetStringInfo(outMsg);
722-
}
732+
pq_putmessage('d',outMsg->data,outMsg->len);
733+
resetStringInfo(outMsg);
723734

724735
/* Update shared memory status */
725736
SpinLockAcquire(&walsnd->mutex);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp