Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7a5f6b4

Browse files
committed
Make logical decoding a part of the rmgr.
Add a new rmgr method, rm_decode, and use that rather than a switchstatement.In preparation for rmgr extensibility.Reviewed-by: Julien RouhaudDiscussion:https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.comDiscussion:https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud
1 parenta3d6264 commit7a5f6b4

File tree

8 files changed

+69
-112
lines changed

8 files changed

+69
-112
lines changed

‎src/backend/access/transam/rmgr.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
#include"commands/dbcommands_xlog.h"
2525
#include"commands/sequence.h"
2626
#include"commands/tablespace.h"
27+
#include"replication/decode.h"
2728
#include"replication/message.h"
2829
#include"replication/origin.h"
2930
#include"storage/standby.h"
3031
#include"utils/relmapper.h"
3132

3233
/* must be kept in sync with RmgrData definition in xlog_internal.h */
33-
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
34-
{ name, redo, desc, identify, startup, cleanup, mask },
34+
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
35+
{ name, redo, desc, identify, startup, cleanup, mask, decode },
3536

3637
constRmgrDataRmgrTable[RM_MAX_ID+1]= {
3738
#include"access/rmgrlist.h"

‎src/backend/replication/logical/decode.c

Lines changed: 21 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,6 @@
4343
#include"replication/snapbuild.h"
4444
#include"storage/standby.h"
4545

46-
typedefstructXLogRecordBuffer
47-
{
48-
XLogRecPtrorigptr;
49-
XLogRecPtrendptr;
50-
XLogReaderState*record;
51-
}XLogRecordBuffer;
52-
53-
/* RMGR Handlers */
54-
staticvoidDecodeXLogOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
55-
staticvoidDecodeHeapOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
56-
staticvoidDecodeHeap2Op(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
57-
staticvoidDecodeXactOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
58-
staticvoidDecodeStandbyOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
59-
staticvoidDecodeLogicalMsgOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
60-
6146
/* individual record(group)'s handlers */
6247
staticvoidDecodeInsert(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
6348
staticvoidDecodeUpdate(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
10792
{
10893
XLogRecordBufferbuf;
10994
TransactionIdtxid;
95+
RmgrIdrmid;
11096

11197
buf.origptr=ctx->reader->ReadRecPtr;
11298
buf.endptr=ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
127113
buf.origptr);
128114
}
129115

130-
/* cast so we get a warning when new rmgrs are added */
131-
switch ((RmgrId)XLogRecGetRmid(record))
132-
{
133-
/*
134-
* Rmgrs we care about for logical decoding. Add new rmgrs in
135-
* rmgrlist.h's order.
136-
*/
137-
caseRM_XLOG_ID:
138-
DecodeXLogOp(ctx,&buf);
139-
break;
140-
141-
caseRM_XACT_ID:
142-
DecodeXactOp(ctx,&buf);
143-
break;
116+
rmid=XLogRecGetRmid(record);
144117

145-
caseRM_STANDBY_ID:
146-
DecodeStandbyOp(ctx,&buf);
147-
break;
148-
149-
caseRM_HEAP2_ID:
150-
DecodeHeap2Op(ctx,&buf);
151-
break;
152-
153-
caseRM_HEAP_ID:
154-
DecodeHeapOp(ctx,&buf);
155-
break;
156-
157-
caseRM_LOGICALMSG_ID:
158-
DecodeLogicalMsgOp(ctx,&buf);
159-
break;
160-
161-
/*
162-
* Rmgrs irrelevant for logical decoding; they describe stuff not
163-
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
164-
* order.
165-
*/
166-
caseRM_SMGR_ID:
167-
caseRM_CLOG_ID:
168-
caseRM_DBASE_ID:
169-
caseRM_TBLSPC_ID:
170-
caseRM_MULTIXACT_ID:
171-
caseRM_RELMAP_ID:
172-
caseRM_BTREE_ID:
173-
caseRM_HASH_ID:
174-
caseRM_GIN_ID:
175-
caseRM_GIST_ID:
176-
caseRM_SEQ_ID:
177-
caseRM_SPGIST_ID:
178-
caseRM_BRIN_ID:
179-
caseRM_COMMIT_TS_ID:
180-
caseRM_REPLORIGIN_ID:
181-
caseRM_GENERIC_ID:
182-
/* just deal with xid, and done */
183-
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(record),
184-
buf.origptr);
185-
break;
186-
caseRM_NEXT_ID:
187-
elog(ERROR,"unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds)XLogRecGetRmid(buf.record));
118+
if (RmgrTable[rmid].rm_decode!=NULL)
119+
RmgrTable[rmid].rm_decode(ctx,&buf);
120+
else
121+
{
122+
/* just deal with xid, and done */
123+
ReorderBufferProcessXid(ctx->reorder,XLogRecGetXid(record),
124+
buf.origptr);
188125
}
189126
}
190127

191128
/*
192129
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
193130
*/
194-
staticvoid
195-
DecodeXLogOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
131+
void
132+
xlog_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
196133
{
197134
SnapBuild*builder=ctx->snapshot_builder;
198135
uint8info=XLogRecGetInfo(buf->record)& ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
234171
/*
235172
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
236173
*/
237-
staticvoid
238-
DecodeXactOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
174+
void
175+
xact_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
239176
{
240177
SnapBuild*builder=ctx->snapshot_builder;
241178
ReorderBuffer*reorder=ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
391328
/*
392329
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
393330
*/
394-
staticvoid
395-
DecodeStandbyOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
331+
void
332+
standby_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
396333
{
397334
SnapBuild*builder=ctx->snapshot_builder;
398335
XLogReaderState*r=buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
437374
/*
438375
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
439376
*/
440-
staticvoid
441-
DecodeHeap2Op(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
377+
void
378+
heap2_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
442379
{
443380
uint8info=XLogRecGetInfo(buf->record)&XLOG_HEAP_OPMASK;
444381
TransactionIdxid=XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
497434
/*
498435
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
499436
*/
500-
staticvoid
501-
DecodeHeapOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
437+
void
438+
heap_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
502439
{
503440
uint8info=XLogRecGetInfo(buf->record)&XLOG_HEAP_OPMASK;
504441
TransactionIdxid=XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
619556
/*
620557
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
621558
*/
622-
staticvoid
623-
DecodeLogicalMsgOp(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
559+
void
560+
logicalmsg_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf)
624561
{
625562
SnapBuild*builder=ctx->snapshot_builder;
626563
XLogReaderState*r=buf->record;

‎src/bin/pg_rewind/parsexlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* RmgrNames is an array of resource manager names, to make error messages
2929
* a bit nicer.
3030
*/
31-
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
31+
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
3232
name,
3333

3434
staticconstchar*RmgrNames[RM_MAX_ID+1]= {

‎src/bin/pg_waldump/rmgrdesc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
#include"storage/standbydefs.h"
3333
#include"utils/relmapper.h"
3434

35-
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
35+
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
3636
{ name, desc, identify},
3737

3838
constRmgrDescDataRmgrDescTable[RM_MAX_ID+1]= {

‎src/include/access/rmgr.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
1919
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
2020
* file format.
2121
*/
22-
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
22+
#definePG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
2323
symname,
2424

2525
typedefenumRmgrIds

‎src/include/access/rmgrlist.h

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,25 @@
2525
*/
2626

2727
/* symbol name, textual name, redo, desc, identify, startup, cleanup */
28-
PG_RMGR(RM_XLOG_ID,"XLOG",xlog_redo,xlog_desc,xlog_identify,NULL,NULL,NULL)
29-
PG_RMGR(RM_XACT_ID,"Transaction",xact_redo,xact_desc,xact_identify,NULL,NULL,NULL)
30-
PG_RMGR(RM_SMGR_ID,"Storage",smgr_redo,smgr_desc,smgr_identify,NULL,NULL,NULL)
31-
PG_RMGR(RM_CLOG_ID,"CLOG",clog_redo,clog_desc,clog_identify,NULL,NULL,NULL)
32-
PG_RMGR(RM_DBASE_ID,"Database",dbase_redo,dbase_desc,dbase_identify,NULL,NULL,NULL)
33-
PG_RMGR(RM_TBLSPC_ID,"Tablespace",tblspc_redo,tblspc_desc,tblspc_identify,NULL,NULL,NULL)
34-
PG_RMGR(RM_MULTIXACT_ID,"MultiXact",multixact_redo,multixact_desc,multixact_identify,NULL,NULL,NULL)
35-
PG_RMGR(RM_RELMAP_ID,"RelMap",relmap_redo,relmap_desc,relmap_identify,NULL,NULL,NULL)
36-
PG_RMGR(RM_STANDBY_ID,"Standby",standby_redo,standby_desc,standby_identify,NULL,NULL,NULL)
37-
PG_RMGR(RM_HEAP2_ID,"Heap2",heap2_redo,heap2_desc,heap2_identify,NULL,NULL,heap_mask)
38-
PG_RMGR(RM_HEAP_ID,"Heap",heap_redo,heap_desc,heap_identify,NULL,NULL,heap_mask)
39-
PG_RMGR(RM_BTREE_ID,"Btree",btree_redo,btree_desc,btree_identify,btree_xlog_startup,btree_xlog_cleanup,btree_mask)
40-
PG_RMGR(RM_HASH_ID,"Hash",hash_redo,hash_desc,hash_identify,NULL,NULL,hash_mask)
41-
PG_RMGR(RM_GIN_ID,"Gin",gin_redo,gin_desc,gin_identify,gin_xlog_startup,gin_xlog_cleanup,gin_mask)
42-
PG_RMGR(RM_GIST_ID,"Gist",gist_redo,gist_desc,gist_identify,gist_xlog_startup,gist_xlog_cleanup,gist_mask)
43-
PG_RMGR(RM_SEQ_ID,"Sequence",seq_redo,seq_desc,seq_identify,NULL,NULL,seq_mask)
44-
PG_RMGR(RM_SPGIST_ID,"SPGist",spg_redo,spg_desc,spg_identify,spg_xlog_startup,spg_xlog_cleanup,spg_mask)
45-
PG_RMGR(RM_BRIN_ID,"BRIN",brin_redo,brin_desc,brin_identify,NULL,NULL,brin_mask)
46-
PG_RMGR(RM_COMMIT_TS_ID,"CommitTs",commit_ts_redo,commit_ts_desc,commit_ts_identify,NULL,NULL,NULL)
47-
PG_RMGR(RM_REPLORIGIN_ID,"ReplicationOrigin",replorigin_redo,replorigin_desc,replorigin_identify,NULL,NULL,NULL)
48-
PG_RMGR(RM_GENERIC_ID,"Generic",generic_redo,generic_desc,generic_identify,NULL,NULL,generic_mask)
49-
PG_RMGR(RM_LOGICALMSG_ID,"LogicalMessage",logicalmsg_redo,logicalmsg_desc,logicalmsg_identify,NULL,NULL,NULL)
28+
PG_RMGR(RM_XLOG_ID,"XLOG",xlog_redo,xlog_desc,xlog_identify,NULL,NULL,NULL,xlog_decode)
29+
PG_RMGR(RM_XACT_ID,"Transaction",xact_redo,xact_desc,xact_identify,NULL,NULL,NULL,xact_decode)
30+
PG_RMGR(RM_SMGR_ID,"Storage",smgr_redo,smgr_desc,smgr_identify,NULL,NULL,NULL,NULL)
31+
PG_RMGR(RM_CLOG_ID,"CLOG",clog_redo,clog_desc,clog_identify,NULL,NULL,NULL,NULL)
32+
PG_RMGR(RM_DBASE_ID,"Database",dbase_redo,dbase_desc,dbase_identify,NULL,NULL,NULL,NULL)
33+
PG_RMGR(RM_TBLSPC_ID,"Tablespace",tblspc_redo,tblspc_desc,tblspc_identify,NULL,NULL,NULL,NULL)
34+
PG_RMGR(RM_MULTIXACT_ID,"MultiXact",multixact_redo,multixact_desc,multixact_identify,NULL,NULL,NULL,NULL)
35+
PG_RMGR(RM_RELMAP_ID,"RelMap",relmap_redo,relmap_desc,relmap_identify,NULL,NULL,NULL,NULL)
36+
PG_RMGR(RM_STANDBY_ID,"Standby",standby_redo,standby_desc,standby_identify,NULL,NULL,NULL,standby_decode)
37+
PG_RMGR(RM_HEAP2_ID,"Heap2",heap2_redo,heap2_desc,heap2_identify,NULL,NULL,heap_mask,heap2_decode)
38+
PG_RMGR(RM_HEAP_ID,"Heap",heap_redo,heap_desc,heap_identify,NULL,NULL,heap_mask,heap_decode)
39+
PG_RMGR(RM_BTREE_ID,"Btree",btree_redo,btree_desc,btree_identify,btree_xlog_startup,btree_xlog_cleanup,btree_mask,NULL)
40+
PG_RMGR(RM_HASH_ID,"Hash",hash_redo,hash_desc,hash_identify,NULL,NULL,hash_mask,NULL)
41+
PG_RMGR(RM_GIN_ID,"Gin",gin_redo,gin_desc,gin_identify,gin_xlog_startup,gin_xlog_cleanup,gin_mask,NULL)
42+
PG_RMGR(RM_GIST_ID,"Gist",gist_redo,gist_desc,gist_identify,gist_xlog_startup,gist_xlog_cleanup,gist_mask,NULL)
43+
PG_RMGR(RM_SEQ_ID,"Sequence",seq_redo,seq_desc,seq_identify,NULL,NULL,seq_mask,NULL)
44+
PG_RMGR(RM_SPGIST_ID,"SPGist",spg_redo,spg_desc,spg_identify,spg_xlog_startup,spg_xlog_cleanup,spg_mask,NULL)
45+
PG_RMGR(RM_BRIN_ID,"BRIN",brin_redo,brin_desc,brin_identify,NULL,NULL,brin_mask,NULL)
46+
PG_RMGR(RM_COMMIT_TS_ID,"CommitTs",commit_ts_redo,commit_ts_desc,commit_ts_identify,NULL,NULL,NULL,NULL)
47+
PG_RMGR(RM_REPLORIGIN_ID,"ReplicationOrigin",replorigin_redo,replorigin_desc,replorigin_identify,NULL,NULL,NULL,NULL)
48+
PG_RMGR(RM_GENERIC_ID,"Generic",generic_redo,generic_desc,generic_identify,NULL,NULL,generic_mask,NULL)
49+
PG_RMGR(RM_LOGICALMSG_ID,"LogicalMessage",logicalmsg_redo,logicalmsg_desc,logicalmsg_identify,NULL,NULL,NULL,logicalmsg_decode)

‎src/include/access/xlog_internal.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ typedef enum
287287
RECOVERY_TARGET_ACTION_SHUTDOWN
288288
}RecoveryTargetAction;
289289

290+
structLogicalDecodingContext;
291+
structXLogRecordBuffer;
292+
290293
/*
291294
* Method table for resource managers.
292295
*
@@ -312,6 +315,8 @@ typedef struct RmgrData
312315
void(*rm_startup) (void);
313316
void(*rm_cleanup) (void);
314317
void(*rm_mask) (char*pagedata,BlockNumberblkno);
318+
void(*rm_decode) (structLogicalDecodingContext*ctx,
319+
structXLogRecordBuffer*buf);
315320
}RmgrData;
316321

317322
externconstRmgrDataRmgrTable[];

‎src/include/replication/decode.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,21 @@
1414
#include"replication/logical.h"
1515
#include"replication/reorderbuffer.h"
1616

17-
voidLogicalDecodingProcessRecord(LogicalDecodingContext*ctx,
17+
typedefstructXLogRecordBuffer
18+
{
19+
XLogRecPtrorigptr;
20+
XLogRecPtrendptr;
21+
XLogReaderState*record;
22+
}XLogRecordBuffer;
23+
24+
externvoidxlog_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
25+
externvoidheap_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
26+
externvoidheap2_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
27+
externvoidxact_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
28+
externvoidstandby_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
29+
externvoidlogicalmsg_decode(LogicalDecodingContext*ctx,XLogRecordBuffer*buf);
30+
31+
externvoidLogicalDecodingProcessRecord(LogicalDecodingContext*ctx,
1832
XLogReaderState*record);
1933

2034
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp