Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit06828c5

Browse files
Separate messages for standby replies and hot standby feedback.
Allow messages to be sent at different times, and greatly reducethe frequency of hot standby feedback. Refactor to allow additionalmessage types.
1 parent45a6d79 commit06828c5

File tree

3 files changed

+168
-75
lines changed

3 files changed

+168
-75
lines changed

‎src/backend/replication/walreceiver.c

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ static struct
9595
}LogstreamResult;
9696

9797
staticStandbyReplyMessagereply_message;
98+
staticStandbyHSFeedbackMessagefeedback_message;
9899

99100
/*
100101
* About SIGTERM handling:
@@ -123,6 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
123124
staticvoidXLogWalRcvWrite(char*buf,Sizenbytes,XLogRecPtrrecptr);
124125
staticvoidXLogWalRcvFlush(booldying);
125126
staticvoidXLogWalRcvSendReply(void);
127+
staticvoidXLogWalRcvSendHSFeedback(void);
126128

127129
/* Signal handlers */
128130
staticvoidWalRcvSigHupHandler(SIGNAL_ARGS);
@@ -317,6 +319,7 @@ WalReceiverMain(void)
317319

318320
/* Let the master know that we received some data. */
319321
XLogWalRcvSendReply();
322+
XLogWalRcvSendHSFeedback();
320323

321324
/*
322325
* If we've written some records, flush them to disk and let the
@@ -331,6 +334,7 @@ WalReceiverMain(void)
331334
* the master anyway, to report any progress in applying WAL.
332335
*/
333336
XLogWalRcvSendReply();
337+
XLogWalRcvSendHSFeedback();
334338
}
335339
}
336340
}
@@ -619,40 +623,82 @@ XLogWalRcvSendReply(void)
619623
reply_message.apply=GetXLogReplayRecPtr();
620624
reply_message.sendTime=now;
621625

622-
/*
623-
* Get the OldestXmin and its associated epoch
624-
*/
625-
if (hot_standby_feedback&&HotStandbyActive())
626-
{
627-
TransactionIdnextXid;
628-
uint32nextEpoch;
629-
630-
reply_message.xmin=GetOldestXmin(true, false);
631-
632-
/*
633-
* Get epoch and adjust if nextXid and oldestXmin are different
634-
* sides of the epoch boundary.
635-
*/
636-
GetNextXidAndEpoch(&nextXid,&nextEpoch);
637-
if (nextXid<reply_message.xmin)
638-
nextEpoch--;
639-
reply_message.epoch=nextEpoch;
640-
}
641-
else
642-
{
643-
reply_message.xmin=InvalidTransactionId;
644-
reply_message.epoch=0;
645-
}
646-
647-
elog(DEBUG2,"sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
626+
elog(DEBUG2,"sending write %X/%X flush %X/%X apply %X/%X",
648627
reply_message.write.xlogid,reply_message.write.xrecoff,
649628
reply_message.flush.xlogid,reply_message.flush.xrecoff,
650-
reply_message.apply.xlogid,reply_message.apply.xrecoff,
651-
reply_message.xmin,
652-
reply_message.epoch);
629+
reply_message.apply.xlogid,reply_message.apply.xrecoff);
653630

654631
/* Prepend with the message type and send it. */
655632
buf[0]='r';
656633
memcpy(&buf[1],&reply_message,sizeof(StandbyReplyMessage));
657634
walrcv_send(buf,sizeof(StandbyReplyMessage)+1);
658635
}
636+
637+
/*
638+
* Send hot standby feedback message to primary, plus the current time,
639+
* in case they don't have a watch.
640+
*/
641+
staticvoid
642+
XLogWalRcvSendHSFeedback(void)
643+
{
644+
charbuf[sizeof(StandbyHSFeedbackMessage)+1];
645+
TimestampTznow;
646+
TransactionIdnextXid;
647+
uint32nextEpoch;
648+
TransactionIdxmin;
649+
650+
/*
651+
* If the user doesn't want status to be reported to the master, be sure
652+
* to exit before doing anything at all.
653+
*/
654+
if (!hot_standby_feedback)
655+
return;
656+
657+
/* Get current timestamp. */
658+
now=GetCurrentTimestamp();
659+
660+
/*
661+
* Send feedback at most once per wal_receiver_status_interval.
662+
*/
663+
if (!TimestampDifferenceExceeds(feedback_message.sendTime,now,
664+
wal_receiver_status_interval*1000))
665+
return;
666+
667+
/*
668+
* If Hot Standby is not yet active there is nothing to send.
669+
* Check this after the interval has expired to reduce number of
670+
* calls.
671+
*/
672+
if (!HotStandbyActive())
673+
return;
674+
675+
/*
676+
* Make the expensive call to get the oldest xmin once we are
677+
* certain everything else has been checked.
678+
*/
679+
xmin=GetOldestXmin(true, false);
680+
681+
/*
682+
* Get epoch and adjust if nextXid and oldestXmin are different
683+
* sides of the epoch boundary.
684+
*/
685+
GetNextXidAndEpoch(&nextXid,&nextEpoch);
686+
if (nextXid<xmin)
687+
nextEpoch--;
688+
689+
/*
690+
* Always send feedback message.
691+
*/
692+
feedback_message.sendTime=now;
693+
feedback_message.xmin=xmin;
694+
feedback_message.epoch=nextEpoch;
695+
696+
elog(DEBUG2,"sending hot standby feedback xmin %u epoch %u",
697+
feedback_message.xmin,
698+
feedback_message.epoch);
699+
700+
/* Prepend with the message type and send it. */
701+
buf[0]='h';
702+
memcpy(&buf[1],&feedback_message,sizeof(StandbyHSFeedbackMessage));
703+
walrcv_send(buf,sizeof(StandbyHSFeedbackMessage)+1);
704+
}

‎src/backend/replication/walsender.c

Lines changed: 80 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
116116
staticboolXLogSend(char*msgbuf,bool*caughtup);
117117
staticvoidIdentifySystem(void);
118118
staticvoidStartReplication(StartReplicationCmd*cmd);
119+
staticvoidProcessStandbyMessage(void);
119120
staticvoidProcessStandbyReplyMessage(void);
121+
staticvoidProcessStandbyHSFeedbackMessage(void);
120122
staticvoidProcessRepliesIfAny(void);
121123

122124

@@ -456,54 +458,55 @@ ProcessRepliesIfAny(void)
456458
unsignedcharfirstchar;
457459
intr;
458460

459-
r=pq_getbyte_if_available(&firstchar);
460-
if (r<0)
461-
{
462-
/* unexpected error or EOF */
463-
ereport(COMMERROR,
464-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
465-
errmsg("unexpected EOF on standby connection")));
466-
proc_exit(0);
467-
}
468-
if (r==0)
461+
for (;;)
469462
{
470-
/* no data available without blocking */
471-
return;
472-
}
463+
r=pq_getbyte_if_available(&firstchar);
464+
if (r<0)
465+
{
466+
/* unexpected error or EOF */
467+
ereport(COMMERROR,
468+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
469+
errmsg("unexpected EOF on standby connection")));
470+
proc_exit(0);
471+
}
472+
if (r==0)
473+
{
474+
/* no data available without blocking */
475+
return;
476+
}
473477

474-
/* Handle the very limited subset of commands expected in this phase */
475-
switch (firstchar)
476-
{
477-
/*
478-
* 'd' means a standby reply wrapped in a CopyData packet.
479-
*/
480-
case'd':
481-
ProcessStandbyReplyMessage();
482-
break;
478+
/* Handle the very limited subset of commands expected in this phase */
479+
switch (firstchar)
480+
{
481+
/*
482+
* 'd' means a standby reply wrapped in a CopyData packet.
483+
*/
484+
case'd':
485+
ProcessStandbyMessage();
486+
break;
483487

484-
/*
485-
* 'X' means that the standby is closing down the socket.
486-
*/
487-
case'X':
488-
proc_exit(0);
488+
/*
489+
* 'X' means that the standby is closing down the socket.
490+
*/
491+
case'X':
492+
proc_exit(0);
489493

490-
default:
491-
ereport(FATAL,
492-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
493-
errmsg("invalid standby closing message type %d",
494-
firstchar)));
494+
default:
495+
ereport(FATAL,
496+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
497+
errmsg("invalid standby closing message type %d",
498+
firstchar)));
499+
}
495500
}
496501
}
497502

498503
/*
499504
* Process a status update message received from standby.
500505
*/
501506
staticvoid
502-
ProcessStandbyReplyMessage(void)
507+
ProcessStandbyMessage(void)
503508
{
504-
StandbyReplyMessagereply;
505509
charmsgtype;
506-
TransactionIdnewxmin=InvalidTransactionId;
507510

508511
resetStringInfo(&reply_message);
509512

@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
523526
* one type.
524527
*/
525528
msgtype=pq_getmsgbyte(&reply_message);
526-
if (msgtype!='r')
529+
530+
switch (msgtype)
527531
{
528-
ereport(COMMERROR,
529-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
530-
errmsg("unexpected message type %c",msgtype)));
531-
proc_exit(0);
532+
case'r':
533+
ProcessStandbyReplyMessage();
534+
break;
535+
536+
case'h':
537+
ProcessStandbyHSFeedbackMessage();
538+
break;
539+
540+
default:
541+
ereport(COMMERROR,
542+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
543+
errmsg("unexpected message type %c",msgtype)));
544+
proc_exit(0);
532545
}
546+
}
547+
548+
/*
549+
* Regular reply from standby advising of WAL positions on standby server.
550+
*/
551+
staticvoid
552+
ProcessStandbyReplyMessage(void)
553+
{
554+
StandbyReplyMessagereply;
533555

534556
pq_copymsgbytes(&reply_message, (char*)&reply,sizeof(StandbyReplyMessage));
535557

536-
elog(DEBUG2,"write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
558+
elog(DEBUG2,"write %X/%X flush %X/%X apply %X/%X",
537559
reply.write.xlogid,reply.write.xrecoff,
538560
reply.flush.xlogid,reply.flush.xrecoff,
539-
reply.apply.xlogid,reply.apply.xrecoff,
540-
reply.xmin,
541-
reply.epoch);
561+
reply.apply.xlogid,reply.apply.xrecoff);
542562

543563
/*
544564
* Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
554574
walsnd->apply=reply.apply;
555575
SpinLockRelease(&walsnd->mutex);
556576
}
577+
}
578+
579+
/*
580+
* Hot Standby feedback
581+
*/
582+
staticvoid
583+
ProcessStandbyHSFeedbackMessage(void)
584+
{
585+
StandbyHSFeedbackMessagereply;
586+
TransactionIdnewxmin=InvalidTransactionId;
587+
588+
pq_copymsgbytes(&reply_message, (char*)&reply,sizeof(StandbyHSFeedbackMessage));
589+
590+
elog(DEBUG2,"hot standby feedback xmin %u epoch %u",
591+
reply.xmin,
592+
reply.epoch);
557593

558594
/*
559595
* Update the WalSender's proc xmin to allow it to be visible

‎src/include/replication/walprotocol.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ typedef struct
5656
XLogRecPtrflush;
5757
XLogRecPtrapply;
5858

59+
/* Sender's system clock at the time of transmission */
60+
TimestampTzsendTime;
61+
}StandbyReplyMessage;
62+
63+
/*
64+
* Hot Standby feedback from standby (message type 'h'). This is wrapped within
65+
* a CopyData message at the FE/BE protocol level.
66+
*
67+
* Note that the data length is not specified here.
68+
*/
69+
typedefstruct
70+
{
5971
/*
6072
* The current xmin and epoch from the standby, for Hot Standby feedback.
6173
* This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,9 @@ typedef struct
6476
TransactionIdxmin;
6577
uint32epoch;
6678

67-
6879
/* Sender's system clock at the time of transmission */
6980
TimestampTzsendTime;
70-
}StandbyReplyMessage;
81+
}StandbyHSFeedbackMessage;
7182

7283
/*
7384
* Maximum data payload in a WAL data message.Must be >= XLOG_BLCKSZ.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp