3030 *
3131 *
3232 * IDENTIFICATION
33- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
33+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
3434 *
3535 *-------------------------------------------------------------------------
3636 */
@@ -100,13 +100,19 @@ static void InitWalSnd(void);
100100static void WalSndHandshake (void );
101101static void WalSndKill (int code ,Datum arg );
102102static void XLogRead (char * buf ,XLogRecPtr recptr ,Size nbytes );
103- static bool XLogSend (StringInfo outMsg );
103+ static bool XLogSend (StringInfo outMsg , bool * caughtup );
104104static void CheckClosedConnection (void );
105105
106106/*
107107 * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
108+ *
109+ * We don't have a good idea of what a good value would be; there's some
110+ * overhead per message in both walsender and walreceiver, but on the other
111+ * hand sending large batches makes walsender less responsive to signals
112+ * because signals are checked only between messages. 128kB seems like
113+ * a reasonable guess for now.
108114 */
109- #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2 )
115+ #define MAX_SEND_SIZE (128 * 1024 )
110116
111117/* Main entry point for walsender process */
112118int
@@ -360,6 +366,7 @@ static int
360366WalSndLoop (void )
361367{
362368StringInfoData output_message ;
369+ bool caughtup = false;
363370
364371initStringInfo (& output_message );
365372
@@ -387,7 +394,7 @@ WalSndLoop(void)
387394 */
388395if (ready_to_stop )
389396{
390- XLogSend (& output_message );
397+ XLogSend (& output_message , & caughtup );
391398shutdown_requested = true;
392399}
393400
@@ -402,31 +409,32 @@ WalSndLoop(void)
402409}
403410
404411/*
405- * Nap for the configured time or until a message arrives.
412+ * If we had sent all accumulated WAL in last round, nap for the
413+ * configured time before retrying.
406414 *
407415 * On some platforms, signals won't interrupt the sleep. To ensure we
408416 * respond reasonably promptly when someone signals us, break down the
409417 * sleep into NAPTIME_PER_CYCLE increments, and check for
410418 * interrupts after each nap.
411419 */
412- remain = WalSndDelay * 1000L ;
413- while (remain > 0 )
420+ if (caughtup )
414421{
415- if (got_SIGHUP || shutdown_requested || ready_to_stop )
416- break ;
422+ remain = WalSndDelay * 1000L ;
423+ while (remain > 0 )
424+ {
425+ /* Check for interrupts */
426+ if (got_SIGHUP || shutdown_requested || ready_to_stop )
427+ break ;
417428
418- /*
419- * Check to see whether a message from the standby or an interrupt
420- * from other processes has arrived.
421- */
422- pg_usleep (remain > NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain );
423- CheckClosedConnection ();
429+ /* Sleep and check that the connection is still alive */
430+ pg_usleep (remain > NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain );
431+ CheckClosedConnection ();
424432
425- remain -= NAPTIME_PER_CYCLE ;
433+ remain -= NAPTIME_PER_CYCLE ;
434+ }
426435}
427-
428436/* Attempt to send the log once every loop */
429- if (!XLogSend (& output_message ))
437+ if (!XLogSend (& output_message , & caughtup ))
430438gotoeof ;
431439}
432440
@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
623631}
624632
625633/*
626- * Read all WAL that's been written (and flushed) since last cycle, and send
627- * it to client.
634+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
635+ * but not yet sent to the client, and send it. If there is no unsent WAL,
636+ * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
637+ * to false.
628638 *
629639 * Returns true if OK, false if trouble.
630640 */
631641static bool
632- XLogSend (StringInfo outMsg )
642+ XLogSend (StringInfo outMsg , bool * caughtup )
633643{
634644XLogRecPtr SendRqstPtr ;
645+ XLogRecPtr startptr ;
646+ XLogRecPtr endptr ;
647+ Size nbytes ;
635648char activitymsg [50 ];
636649
637650/* use volatile pointer to prevent code rearrangement */
@@ -642,84 +655,82 @@ XLogSend(StringInfo outMsg)
642655
643656/* Quick exit if nothing to do */
644657if (!XLByteLT (sentPtr ,SendRqstPtr ))
658+ {
659+ * caughtup = true;
645660return true;
661+ }
662+ /*
663+ * Otherwise let the caller know that we're not fully caught up. Unless
664+ * there's a huge backlog, we'll be caught up to the current WriteRecPtr
665+ * after we've sent everything below, but more WAL could accumulate while
666+ * we're busy sending.
667+ */
668+ * caughtup = false;
646669
647670/*
648- * We gather multiple records together by issuing just one XLogRead() of a
649- * suitable size, and send them as one CopyData message. Repeat until
650- * we've sent everything we can.
671+ * Figure out how much to send in one message. If there's less than
672+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
673+ * MAX_SEND_SIZE bytes, but round to page boundary.
674+ *
675+ * The rounding is not only for performance reasons. Walreceiver
676+ * relies on the fact that we never split a WAL record across two
677+ * messages. Since a long WAL record is split at page boundary into
678+ * continuation records, page boundary is always a safe cut-off point.
679+ * We also assume that SendRqstPtr never points in the middle of a WAL
680+ * record.
651681 */
652- while (XLByteLT (sentPtr ,SendRqstPtr ))
682+ startptr = sentPtr ;
683+ if (startptr .xrecoff >=XLogFileSize )
653684{
654- XLogRecPtr startptr ;
655- XLogRecPtr endptr ;
656- Size nbytes ;
657-
658685/*
659- * Figure out how much to send in one message. If there's less than
660- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
661- * MAX_SEND_SIZE bytes, but round to page boundary.
662- *
663- * The rounding is not only for performance reasons. Walreceiver
664- * relies on the fact that we never split a WAL record across two
665- * messages. Since a long WAL record is split at page boundary into
666- * continuation records, page boundary is always a safe cut-off point.
667- * We also assume that SendRqstPtr never points in the middle of a WAL
668- * record.
686+ * crossing a logid boundary, skip the non-existent last log
687+ * segment in previous logical log file.
669688 */
670- startptr = sentPtr ;
671- if (startptr .xrecoff >=XLogFileSize )
672- {
673- /*
674- * crossing a logid boundary, skip the non-existent last log
675- * segment in previous logical log file.
676- */
677- startptr .xlogid += 1 ;
678- startptr .xrecoff = 0 ;
679- }
689+ startptr .xlogid += 1 ;
690+ startptr .xrecoff = 0 ;
691+ }
680692
681- endptr = startptr ;
682- XLByteAdvance (endptr ,MAX_SEND_SIZE );
683- /* round down to page boundary. */
684- endptr .xrecoff -= (endptr .xrecoff %XLOG_BLCKSZ );
685- /* if we went beyond SendRqstPtr, back off */
686- if (XLByteLT (SendRqstPtr ,endptr ))
687- endptr = SendRqstPtr ;
693+ endptr = startptr ;
694+ XLByteAdvance (endptr ,MAX_SEND_SIZE );
695+ /* round down to page boundary. */
696+ endptr .xrecoff -= (endptr .xrecoff %XLOG_BLCKSZ );
697+ /* if we went beyond SendRqstPtr, back off */
698+ if (XLByteLT (SendRqstPtr ,endptr ))
699+ endptr = SendRqstPtr ;
688700
689- /*
690- * OK to read and send the slice.
691- *
692- * We don't need to convert the xlogid/xrecoff from host byte order to
693- * network byte order because the both server can be expected to have
694- * the same byte order. If they have different byte order, we don't
695- * reach here.
696- */
697- pq_sendbyte (outMsg ,'w' );
698- pq_sendbytes (outMsg , (char * )& startptr ,sizeof (startptr ));
701+ /*
702+ * OK to read and send the slice.
703+ *
704+ * We don't need to convert the xlogid/xrecoff from host byte order to
705+ * network byte order because the both server can be expected to have
706+ * the same byte order. If they have different byte order, we don't
707+ * reach here.
708+ */
709+ pq_sendbyte (outMsg ,'w' );
710+ pq_sendbytes (outMsg , (char * )& startptr ,sizeof (startptr ));
699711
700- if (endptr .xlogid != startptr .xlogid )
701- {
702- Assert (endptr .xlogid == startptr .xlogid + 1 );
703- nbytes = endptr .xrecoff + XLogFileSize - startptr .xrecoff ;
704- }
705- else
706- nbytes = endptr .xrecoff - startptr .xrecoff ;
712+ if (endptr .xlogid != startptr .xlogid )
713+ {
714+ Assert (endptr .xlogid == startptr .xlogid + 1 );
715+ nbytes = endptr .xrecoff + XLogFileSize - startptr .xrecoff ;
716+ }
717+ else
718+ nbytes = endptr .xrecoff - startptr .xrecoff ;
707719
708- sentPtr = endptr ;
720+ sentPtr = endptr ;
709721
710- /*
711- * Read the log directly into the output buffer to prevent extra
712- * memcpy calls.
713- */
714- enlargeStringInfo (outMsg ,nbytes );
722+ /*
723+ * Read the log directly into the output buffer to prevent extra
724+ * memcpy calls.
725+ */
726+ enlargeStringInfo (outMsg ,nbytes );
715727
716- XLogRead (& outMsg -> data [outMsg -> len ],startptr ,nbytes );
717- outMsg -> len += nbytes ;
718- outMsg -> data [outMsg -> len ]= '\0' ;
728+ XLogRead (& outMsg -> data [outMsg -> len ],startptr ,nbytes );
729+ outMsg -> len += nbytes ;
730+ outMsg -> data [outMsg -> len ]= '\0' ;
719731
720- pq_putmessage ('d' ,outMsg -> data ,outMsg -> len );
721- resetStringInfo (outMsg );
722- }
732+ pq_putmessage ('d' ,outMsg -> data ,outMsg -> len );
733+ resetStringInfo (outMsg );
723734
724735/* Update shared memory status */
725736SpinLockAcquire (& walsnd -> mutex );