Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit808969d

Browse files
committed
Add a message type header to the CopyData messages sent from primary
to standby in streaming replication. While we only have one message typeat the moment, adding a message type header makes this easier to extend.
1 parent47c5b8f commit808969d

File tree

5 files changed

+117
-35
lines changed

5 files changed

+117
-35
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.77 2010/01/15 09:18:59 heikki Exp $ -->
1+
<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.78 2010/02/03 09:47:19 heikki Exp $ -->
22

33
<chapter id="protocol">
44
<title>Frontend/Backend Protocol</title>
@@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are:
41794179
already been recycled. On success, server responds with a
41804180
CopyOutResponse message, and backend starts to stream WAL as CopyData
41814181
messages.
4182+
The payload in CopyData message consists of the following format.
41824183
</para>
41834184

41844185
<para>
4185-
The payload in each CopyData message consists of an XLogRecPtr,
4186-
indicating the starting point of the WAL in the message, immediately
4187-
followed by the WAL data itself.
4186+
<variablelist>
4187+
<varlistentry>
4188+
<term>
4189+
XLogData (B)
4190+
</term>
4191+
<listitem>
4192+
<para>
4193+
<variablelist>
4194+
<varlistentry>
4195+
<term>
4196+
Byte1('w')
4197+
</term>
4198+
<listitem>
4199+
<para>
4200+
Identifies the message as WAL data.
4201+
</para>
4202+
</listitem>
4203+
</varlistentry>
4204+
<varlistentry>
4205+
<term>
4206+
Int32
4207+
</term>
4208+
<listitem>
4209+
<para>
4210+
The log file number of the LSN, indicating the starting point of
4211+
the WAL in the message.
4212+
</para>
4213+
</listitem>
4214+
</varlistentry>
4215+
<varlistentry>
4216+
<term>
4217+
Int32
4218+
</term>
4219+
<listitem>
4220+
<para>
4221+
The byte offset of the LSN, indicating the starting point of
4222+
the WAL in the message.
4223+
</para>
4224+
</listitem>
4225+
</varlistentry>
4226+
<varlistentry>
4227+
<term>
4228+
Byte<replaceable>n</replaceable>
4229+
</term>
4230+
<listitem>
4231+
<para>
4232+
Data that forms part of WAL data stream.
4233+
</para>
4234+
</listitem>
4235+
</varlistentry>
4236+
</variablelist>
4237+
</para>
4238+
</listitem>
4239+
</varlistentry>
4240+
</variablelist>
41884241
</para>
41894242
<para>
41904243
A single WAL record is never split across two CopyData messages. When

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*
1111
*
1212
* IDENTIFICATION
13-
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.2 2010/01/20 11:58:44 heikki Exp $
13+
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
1414
*
1515
*-------------------------------------------------------------------------
1616
*/
@@ -48,8 +48,8 @@ static char *recvBuf = NULL;
4848

4949
/* Prototypes for interface functions */
5050
staticboollibpqrcv_connect(char*conninfo,XLogRecPtrstartpoint);
51-
staticboollibpqrcv_receive(inttimeout,XLogRecPtr*recptr,char**buffer,
52-
int*len);
51+
staticboollibpqrcv_receive(inttimeout,unsignedchar*type,
52+
char**buffer,int*len);
5353
staticvoidlibpqrcv_disconnect(void);
5454

5555
/* Prototypes for private functions */
@@ -236,13 +236,13 @@ libpqrcv_disconnect(void)
236236
}
237237

238238
/*
239-
* Receiveany WAL records available from XLOG stream, blocking for
239+
* Receivea message available from XLOG stream, blocking for
240240
* maximum of 'timeout' ms.
241241
*
242242
* Returns:
243243
*
244-
* True if data was received. *recptr, *buffer and *len are set to
245-
* theWAL location of the received data, buffer holding it, and length,
244+
* True if data was received. *type, *buffer and *len are set to
245+
* thetype of the received data, buffer holding it, and length,
246246
* respectively.
247247
*
248248
* False if no data was available within timeout, or wait was interrupted
@@ -254,7 +254,7 @@ libpqrcv_disconnect(void)
254254
* ereports on error.
255255
*/
256256
staticbool
257-
libpqrcv_receive(inttimeout,XLogRecPtr*recptr,char**buffer,int*len)
257+
libpqrcv_receive(inttimeout,unsignedchar*type,char**buffer,int*len)
258258
{
259259
intrawlen;
260260

@@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
275275

276276
if (PQconsumeInput(streamConn)==0)
277277
ereport(ERROR,
278-
(errmsg("could notread xlog records: %s",
278+
(errmsg("could notreceive data from XLOG stream: %s",
279279
PQerrorMessage(streamConn))));
280280
}
281281
justconnected= false;
282282

283283
/* Receive CopyData message */
284284
rawlen=PQgetCopyData(streamConn,&recvBuf,1);
285-
if (rawlen==0)/* norecords available yet, then return */
285+
if (rawlen==0)/* nodata available yet, then return */
286286
return false;
287287
if (rawlen==-1)/* end-of-streaming or error */
288288
{
@@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
297297
}
298298
PQclear(res);
299299
ereport(ERROR,
300-
(errmsg("could notread xlog records: %s",
300+
(errmsg("could notreceive data from XLOG stream: %s",
301301
PQerrorMessage(streamConn))));
302302
}
303303
if (rawlen<-1)
304304
ereport(ERROR,
305-
(errmsg("could notread xlog records: %s",
305+
(errmsg("could notreceive data from XLOG stream: %s",
306306
PQerrorMessage(streamConn))));
307307

308-
if (rawlen<sizeof(XLogRecPtr))
309-
ereport(ERROR,
310-
(errmsg("invalid WAL message received from primary")));
311-
312-
/* Return received WAL records to caller */
313-
*recptr=*((XLogRecPtr*)recvBuf);
314-
*buffer=recvBuf+sizeof(XLogRecPtr);
315-
*len=rawlen-sizeof(XLogRecPtr);
308+
/* Return received messages to caller */
309+
*type=*((unsignedchar*)recvBuf);
310+
*buffer=recvBuf+sizeof(*type);
311+
*len=rawlen-sizeof(*type);
316312

317313
return true;
318314
}

‎src/backend/replication/walreceiver.c

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
*
3030
*
3131
* IDENTIFICATION
32-
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
32+
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
3333
*
3434
*-------------------------------------------------------------------------
3535
*/
@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS);
135135

136136
/* Prototypes for private functions */
137137
staticvoidWalRcvDie(intcode,Datumarg);
138+
staticvoidXLogWalRcvProcessMsg(unsignedchartype,char*buf,Sizelen);
138139
staticvoidXLogWalRcvWrite(char*buf,Sizenbytes,XLogRecPtrrecptr);
139140
staticvoidXLogWalRcvFlush(void);
140141

@@ -258,7 +259,7 @@ WalReceiverMain(void)
258259
/* Loop until end-of-streaming or error */
259260
for (;;)
260261
{
261-
XLogRecPtrrecptr;
262+
unsignedchartype;
262263
char*buf;
263264
intlen;
264265

@@ -287,17 +288,17 @@ WalReceiverMain(void)
287288
}
288289

289290
/* Wait a while for data to arrive */
290-
if (walrcv_receive(NAPTIME_PER_CYCLE,&recptr,&buf,&len))
291+
if (walrcv_receive(NAPTIME_PER_CYCLE,&type,&buf,&len))
291292
{
292-
/*WritereceivedWAL records to disk */
293-
XLogWalRcvWrite(buf,len,recptr);
293+
/*Accept thereceiveddata, and process it */
294+
XLogWalRcvProcessMsg(type,buf,len);
294295

295-
/* Receive any moreWAL records we can without sleeping */
296-
while(walrcv_receive(0,&recptr,&buf,&len))
297-
XLogWalRcvWrite(buf,len,recptr);
296+
/* Receive any moredata we can without sleeping */
297+
while(walrcv_receive(0,&type,&buf,&len))
298+
XLogWalRcvProcessMsg(type,buf,len);
298299

299300
/*
300-
*Now that we've written some records, flush them to disk and
301+
*If we've written some records, flush them to disk and
301302
* let the startup process know about them.
302303
*/
303304
XLogWalRcvFlush();
@@ -375,6 +376,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
375376
exit(2);
376377
}
377378

379+
/*
380+
* Accept the message from XLOG stream, and process it.
381+
*/
382+
staticvoid
383+
XLogWalRcvProcessMsg(unsignedchartype,char*buf,Sizelen)
384+
{
385+
switch (type)
386+
{
387+
case'w':/* WAL records */
388+
{
389+
XLogRecPtrrecptr;
390+
391+
if (len<sizeof(XLogRecPtr))
392+
ereport(ERROR,
393+
(errmsg("invalid WAL message received from primary")));
394+
395+
recptr=*((XLogRecPtr*)buf);
396+
buf+=sizeof(XLogRecPtr);
397+
len-=sizeof(XLogRecPtr);
398+
XLogWalRcvWrite(buf,len,recptr);
399+
break;
400+
}
401+
default:
402+
ereport(ERROR,
403+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
404+
errmsg("invalid replication message type %d",
405+
type)));
406+
}
407+
}
408+
378409
/*
379410
* Write XLOG data to disk.
380411
*/

‎src/backend/replication/walsender.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*
3131
*
3232
* IDENTIFICATION
33-
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.4 2010/01/27 16:41:09 heikki Exp $
33+
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.5 2010/02/03 09:47:19 heikki Exp $
3434
*
3535
*-------------------------------------------------------------------------
3636
*/
@@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg)
659659
* have the same byte order. If they have different byte order, we
660660
* don't reach here.
661661
*/
662+
pq_sendbyte(outMsg,'w');
662663
pq_sendbytes(outMsg, (char*)&startptr,sizeof(startptr));
663664

664665
if (endptr.xlogid!=startptr.xlogid)

‎src/include/replication/walreceiver.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*
66
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
77
*
8-
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
8+
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.6 2010/02/03 09:47:19 heikki Exp $
99
*
1010
*-------------------------------------------------------------------------
1111
*/
@@ -66,7 +66,8 @@ extern WalRcvData *WalRcv;
6666
typedefbool (*walrcv_connect_type) (char*conninfo,XLogRecPtrstartpoint);
6767
externPGDLLIMPORTwalrcv_connect_typewalrcv_connect;
6868

69-
typedefbool (*walrcv_receive_type) (inttimeout,XLogRecPtr*recptr,char**buffer,int*len);
69+
typedefbool (*walrcv_receive_type) (inttimeout,unsignedchar*type,
70+
char**buffer,int*len);
7071
externPGDLLIMPORTwalrcv_receive_typewalrcv_receive;
7172

7273
typedefvoid (*walrcv_disconnect_type) (void);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp