Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9ea3b6f

Browse files
Expand usage of macros for protocol characters.
This commit makes use of the existing PqMsg_* macros in more placesand adds new PqReplMsg_* and PqBackupMsg_* macros for use inspecial replication and backup messages, respectively.Author: Dave Cramer <davecramer@gmail.com>Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>Reviewed-by: Euler Taveira <euler@eulerto.com>Discussion:https://postgr.es/m/aIECfYfevCUpenBT@nathanDiscussion:https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
1 parent35baa60 commit9ea3b6f

File tree

9 files changed

+70
-43
lines changed

9 files changed

+70
-43
lines changed

‎src/backend/backup/basebackup_copy.c‎

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
143143
buf=palloc(mysink->base.bbs_buffer_length+MAXIMUM_ALIGNOF);
144144
mysink->msgbuffer=buf+ (MAXIMUM_ALIGNOF-1);
145145
mysink->base.bbs_buffer=buf+MAXIMUM_ALIGNOF;
146-
mysink->msgbuffer[0]='d';/* archive or manifest data */
146+
mysink->msgbuffer[0]=PqMsg_CopyData;/* archive or manifest data */
147147

148148
/* Tell client the backup start location. */
149149
SendXlogRecPtrResult(state->startptr,state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
170170

171171
ti=list_nth(state->tablespaces,state->tablespace_num);
172172
pq_beginmessage(&buf,PqMsg_CopyData);
173-
pq_sendbyte(&buf,'n');/* New archive */
173+
pq_sendbyte(&buf,PqBackupMsg_NewArchive);
174174
pq_sendstring(&buf,archive_name);
175175
pq_sendstring(&buf,ti->path==NULL ?"" :ti->path);
176176
pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
191191
if (mysink->send_to_client)
192192
{
193193
/* Add one because we're also sending a leading type byte. */
194-
pq_putmessage('d',mysink->msgbuffer,len+1);
194+
pq_putmessage(PqMsg_CopyData,mysink->msgbuffer,len+1);
195195
}
196196

197197
/* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
221221
mysink->last_progress_report_time=now;
222222

223223
pq_beginmessage(&buf,PqMsg_CopyData);
224-
pq_sendbyte(&buf,'p');/* Progress report */
224+
pq_sendbyte(&buf,PqBackupMsg_ProgressReport);
225225
pq_sendint64(&buf,state->bytes_done);
226226
pq_endmessage(&buf);
227227
pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
247247
mysink->bytes_done_at_last_time_check=state->bytes_done;
248248
mysink->last_progress_report_time=GetCurrentTimestamp();
249249
pq_beginmessage(&buf,PqMsg_CopyData);
250-
pq_sendbyte(&buf,'p');/* Progress report */
250+
pq_sendbyte(&buf,PqBackupMsg_ProgressReport);
251251
pq_sendint64(&buf,state->bytes_done);
252252
pq_endmessage(&buf);
253253
pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
262262
StringInfoDatabuf;
263263

264264
pq_beginmessage(&buf,PqMsg_CopyData);
265-
pq_sendbyte(&buf,'m');/* Manifest */
265+
pq_sendbyte(&buf,PqBackupMsg_Manifest);
266266
pq_endmessage(&buf);
267267
}
268268

@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
277277
if (mysink->send_to_client)
278278
{
279279
/* Add one because we're also sending a leading type byte. */
280-
pq_putmessage('d',mysink->msgbuffer,len+1);
280+
pq_putmessage(PqMsg_CopyData,mysink->msgbuffer,len+1);
281281
}
282282
}
283283

‎src/backend/replication/logical/applyparallelworker.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
778778

779779
/*
780780
* The first byte of messages sent from leader apply worker to
781-
* parallel apply workers can only be'w'.
781+
* parallel apply workers can only bePqReplMsg_WALData.
782782
*/
783783
c=pq_getmsgbyte(&s);
784-
if (c!='w')
784+
if (c!=PqReplMsg_WALData)
785785
elog(ERROR,"unexpected message \"%c\"",c);
786786

787787
/*

‎src/backend/replication/logical/worker.c‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
39943994

39953995
c=pq_getmsgbyte(&s);
39963996

3997-
if (c=='w')
3997+
if (c==PqReplMsg_WALData)
39983998
{
39993999
XLogRecPtrstart_lsn;
40004000
XLogRecPtrend_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
40164016

40174017
maybe_advance_nonremovable_xid(&rdt_data, false);
40184018
}
4019-
elseif (c=='k')
4019+
elseif (c==PqReplMsg_Keepalive)
40204020
{
40214021
XLogRecPtrend_lsn;
40224022
TimestampTztimestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
40354035

40364036
UpdateWorkerStats(last_received,timestamp, true);
40374037
}
4038-
elseif (c=='s')/* Primary status update */
4038+
elseif (c==PqReplMsg_PrimaryStatusUpdate)
40394039
{
40404040
rdt_data.remote_lsn=pq_getmsgint64(&s);
40414041
rdt_data.remote_oldestxid=FullTransactionIdFromU64((uint64)pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
42674267
else
42684268
resetStringInfo(reply_message);
42694269

4270-
pq_sendbyte(reply_message,'r');
4270+
pq_sendbyte(reply_message,PqReplMsg_StandbyStatusUpdate);
42714271
pq_sendint64(reply_message,recvpos);/* write */
42724272
pq_sendint64(reply_message,flushpos);/* flush */
42734273
pq_sendint64(reply_message,writepos);/* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
44384438
* Send the current time to update the remote walsender's latest reply
44394439
* message received time.
44404440
*/
4441-
pq_sendbyte(request_message,'p');
4441+
pq_sendbyte(request_message,PqReplMsg_PrimaryStatusRequest);
44424442
pq_sendint64(request_message,GetCurrentTimestamp());
44434443

44444444
elog(DEBUG2,"sending publisher status request message");

‎src/backend/replication/walreceiver.c‎

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
826826

827827
switch (type)
828828
{
829-
case'w':/* WAL records */
829+
casePqReplMsg_WALData:
830830
{
831831
StringInfoDataincoming_message;
832832

@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
850850
XLogWalRcvWrite(buf,len,dataStart,tli);
851851
break;
852852
}
853-
case'k':/* Keepalive */
853+
casePqReplMsg_Keepalive:
854854
{
855855
StringInfoDataincoming_message;
856856

@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
11301130
applyPtr=GetXLogReplayRecPtr(NULL);
11311131

11321132
resetStringInfo(&reply_message);
1133-
pq_sendbyte(&reply_message,'r');
1133+
pq_sendbyte(&reply_message,PqReplMsg_StandbyStatusUpdate);
11341134
pq_sendint64(&reply_message,writePtr);
11351135
pq_sendint64(&reply_message,flushPtr);
11361136
pq_sendint64(&reply_message,applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
12341234

12351235
/* Construct the message and send it. */
12361236
resetStringInfo(&reply_message);
1237-
pq_sendbyte(&reply_message,'h');
1237+
pq_sendbyte(&reply_message,PqReplMsg_HotStandbyFeedback);
12381238
pq_sendint64(&reply_message,GetCurrentTimestamp());
12391239
pq_sendint32(&reply_message,xmin);
12401240
pq_sendint32(&reply_message,xmin_epoch);

‎src/backend/replication/walsender.c‎

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
15341534

15351535
resetStringInfo(ctx->out);
15361536

1537-
pq_sendbyte(ctx->out,'w');
1537+
pq_sendbyte(ctx->out,PqReplMsg_WALData);
15381538
pq_sendint64(ctx->out,lsn);/* dataStart */
15391539
pq_sendint64(ctx->out,lsn);/* walEnd */
15401540

@@ -2292,16 +2292,18 @@ ProcessRepliesIfAny(void)
22922292
switch (firstchar)
22932293
{
22942294
/*
2295-
* 'd' means a standby reply wrapped in a CopyData packet.
2295+
* PqMsg_CopyData means a standby reply wrapped in a CopyData
2296+
* packet.
22962297
*/
22972298
casePqMsg_CopyData:
22982299
ProcessStandbyMessage();
22992300
received= true;
23002301
break;
23012302

23022303
/*
2303-
* CopyDone means the standby requested to finish streaming.
2304-
* Reply with CopyDone, if we had not sent that already.
2304+
* PqMsg_CopyDone means the standby requested to finish
2305+
* streaming. Reply with CopyDone, if we had not sent that
2306+
* already.
23052307
*/
23062308
casePqMsg_CopyDone:
23072309
if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
23152317
break;
23162318

23172319
/*
2318-
* 'X' means that the standby is closing down the socket.
2320+
* PqMsg_Terminate means that the standby is closing down the
2321+
* socket.
23192322
*/
23202323
casePqMsg_Terminate:
23212324
proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
23502353

23512354
switch (msgtype)
23522355
{
2353-
case'r':
2356+
casePqReplMsg_StandbyStatusUpdate:
23542357
ProcessStandbyReplyMessage();
23552358
break;
23562359

2357-
case'h':
2360+
casePqReplMsg_HotStandbyFeedback:
23582361
ProcessStandbyHSFeedbackMessage();
23592362
break;
23602363

2361-
case'p':
2364+
casePqReplMsg_PrimaryStatusRequest:
23622365
ProcessStandbyPSRequestMessage();
23632366
break;
23642367

@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
27522755

27532756
/* construct the message... */
27542757
resetStringInfo(&output_message);
2755-
pq_sendbyte(&output_message,'s');
2758+
pq_sendbyte(&output_message,PqReplMsg_PrimaryStatusUpdate);
27562759
pq_sendint64(&output_message,lsn);
27572760
pq_sendint64(&output_message, (int64)U64FromFullTransactionId(fullOldestXidInCommit));
27582761
pq_sendint64(&output_message, (int64)U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
33643367
* OK to read and send the slice.
33653368
*/
33663369
resetStringInfo(&output_message);
3367-
pq_sendbyte(&output_message,'w');
3370+
pq_sendbyte(&output_message,PqReplMsg_WALData);
33683371

33693372
pq_sendint64(&output_message,startptr);/* dataStart */
33703373
pq_sendint64(&output_message,SendRqstPtr);/* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
41354138

41364139
/* construct the message... */
41374140
resetStringInfo(&output_message);
4138-
pq_sendbyte(&output_message,'k');
4141+
pq_sendbyte(&output_message,PqReplMsg_Keepalive);
41394142
pq_sendint64(&output_message,XLogRecPtrIsInvalid(writePtr) ?sentPtr :writePtr);
41404143
pq_sendint64(&output_message,GetCurrentTimestamp());
41414144
pq_sendbyte(&output_message,requestReply ?1 :0);

‎src/bin/pg_basebackup/pg_basebackup.c‎

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include"fe_utils/option_utils.h"
3636
#include"fe_utils/recovery_gen.h"
3737
#include"getopt_long.h"
38+
#include"libpq/protocol.h"
3839
#include"receivelog.h"
3940
#include"streamutil.h"
4041

@@ -1338,7 +1339,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
13381339
/* Each CopyData message begins with a type byte. */
13391340
switch (GetCopyDataByte(r,copybuf,&cursor))
13401341
{
1341-
case'n':
1342+
casePqBackupMsg_NewArchive:
13421343
{
13431344
/* New archive. */
13441345
char*archive_name;
@@ -1410,7 +1411,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14101411
break;
14111412
}
14121413

1413-
case'd':
1414+
casePqMsg_CopyData:
14141415
{
14151416
/* Archive or manifest data. */
14161417
if (state->manifest_buffer!=NULL)
@@ -1446,7 +1447,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14461447
break;
14471448
}
14481449

1449-
case'p':
1450+
casePqBackupMsg_ProgressReport:
14501451
{
14511452
/*
14521453
* Progress report.
@@ -1465,7 +1466,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14651466
break;
14661467
}
14671468

1468-
case'm':
1469+
casePqBackupMsg_Manifest:
14691470
{
14701471
/*
14711472
* Manifest data will be sent next. This message is not

‎src/bin/pg_basebackup/pg_recvlogical.c‎

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include"getopt_long.h"
2525
#include"libpq-fe.h"
2626
#include"libpq/pqsignal.h"
27+
#include"libpq/protocol.h"
2728
#include"pqexpbuffer.h"
2829
#include"streamutil.h"
2930

@@ -149,7 +150,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
149150
LSN_FORMAT_ARGS(output_fsync_lsn),
150151
replication_slot);
151152

152-
replybuf[len]='r';
153+
replybuf[len]=PqReplMsg_StandbyStatusUpdate;
153154
len+=1;
154155
fe_sendint64(output_written_lsn,&replybuf[len]);/* write */
155156
len+=8;
@@ -454,7 +455,7 @@ StreamLogicalLog(void)
454455
}
455456

456457
/* Check the message type. */
457-
if (copybuf[0]=='k')
458+
if (copybuf[0]==PqReplMsg_Keepalive)
458459
{
459460
intpos;
460461
boolreplyRequested;
@@ -466,7 +467,7 @@ StreamLogicalLog(void)
466467
* We just check if the server requested a reply, and ignore the
467468
* rest.
468469
*/
469-
pos=1;/* skip msgtype'k' */
470+
pos=1;/* skip msgtypePqReplMsg_Keepalive */
470471
walEnd=fe_recvint64(&copybuf[pos]);
471472
output_written_lsn=Max(walEnd,output_written_lsn);
472473

@@ -509,7 +510,7 @@ StreamLogicalLog(void)
509510

510511
continue;
511512
}
512-
elseif (copybuf[0]!='w')
513+
elseif (copybuf[0]!=PqReplMsg_WALData)
513514
{
514515
pg_log_error("unrecognized streaming header: \"%c\"",
515516
copybuf[0]);
@@ -521,7 +522,7 @@ StreamLogicalLog(void)
521522
* message. We only need the WAL location field (dataStart), the rest
522523
* of the header is ignored.
523524
*/
524-
hdr_len=1;/* msgtype'w' */
525+
hdr_len=1;/* msgtypePqReplMsg_WALData */
525526
hdr_len+=8;/* dataStart */
526527
hdr_len+=8;/* walEnd */
527528
hdr_len+=8;/* sendTime */

‎src/bin/pg_basebackup/receivelog.c‎

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include"access/xlog_internal.h"
2222
#include"common/logging.h"
2323
#include"libpq-fe.h"
24+
#include"libpq/protocol.h"
2425
#include"receivelog.h"
2526
#include"streamutil.h"
2627

@@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
338339
charreplybuf[1+8+8+8+8+1];
339340
intlen=0;
340341

341-
replybuf[len]='r';
342+
replybuf[len]=PqReplMsg_StandbyStatusUpdate;
342343
len+=1;
343344
fe_sendint64(blockpos,&replybuf[len]);/* write */
344345
len+=8;
@@ -823,13 +824,13 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
823824
}
824825

825826
/* Check the message type. */
826-
if (copybuf[0]=='k')
827+
if (copybuf[0]==PqReplMsg_Keepalive)
827828
{
828829
if (!ProcessKeepaliveMsg(conn,stream,copybuf,r,blockpos,
829830
&last_status))
830831
gotoerror;
831832
}
832-
elseif (copybuf[0]=='w')
833+
elseif (copybuf[0]==PqReplMsg_WALData)
833834
{
834835
if (!ProcessWALDataMsg(conn,stream,copybuf,r,&blockpos))
835836
gotoerror;
@@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
10011002
* Parse the keepalive message, enclosed in the CopyData message. We just
10021003
* check if the server requested a reply, and ignore the rest.
10031004
*/
1004-
pos=1;/* skip msgtype'k' */
1005+
pos=1;/* skip msgtypePqReplMsg_Keepalive */
10051006
pos+=8;/* skip walEnd */
10061007
pos+=8;/* skip sendTime */
10071008

@@ -1064,7 +1065,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
10641065
* message. We only need the WAL location field (dataStart), the rest of
10651066
* the header is ignored.
10661067
*/
1067-
hdr_len=1;/* msgtype'w' */
1068+
hdr_len=1;/* msgtypePqReplMsg_WALData */
10681069
hdr_len+=8;/* dataStart */
10691070
hdr_len+=8;/* walEnd */
10701071
hdr_len+=8;/* sendTime */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp