Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6423390

Browse files
Send new protocol keepalive messages to standby servers.
Allows streaming replication users to calculate transfer latencyand apply delay via internal functions. No external functions yet.
1 parent2ae2e9c commit6423390

File tree

8 files changed

+258
-16
lines changed

8 files changed

+258
-16
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
14631463
CopyData message):
14641464
</para>
14651465

1466+
<para>
1467+
<variablelist>
1468+
<varlistentry>
1469+
<term>
1470+
Primary keepalive message (B)
1471+
</term>
1472+
<listitem>
1473+
<para>
1474+
<variablelist>
1475+
<varlistentry>
1476+
<term>
1477+
Byte1('k')
1478+
</term>
1479+
<listitem>
1480+
<para>
1481+
Identifies the message as a sender keepalive.
1482+
</para>
1483+
</listitem>
1484+
</varlistentry>
1485+
<varlistentry>
1486+
<term>
1487+
Byte8
1488+
</term>
1489+
<listitem>
1490+
<para>
1491+
The current end of WAL on the server, given in
1492+
XLogRecPtr format.
1493+
</para>
1494+
</listitem>
1495+
</varlistentry>
1496+
<varlistentry>
1497+
<term>
1498+
Byte8
1499+
</term>
1500+
<listitem>
1501+
<para>
1502+
The server's system clock at the time of transmission,
1503+
given in TimestampTz format.
1504+
</para>
1505+
</listitem>
1506+
</varlistentry>
1507+
</variablelist>
1508+
</para>
1509+
</listitem>
1510+
</varlistentry>
1511+
</variablelist>
1512+
</para>
1513+
14661514
<para>
14671515
<variablelist>
14681516
<varlistentry>

‎src/backend/access/transam/xlog.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
452452
XLogRecPtrrecoveryLastRecPtr;
453453
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
454454
TimestampTzrecoveryLastXTime;
455+
/* timestamp of when we started replaying the current chunk of WAL data,
456+
* only relevant for replication or archive recovery */
457+
TimestampTzcurrentChunkStartTime;
455458
/* end of the last record restored from the archive */
456459
XLogRecPtrrestoreLastRecPtr;
457460
/* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
606609
staticboolrecoveryStopsHere(XLogRecord*record,bool*includeThis);
607610
staticvoidrecoveryPausesHere(void);
608611
staticvoidSetLatestXTime(TimestampTzxtime);
612+
staticvoidSetCurrentChunkStartTime(TimestampTzxtime);
609613
staticvoidCheckRequiredParameterValues(void);
610614
staticvoidXLogReportParameters(void);
611615
staticvoidLocalSetXLogInsertAllowed(void);
@@ -5847,6 +5851,41 @@ GetLatestXTime(void)
58475851
returnxtime;
58485852
}
58495853

5854+
/*
5855+
* Save timestamp of the next chunk of WAL records to apply.
5856+
*
5857+
* We keep this in XLogCtl, not a simple static variable, so that it can be
5858+
* seen by all backends.
5859+
*/
5860+
staticvoid
5861+
SetCurrentChunkStartTime(TimestampTzxtime)
5862+
{
5863+
/* use volatile pointer to prevent code rearrangement */
5864+
volatileXLogCtlData*xlogctl=XLogCtl;
5865+
5866+
SpinLockAcquire(&xlogctl->info_lck);
5867+
xlogctl->currentChunkStartTime=xtime;
5868+
SpinLockRelease(&xlogctl->info_lck);
5869+
}
5870+
5871+
/*
5872+
* Fetch timestamp of latest processed commit/abort record.
5873+
* Startup process maintains an accurate local copy in XLogReceiptTime
5874+
*/
5875+
TimestampTz
5876+
GetCurrentChunkReplayStartTime(void)
5877+
{
5878+
/* use volatile pointer to prevent code rearrangement */
5879+
volatileXLogCtlData*xlogctl=XLogCtl;
5880+
TimestampTzxtime;
5881+
5882+
SpinLockAcquire(&xlogctl->info_lck);
5883+
xtime=xlogctl->currentChunkStartTime;
5884+
SpinLockRelease(&xlogctl->info_lck);
5885+
5886+
returnxtime;
5887+
}
5888+
58505889
/*
58515890
* Returns time of receipt of current chunk of XLOG data, as well as
58525891
* whether it was received from streaming replication or from archives.
@@ -6390,6 +6429,7 @@ StartupXLOG(void)
63906429
xlogctl->replayEndRecPtr=ReadRecPtr;
63916430
xlogctl->recoveryLastRecPtr=ReadRecPtr;
63926431
xlogctl->recoveryLastXTime=0;
6432+
xlogctl->currentChunkStartTime=0;
63936433
xlogctl->recoveryPause= false;
63946434
SpinLockRelease(&xlogctl->info_lck);
63956435

@@ -9696,7 +9736,10 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
96969736
{
96979737
havedata= true;
96989738
if (!XLByteLT(*RecPtr,latestChunkStart))
9739+
{
96999740
XLogReceiptTime=GetCurrentTimestamp();
9741+
SetCurrentChunkStartTime(XLogReceiptTime);
9742+
}
97009743
}
97019744
else
97029745
havedata= false;

‎src/backend/replication/walreceiver.c

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
124124
staticvoidXLogWalRcvFlush(booldying);
125125
staticvoidXLogWalRcvSendReply(void);
126126
staticvoidXLogWalRcvSendHSFeedback(void);
127+
staticvoidProcessWalSndrMessage(XLogRecPtrwalEnd,TimestampTzsendTime);
127128

128129
/* Signal handlers */
129130
staticvoidWalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
218219
/* Fetch information required to start streaming */
219220
strlcpy(conninfo, (char*)walrcv->conninfo,MAXCONNINFO);
220221
startpoint=walrcv->receiveStart;
222+
223+
/* Initialise to a sanish value */
224+
walrcv->lastMsgSendTime=walrcv->lastMsgReceiptTime=GetCurrentTimestamp();
225+
221226
SpinLockRelease(&walrcv->mutex);
222227

223228
/* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
433438
errmsg_internal("invalid WAL message received from primary")));
434439
/* memcpy is required here for alignment reasons */
435440
memcpy(&msghdr,buf,sizeof(WalDataMessageHeader));
441+
442+
ProcessWalSndrMessage(msghdr.walEnd,msghdr.sendTime);
443+
436444
buf+=sizeof(WalDataMessageHeader);
437445
len-=sizeof(WalDataMessageHeader);
438-
439446
XLogWalRcvWrite(buf,len,msghdr.dataStart);
440447
break;
441448
}
449+
case'k':/* Keepalive */
450+
{
451+
PrimaryKeepaliveMessagekeepalive;
452+
453+
if (len!=sizeof(PrimaryKeepaliveMessage))
454+
ereport(ERROR,
455+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
456+
errmsg_internal("invalid keepalive message received from primary")));
457+
/* memcpy is required here for alignment reasons */
458+
memcpy(&keepalive,buf,sizeof(PrimaryKeepaliveMessage));
459+
460+
ProcessWalSndrMessage(keepalive.walEnd,keepalive.sendTime);
461+
break;
462+
}
442463
default:
443464
ereport(ERROR,
444465
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
711732
memcpy(&buf[1],&feedback_message,sizeof(StandbyHSFeedbackMessage));
712733
walrcv_send(buf,sizeof(StandbyHSFeedbackMessage)+1);
713734
}
735+
736+
/*
737+
* Keep track of important messages from primary.
738+
*/
739+
staticvoid
740+
ProcessWalSndrMessage(XLogRecPtrwalEnd,TimestampTzsendTime)
741+
{
742+
/* use volatile pointer to prevent code rearrangement */
743+
volatileWalRcvData*walrcv=WalRcv;
744+
745+
TimestampTzlastMsgReceiptTime=GetCurrentTimestamp();
746+
747+
/* Update shared-memory status */
748+
SpinLockAcquire(&walrcv->mutex);
749+
walrcv->lastMsgSendTime=sendTime;
750+
walrcv->lastMsgReceiptTime=lastMsgReceiptTime;
751+
SpinLockRelease(&walrcv->mutex);
752+
753+
elog(DEBUG2,"sendtime %s receipttime %s replication apply delay %d transfer latency %d",
754+
timestamptz_to_str(sendTime),
755+
timestamptz_to_str(lastMsgReceiptTime),
756+
GetReplicationApplyDelay(),
757+
GetReplicationTransferLatency());
758+
}

‎src/backend/replication/walreceiverfuncs.c

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include"replication/walreceiver.h"
2929
#include"storage/pmsignal.h"
3030
#include"storage/shmem.h"
31+
#include"utils/timestamp.h"
3132

3233
WalRcvData*WalRcv=NULL;
3334

@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
238239

239240
returnrecptr;
240241
}
242+
243+
/*
244+
* Returns the replication apply delay in ms
245+
*/
246+
int
247+
GetReplicationApplyDelay(void)
248+
{
249+
/* use volatile pointer to prevent code rearrangement */
250+
volatileWalRcvData*walrcv=WalRcv;
251+
252+
XLogRecPtrreceivePtr;
253+
XLogRecPtrreplayPtr;
254+
255+
longsecs;
256+
intusecs;
257+
258+
SpinLockAcquire(&walrcv->mutex);
259+
receivePtr=walrcv->receivedUpto;
260+
SpinLockRelease(&walrcv->mutex);
261+
262+
replayPtr=GetXLogReplayRecPtr(NULL);
263+
264+
if (XLByteLE(receivePtr,replayPtr))
265+
return0;
266+
267+
TimestampDifference(GetCurrentChunkReplayStartTime(),
268+
GetCurrentTimestamp(),
269+
&secs,&usecs);
270+
271+
return (((int)secs*1000)+ (usecs /1000));
272+
}
273+
274+
/*
275+
* Returns the network latency in ms, note that this includes any
276+
* difference in clock settings between the servers, as well as timezone.
277+
*/
278+
int
279+
GetReplicationTransferLatency(void)
280+
{
281+
/* use volatile pointer to prevent code rearrangement */
282+
volatileWalRcvData*walrcv=WalRcv;
283+
284+
TimestampTzlastMsgSendTime;
285+
TimestampTzlastMsgReceiptTime;
286+
287+
longsecs=0;
288+
intusecs=0;
289+
intms;
290+
291+
SpinLockAcquire(&walrcv->mutex);
292+
lastMsgSendTime=walrcv->lastMsgSendTime;
293+
lastMsgReceiptTime=walrcv->lastMsgReceiptTime;
294+
SpinLockRelease(&walrcv->mutex);
295+
296+
TimestampDifference(lastMsgSendTime,
297+
lastMsgReceiptTime,
298+
&secs,&usecs);
299+
300+
ms= ((int)secs*1000)+ (usecs /1000);
301+
302+
returnms;
303+
}

‎src/backend/replication/walsender.c

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
131131
staticvoidProcessStandbyReplyMessage(void);
132132
staticvoidProcessStandbyHSFeedbackMessage(void);
133133
staticvoidProcessRepliesIfAny(void);
134+
staticvoidWalSndKeepalive(char*msgbuf);
134135

135136

136137
/* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
823824
*/
824825
if (caughtup||pq_is_send_pending())
825826
{
826-
TimestampTzfinish_time=0;
827-
longsleeptime=-1;
827+
TimestampTztimeout=0;
828+
longsleeptime=10000;/* 10 s */
828829
intwakeEvents;
829830

830831
wakeEvents=WL_LATCH_SET |WL_POSTMASTER_DEATH |
831-
WL_SOCKET_READABLE;
832+
WL_SOCKET_READABLE |WL_TIMEOUT;
833+
832834
if (pq_is_send_pending())
833835
wakeEvents |=WL_SOCKET_WRITEABLE;
836+
else
837+
WalSndKeepalive(output_message);
834838

835839
/* Determine time until replication timeout */
836840
if (replication_timeout>0)
837841
{
838-
longsecs;
839-
intusecs;
840-
841-
finish_time=TimestampTzPlusMilliseconds(last_reply_timestamp,
842+
timeout=TimestampTzPlusMilliseconds(last_reply_timestamp,
842843
replication_timeout);
843-
TimestampDifference(GetCurrentTimestamp(),
844-
finish_time,&secs,&usecs);
845-
sleeptime=secs*1000+usecs /1000;
846-
/* Avoid Assert in WaitLatchOrSocket if timeout is past */
847-
if (sleeptime<0)
848-
sleeptime=0;
849-
wakeEvents |=WL_TIMEOUT;
844+
sleeptime=1+ (replication_timeout /10);
850845
}
851846

852847
/* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
859854
* timeout ... he's supposed to reply *before* that.
860855
*/
861856
if (replication_timeout>0&&
862-
GetCurrentTimestamp() >=finish_time)
857+
GetCurrentTimestamp() >=timeout)
863858
{
864859
/*
865860
* Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
16271622
return (Datum)0;
16281623
}
16291624

1625+
staticvoid
1626+
WalSndKeepalive(char*msgbuf)
1627+
{
1628+
PrimaryKeepaliveMessagekeepalive_message;
1629+
1630+
/* Construct a new message */
1631+
keepalive_message.walEnd=sentPtr;
1632+
keepalive_message.sendTime=GetCurrentTimestamp();
1633+
1634+
elog(DEBUG2,"sending replication keepalive");
1635+
1636+
/* Prepend with the message type and send it. */
1637+
msgbuf[0]='k';
1638+
memcpy(msgbuf+1,&keepalive_message,sizeof(PrimaryKeepaliveMessage));
1639+
pq_putmessage_noblock('d',msgbuf,sizeof(PrimaryKeepaliveMessage)+1);
1640+
}
1641+
16301642
/*
16311643
* This isn't currently used for anything. Monitoring tools might be
16321644
* interested in the future, and we'll need something like this in the

‎src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
293293
externboolRecoveryIsPaused(void);
294294
externvoidSetRecoveryPause(boolrecoveryPause);
295295
externTimestampTzGetLatestXTime(void);
296+
externTimestampTzGetCurrentChunkReplayStartTime(void);
296297

297298
externvoidUpdateControlFile(void);
298299
externuint64GetSystemIdentifier(void);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp