30
30
*
31
31
*
32
32
* IDENTIFICATION
33
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
33
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
34
34
*
35
35
*-------------------------------------------------------------------------
36
36
*/
@@ -100,13 +100,19 @@ static void InitWalSnd(void);
100
100
static void WalSndHandshake (void );
101
101
static void WalSndKill (int code ,Datum arg );
102
102
static void XLogRead (char * buf ,XLogRecPtr recptr ,Size nbytes );
103
- static bool XLogSend (StringInfo outMsg );
103
+ static bool XLogSend (StringInfo outMsg , bool * caughtup );
104
104
static void CheckClosedConnection (void );
105
105
106
106
/*
107
107
* How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
108
+ *
109
+ * We don't have a good idea of what a good value would be; there's some
110
+ * overhead per message in both walsender and walreceiver, but on the other
111
+ * hand sending large batches makes walsender less responsive to signals
112
+ * because signals are checked only between messages. 128kB seems like
113
+ * a reasonable guess for now.
108
114
*/
109
- #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2 )
115
+ #define MAX_SEND_SIZE (128 * 1024 )
110
116
111
117
/* Main entry point for walsender process */
112
118
int
@@ -360,6 +366,7 @@ static int
360
366
WalSndLoop (void )
361
367
{
362
368
StringInfoData output_message ;
369
+ bool caughtup = false;
363
370
364
371
initStringInfo (& output_message );
365
372
@@ -387,7 +394,7 @@ WalSndLoop(void)
387
394
*/
388
395
if (ready_to_stop )
389
396
{
390
- XLogSend (& output_message );
397
+ XLogSend (& output_message , & caughtup );
391
398
shutdown_requested = true;
392
399
}
393
400
@@ -402,31 +409,32 @@ WalSndLoop(void)
402
409
}
403
410
404
411
/*
405
- * Nap for the configured time or until a message arrives.
412
+ * If we had sent all accumulated WAL in last round, nap for the
413
+ * configured time before retrying.
406
414
*
407
415
* On some platforms, signals won't interrupt the sleep. To ensure we
408
416
* respond reasonably promptly when someone signals us, break down the
409
417
* sleep into NAPTIME_PER_CYCLE increments, and check for
410
418
* interrupts after each nap.
411
419
*/
412
- remain = WalSndDelay * 1000L ;
413
- while (remain > 0 )
420
+ if (caughtup )
414
421
{
415
- if (got_SIGHUP || shutdown_requested || ready_to_stop )
416
- break ;
422
+ remain = WalSndDelay * 1000L ;
423
+ while (remain > 0 )
424
+ {
425
+ /* Check for interrupts */
426
+ if (got_SIGHUP || shutdown_requested || ready_to_stop )
427
+ break ;
417
428
418
- /*
419
- * Check to see whether a message from the standby or an interrupt
420
- * from other processes has arrived.
421
- */
422
- pg_usleep (remain > NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain );
423
- CheckClosedConnection ();
429
+ /* Sleep and check that the connection is still alive */
430
+ pg_usleep (remain > NAPTIME_PER_CYCLE ?NAPTIME_PER_CYCLE :remain );
431
+ CheckClosedConnection ();
424
432
425
- remain -= NAPTIME_PER_CYCLE ;
433
+ remain -= NAPTIME_PER_CYCLE ;
434
+ }
426
435
}
427
-
428
436
/* Attempt to send the log once every loop */
429
- if (!XLogSend (& output_message ))
437
+ if (!XLogSend (& output_message , & caughtup ))
430
438
gotoeof ;
431
439
}
432
440
@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
623
631
}
624
632
625
633
/*
626
- * Read all WAL that's been written (and flushed) since last cycle, and send
627
- * it to client.
634
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
635
+ * but not yet sent to the client, and send it. If there is no unsent WAL,
636
+ * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
637
+ * to false.
628
638
*
629
639
* Returns true if OK, false if trouble.
630
640
*/
631
641
static bool
632
- XLogSend (StringInfo outMsg )
642
+ XLogSend (StringInfo outMsg , bool * caughtup )
633
643
{
634
644
XLogRecPtr SendRqstPtr ;
645
+ XLogRecPtr startptr ;
646
+ XLogRecPtr endptr ;
647
+ Size nbytes ;
635
648
char activitymsg [50 ];
636
649
637
650
/* use volatile pointer to prevent code rearrangement */
@@ -642,84 +655,82 @@ XLogSend(StringInfo outMsg)
642
655
643
656
/* Quick exit if nothing to do */
644
657
if (!XLByteLT (sentPtr ,SendRqstPtr ))
658
+ {
659
+ * caughtup = true;
645
660
return true;
661
+ }
662
+ /*
663
+ * Otherwise let the caller know that we're not fully caught up. Unless
664
+ * there's a huge backlog, we'll be caught up to the current WriteRecPtr
665
+ * after we've sent everything below, but more WAL could accumulate while
666
+ * we're busy sending.
667
+ */
668
+ * caughtup = false;
646
669
647
670
/*
648
- * We gather multiple records together by issuing just one XLogRead() of a
649
- * suitable size, and send them as one CopyData message. Repeat until
650
- * we've sent everything we can.
671
+ * Figure out how much to send in one message. If there's less than
672
+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
673
+ * MAX_SEND_SIZE bytes, but round to page boundary.
674
+ *
675
+ * The rounding is not only for performance reasons. Walreceiver
676
+ * relies on the fact that we never split a WAL record across two
677
+ * messages. Since a long WAL record is split at page boundary into
678
+ * continuation records, page boundary is always a safe cut-off point.
679
+ * We also assume that SendRqstPtr never points in the middle of a WAL
680
+ * record.
651
681
*/
652
- while (XLByteLT (sentPtr ,SendRqstPtr ))
682
+ startptr = sentPtr ;
683
+ if (startptr .xrecoff >=XLogFileSize )
653
684
{
654
- XLogRecPtr startptr ;
655
- XLogRecPtr endptr ;
656
- Size nbytes ;
657
-
658
685
/*
659
- * Figure out how much to send in one message. If there's less than
660
- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
661
- * MAX_SEND_SIZE bytes, but round to page boundary.
662
- *
663
- * The rounding is not only for performance reasons. Walreceiver
664
- * relies on the fact that we never split a WAL record across two
665
- * messages. Since a long WAL record is split at page boundary into
666
- * continuation records, page boundary is always a safe cut-off point.
667
- * We also assume that SendRqstPtr never points in the middle of a WAL
668
- * record.
686
+ * crossing a logid boundary, skip the non-existent last log
687
+ * segment in previous logical log file.
669
688
*/
670
- startptr = sentPtr ;
671
- if (startptr .xrecoff >=XLogFileSize )
672
- {
673
- /*
674
- * crossing a logid boundary, skip the non-existent last log
675
- * segment in previous logical log file.
676
- */
677
- startptr .xlogid += 1 ;
678
- startptr .xrecoff = 0 ;
679
- }
689
+ startptr .xlogid += 1 ;
690
+ startptr .xrecoff = 0 ;
691
+ }
680
692
681
- endptr = startptr ;
682
- XLByteAdvance (endptr ,MAX_SEND_SIZE );
683
- /* round down to page boundary. */
684
- endptr .xrecoff -= (endptr .xrecoff %XLOG_BLCKSZ );
685
- /* if we went beyond SendRqstPtr, back off */
686
- if (XLByteLT (SendRqstPtr ,endptr ))
687
- endptr = SendRqstPtr ;
693
+ endptr = startptr ;
694
+ XLByteAdvance (endptr ,MAX_SEND_SIZE );
695
+ /* round down to page boundary. */
696
+ endptr .xrecoff -= (endptr .xrecoff %XLOG_BLCKSZ );
697
+ /* if we went beyond SendRqstPtr, back off */
698
+ if (XLByteLT (SendRqstPtr ,endptr ))
699
+ endptr = SendRqstPtr ;
688
700
689
- /*
690
- * OK to read and send the slice.
691
- *
692
- * We don't need to convert the xlogid/xrecoff from host byte order to
693
- * network byte order because the both server can be expected to have
694
- * the same byte order. If they have different byte order, we don't
695
- * reach here.
696
- */
697
- pq_sendbyte (outMsg ,'w' );
698
- pq_sendbytes (outMsg , (char * )& startptr ,sizeof (startptr ));
701
+ /*
702
+ * OK to read and send the slice.
703
+ *
704
+ * We don't need to convert the xlogid/xrecoff from host byte order to
705
+ * network byte order because the both server can be expected to have
706
+ * the same byte order. If they have different byte order, we don't
707
+ * reach here.
708
+ */
709
+ pq_sendbyte (outMsg ,'w' );
710
+ pq_sendbytes (outMsg , (char * )& startptr ,sizeof (startptr ));
699
711
700
- if (endptr .xlogid != startptr .xlogid )
701
- {
702
- Assert (endptr .xlogid == startptr .xlogid + 1 );
703
- nbytes = endptr .xrecoff + XLogFileSize - startptr .xrecoff ;
704
- }
705
- else
706
- nbytes = endptr .xrecoff - startptr .xrecoff ;
712
+ if (endptr .xlogid != startptr .xlogid )
713
+ {
714
+ Assert (endptr .xlogid == startptr .xlogid + 1 );
715
+ nbytes = endptr .xrecoff + XLogFileSize - startptr .xrecoff ;
716
+ }
717
+ else
718
+ nbytes = endptr .xrecoff - startptr .xrecoff ;
707
719
708
- sentPtr = endptr ;
720
+ sentPtr = endptr ;
709
721
710
- /*
711
- * Read the log directly into the output buffer to prevent extra
712
- * memcpy calls.
713
- */
714
- enlargeStringInfo (outMsg ,nbytes );
722
+ /*
723
+ * Read the log directly into the output buffer to prevent extra
724
+ * memcpy calls.
725
+ */
726
+ enlargeStringInfo (outMsg ,nbytes );
715
727
716
- XLogRead (& outMsg -> data [outMsg -> len ],startptr ,nbytes );
717
- outMsg -> len += nbytes ;
718
- outMsg -> data [outMsg -> len ]= '\0' ;
728
+ XLogRead (& outMsg -> data [outMsg -> len ],startptr ,nbytes );
729
+ outMsg -> len += nbytes ;
730
+ outMsg -> data [outMsg -> len ]= '\0' ;
719
731
720
- pq_putmessage ('d' ,outMsg -> data ,outMsg -> len );
721
- resetStringInfo (outMsg );
722
- }
732
+ pq_putmessage ('d' ,outMsg -> data ,outMsg -> len );
733
+ resetStringInfo (outMsg );
723
734
724
735
/* Update shared memory status */
725
736
SpinLockAcquire (& walsnd -> mutex );