Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit644f0d7

Browse files
author
Amit Kapila
committed
Use Enum for top level logical replication message types.
Logical replication protocol uses a single byte character to identify amessage type in logical replication protocol. The code uses stringliterals for the same. Use Enum so that1. All the string literals used can be found at a single place. Thismakes it easy to add more types without the risk of conflicts.2. It's easy to locate the code handling a given message type.3. When used with switch statements, it is easy to identify the missingcases using -Wswitch.Author: Ashutosh BapatReviewed-by: Kyotaro Horiguchi, Andres Freund, Peter Smith and Amit KapilaDiscussion:https://postgr.es/m/CAExHW5uPzQ7L0oAd_ENyvaiYMOPgkrAoJpE+ZY5-obdcVT6NPg@mail.gmail.com
1 parenta929e17 commit644f0d7

File tree

3 files changed

+83
-57
lines changed

3 files changed

+83
-57
lines changed

‎src/backend/replication/logical/proto.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
4444
void
4545
logicalrep_write_begin(StringInfoout,ReorderBufferTXN*txn)
4646
{
47-
pq_sendbyte(out,'B');/* BEGIN */
47+
pq_sendbyte(out,LOGICAL_REP_MSG_BEGIN);
4848

4949
/* fixed fields */
5050
pq_sendint64(out,txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
7676
{
7777
uint8flags=0;
7878

79-
pq_sendbyte(out,'C');/* sending COMMIT */
79+
pq_sendbyte(out,LOGICAL_REP_MSG_COMMIT);
8080

8181
/* send the flags field (unused for now) */
8282
pq_sendbyte(out,flags);
@@ -112,7 +112,7 @@ void
112112
logicalrep_write_origin(StringInfoout,constchar*origin,
113113
XLogRecPtrorigin_lsn)
114114
{
115-
pq_sendbyte(out,'O');/* ORIGIN */
115+
pq_sendbyte(out,LOGICAL_REP_MSG_ORIGIN);
116116

117117
/* fixed fields */
118118
pq_sendint64(out,origin_lsn);
@@ -141,7 +141,7 @@ void
141141
logicalrep_write_insert(StringInfoout,TransactionIdxid,Relationrel,
142142
HeapTuplenewtuple,boolbinary)
143143
{
144-
pq_sendbyte(out,'I');/* action INSERT */
144+
pq_sendbyte(out,LOGICAL_REP_MSG_INSERT);
145145

146146
/* transaction ID (if not valid, we're not streaming) */
147147
if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
185185
logicalrep_write_update(StringInfoout,TransactionIdxid,Relationrel,
186186
HeapTupleoldtuple,HeapTuplenewtuple,boolbinary)
187187
{
188-
pq_sendbyte(out,'U');/* action UPDATE */
188+
pq_sendbyte(out,LOGICAL_REP_MSG_UPDATE);
189189

190190
Assert(rel->rd_rel->relreplident==REPLICA_IDENTITY_DEFAULT||
191191
rel->rd_rel->relreplident==REPLICA_IDENTITY_FULL||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
263263
rel->rd_rel->relreplident==REPLICA_IDENTITY_FULL||
264264
rel->rd_rel->relreplident==REPLICA_IDENTITY_INDEX);
265265

266-
pq_sendbyte(out,'D');/* action DELETE */
266+
pq_sendbyte(out,LOGICAL_REP_MSG_DELETE);
267267

268268
/* transaction ID (if not valid, we're not streaming) */
269269
if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
317317
inti;
318318
uint8flags=0;
319319

320-
pq_sendbyte(out,'T');/* action TRUNCATE */
320+
pq_sendbyte(out,LOGICAL_REP_MSG_TRUNCATE);
321321

322322
/* transaction ID (if not valid, we're not streaming) */
323323
if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
369369
{
370370
char*relname;
371371

372-
pq_sendbyte(out,'R');/* sending RELATION */
372+
pq_sendbyte(out,LOGICAL_REP_MSG_RELATION);
373373

374374
/* transaction ID (if not valid, we're not streaming) */
375375
if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
425425
HeapTupletup;
426426
Form_pg_typetyptup;
427427

428-
pq_sendbyte(out,'Y');/* sending TYPE */
428+
pq_sendbyte(out,LOGICAL_REP_MSG_TYPE);
429429

430430
/* transaction ID (if not valid, we're not streaming) */
431431
if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
755755
logicalrep_write_stream_start(StringInfoout,
756756
TransactionIdxid,boolfirst_segment)
757757
{
758-
pq_sendbyte(out,'S');/* action STREAM START */
758+
pq_sendbyte(out,LOGICAL_REP_MSG_STREAM_START);
759759

760760
Assert(TransactionIdIsValid(xid));
761761

@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
788788
void
789789
logicalrep_write_stream_stop(StringInfoout)
790790
{
791-
pq_sendbyte(out,'E');/* action STREAM END */
791+
pq_sendbyte(out,LOGICAL_REP_MSG_STREAM_END);
792792
}
793793

794794
/*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
800800
{
801801
uint8flags=0;
802802

803-
pq_sendbyte(out,'c');/* action STREAM COMMIT */
803+
pq_sendbyte(out,LOGICAL_REP_MSG_STREAM_COMMIT);
804804

805805
Assert(TransactionIdIsValid(txn->xid));
806806

@@ -849,7 +849,7 @@ void
849849
logicalrep_write_stream_abort(StringInfoout,TransactionIdxid,
850850
TransactionIdsubxid)
851851
{
852-
pq_sendbyte(out,'A');/* action STREAM ABORT */
852+
pq_sendbyte(out,LOGICAL_REP_MSG_STREAM_ABORT);
853853

854854
Assert(TransactionIdIsValid(xid)&&TransactionIdIsValid(subxid));
855855

‎src/backend/replication/logical/worker.c

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s)
18961896
staticvoid
18971897
apply_dispatch(StringInfos)
18981898
{
1899-
charaction=pq_getmsgbyte(s);
1899+
LogicalRepMsgTypeaction=pq_getmsgbyte(s);
19001900

19011901
switch (action)
19021902
{
1903-
/* BEGIN */
1904-
case'B':
1903+
caseLOGICAL_REP_MSG_BEGIN:
19051904
apply_handle_begin(s);
1906-
break;
1907-
/* COMMIT */
1908-
case'C':
1905+
return;
1906+
1907+
caseLOGICAL_REP_MSG_COMMIT:
19091908
apply_handle_commit(s);
1910-
break;
1911-
/* INSERT */
1912-
case'I':
1909+
return;
1910+
1911+
caseLOGICAL_REP_MSG_INSERT:
19131912
apply_handle_insert(s);
1914-
break;
1915-
/* UPDATE */
1916-
case'U':
1913+
return;
1914+
1915+
caseLOGICAL_REP_MSG_UPDATE:
19171916
apply_handle_update(s);
1918-
break;
1919-
/* DELETE */
1920-
case'D':
1917+
return;
1918+
1919+
caseLOGICAL_REP_MSG_DELETE:
19211920
apply_handle_delete(s);
1922-
break;
1923-
/* TRUNCATE */
1924-
case'T':
1921+
return;
1922+
1923+
caseLOGICAL_REP_MSG_TRUNCATE:
19251924
apply_handle_truncate(s);
1926-
break;
1927-
/* RELATION */
1928-
case'R':
1925+
return;
1926+
1927+
caseLOGICAL_REP_MSG_RELATION:
19291928
apply_handle_relation(s);
1930-
break;
1931-
/* TYPE */
1932-
case'Y':
1929+
return;
1930+
1931+
caseLOGICAL_REP_MSG_TYPE:
19331932
apply_handle_type(s);
1934-
break;
1935-
/* ORIGIN */
1936-
case'O':
1933+
return;
1934+
1935+
caseLOGICAL_REP_MSG_ORIGIN:
19371936
apply_handle_origin(s);
1938-
break;
1939-
/* STREAM START */
1940-
case'S':
1937+
return;
1938+
1939+
caseLOGICAL_REP_MSG_STREAM_START:
19411940
apply_handle_stream_start(s);
1942-
break;
1943-
/* STREAM END */
1944-
case'E':
1941+
return;
1942+
1943+
caseLOGICAL_REP_MSG_STREAM_END:
19451944
apply_handle_stream_stop(s);
1946-
break;
1947-
/* STREAM ABORT */
1948-
case'A':
1945+
return;
1946+
1947+
caseLOGICAL_REP_MSG_STREAM_ABORT:
19491948
apply_handle_stream_abort(s);
1950-
break;
1951-
/* STREAM COMMIT */
1952-
case'c':
1949+
return;
1950+
1951+
caseLOGICAL_REP_MSG_STREAM_COMMIT:
19531952
apply_handle_stream_commit(s);
1954-
break;
1955-
default:
1956-
ereport(ERROR,
1957-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1958-
errmsg("invalid logical replication message type \"%c\"",action)));
1953+
return;
19591954
}
1955+
1956+
ereport(ERROR,
1957+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1958+
errmsg("invalid logical replication message type \"%c\"",action)));
19601959
}
19611960

19621961
/*

‎src/include/replication/logicalproto.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,33 @@
3333
#defineLOGICALREP_PROTO_STREAM_VERSION_NUM 2
3434
#defineLOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
3535

36+
/*
37+
* Logical message types
38+
*
39+
* Used by logical replication wire protocol.
40+
*
41+
* Note: though this is an enum, the values are used to identify message types
42+
* in logical replication protocol, which uses a single byte to identify a
43+
* message type. Hence the values should be single byte wide and preferrably
44+
* human readable characters.
45+
*/
46+
typedefenumLogicalRepMsgType
47+
{
48+
LOGICAL_REP_MSG_BEGIN='B',
49+
LOGICAL_REP_MSG_COMMIT='C',
50+
LOGICAL_REP_MSG_ORIGIN='O',
51+
LOGICAL_REP_MSG_INSERT='I',
52+
LOGICAL_REP_MSG_UPDATE='U',
53+
LOGICAL_REP_MSG_DELETE='D',
54+
LOGICAL_REP_MSG_TRUNCATE='T',
55+
LOGICAL_REP_MSG_RELATION='R',
56+
LOGICAL_REP_MSG_TYPE='Y',
57+
LOGICAL_REP_MSG_STREAM_START='S',
58+
LOGICAL_REP_MSG_STREAM_END='E',
59+
LOGICAL_REP_MSG_STREAM_COMMIT='c',
60+
LOGICAL_REP_MSG_STREAM_ABORT='A'
61+
}LogicalRepMsgType;
62+
3663
/*
3764
* This struct stores a tuple received via logical replication.
3865
* Keep in mind that the columns correspond to the *remote* table.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp