Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdd8675f

Browse files
committed
Set logical decoding hooks
1 parent552f945 commitdd8675f

File tree

8 files changed

+133
-77
lines changed

8 files changed

+133
-77
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include"nodes/makefuncs.h"
5454
#include"access/htup_details.h"
5555
#include"catalog/indexing.h"
56+
#include"pglogical_output/hooks.h"
5657

5758
#include"multimaster.h"
5859
#include"ddd.h"
@@ -162,6 +163,7 @@ bool MtmDoReplication;
162163
char*MtmDatabaseName;
163164

164165
intMtmNodeId;
166+
intMtmReplicationNodeId;
165167
intMtmArbiterPort;
166168
intMtmNodes;
167169
intMtmConnectAttempts;
@@ -1639,6 +1641,27 @@ void MtmDropNode(int nodeId, bool dropSlot)
16391641
}
16401642
}
16411643

1644+
staticvoid
1645+
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
1646+
{
1647+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1648+
}
1649+
1650+
staticbool
1651+
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
1652+
{
1653+
elog(WARNING,"MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d",args->origin_id,MtmReplicationNodeId);
1654+
returnargs->origin_id==InvalidRepOriginId||MtmIsRecoveredNode(MtmReplicationNodeId);
1655+
}
1656+
1657+
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
1658+
{
1659+
hooks->shutdown_hook=MtmReplicationShutdownHook;
1660+
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
1661+
}
1662+
1663+
1664+
16421665
/*
16431666
* -------------------------------------------
16441667
* SQL API functions
@@ -1994,16 +2017,42 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19942017
caseT_ClosePortalStmt:
19952018
caseT_FetchStmt:
19962019
caseT_DoStmt:
2020+
caseT_CreateTableSpaceStmt:
2021+
caseT_DropTableSpaceStmt:
2022+
caseT_AlterTableSpaceOptionsStmt:
2023+
caseT_TruncateStmt:
2024+
caseT_CommentStmt:/* XXX: we could replicate these */;
19972025
caseT_CopyStmt:
19982026
caseT_PrepareStmt:
19992027
caseT_ExecuteStmt:
2028+
caseT_DeallocateStmt:
2029+
caseT_GrantStmt:/* XXX: we could replicate some of these these */;
2030+
caseT_GrantRoleStmt:
2031+
caseT_AlterDatabaseStmt:
2032+
caseT_AlterDatabaseSetStmt:
20002033
caseT_NotifyStmt:
20012034
caseT_ListenStmt:
20022035
caseT_UnlistenStmt:
20032036
caseT_LoadStmt:
2037+
caseT_ClusterStmt:/* XXX: we could replicate these */;
2038+
caseT_VacuumStmt:
2039+
caseT_ExplainStmt:
2040+
caseT_AlterSystemStmt:
20042041
caseT_VariableSetStmt:
20052042
caseT_VariableShowStmt:
2006-
skipCommand= true;
2043+
caseT_DiscardStmt:
2044+
caseT_CreateEventTrigStmt:
2045+
caseT_AlterEventTrigStmt:
2046+
caseT_CreateRoleStmt:
2047+
caseT_AlterRoleStmt:
2048+
caseT_AlterRoleSetStmt:
2049+
caseT_DropRoleStmt:
2050+
caseT_ReassignOwnedStmt:
2051+
caseT_LockStmt:
2052+
caseT_ConstraintsSetStmt:
2053+
caseT_CheckPointStmt:
2054+
caseT_ReindexStmt:
2055+
skipCommand= true;
20072056
break;
20082057
default:
20092058
skipCommand= false;

‎contrib/mmts/multimaster.h‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include"bgwpool.h"
66
#include"bkb.h"
77

8+
#include"pglogical_output/hooks.h"
9+
810
#defineMTM_TUPLE_TRACE(fmt, ...)
911
/*
1012
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -149,6 +151,7 @@ extern char const* const MtmNodeStatusMnem[];
149151
externMtmState*Mtm;
150152

151153
externintMtmNodeId;
154+
externintMtmReplicationNodeId;
152155
externintMtmNodes;
153156
externintMtmArbiterPort;
154157
externchar*MtmDatabaseName;
@@ -192,5 +195,5 @@ extern bool MtmIsRecoveredNode(int nodeId);
192195
externvoidMtmRefreshClusterStatus(boolnowait);
193196
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
194197
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
195-
198+
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
196199
#endif

‎contrib/mmts/pglogical_hooks.c‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ load_hooks(PGLogicalOutputData *data)
106106
data->hooks.row_filter_hook,
107107
data->hooks.txn_filter_hook,
108108
data->hooks.hooks_private_data);
109+
}
110+
elseif (data->api->setup_hooks)
111+
{
112+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
113+
(*data->api->setup_hooks)(&data->hooks);
114+
MemoryContextSwitchTo(old_ctxt);
109115
}
110116

111117
if (txn_started)

‎contrib/mmts/pglogical_output.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
343343
data->forward_changeset_origins= false;
344344
}
345345

346-
if (data->hooks_setup_funcname!=NIL)
346+
if (data->hooks_setup_funcname!=NIL||data->api->setup_hooks)
347347
{
348348

349349
data->hooks_mctxt=AllocSetContextCreate(ctx->context,

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 66 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,7 @@
3737

3838
#include"multimaster.h"
3939

40-
typedefstructPGLogicalProtoMM
41-
{
42-
PGLogicalProtoAPIapi;
43-
intnodeId;
44-
boolisLocal;
45-
}PGLogicalProtoMM;
40+
staticboolMtmIsFilteredTxn;
4641

4742
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4843

@@ -72,30 +67,31 @@ static char decide_datum_transfer(Form_pg_attribute att,
7267
staticvoid
7368
pglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel)
7469
{
75-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
76-
if (!mm->isLocal) {
77-
constchar*nspname;
78-
uint8nspnamelen;
79-
constchar*relname;
80-
uint8relnamelen;
81-
82-
pq_sendbyte(out,'R');/* sending RELATION */
83-
84-
nspname=get_namespace_name(rel->rd_rel->relnamespace);
85-
if (nspname==NULL)
86-
elog(ERROR,"cache lookup failed for namespace %u",
87-
rel->rd_rel->relnamespace);
88-
nspnamelen=strlen(nspname)+1;
89-
90-
relname=NameStr(rel->rd_rel->relname);
91-
relnamelen=strlen(relname)+1;
92-
93-
pq_sendbyte(out,nspnamelen);/* schema name length */
94-
pq_sendbytes(out,nspname,nspnamelen);
95-
96-
pq_sendbyte(out,relnamelen);/* table name length */
97-
pq_sendbytes(out,relname,relnamelen);
98-
}
70+
constchar*nspname;
71+
uint8nspnamelen;
72+
constchar*relname;
73+
uint8relnamelen;
74+
75+
if (MtmIsFilteredTxn) {
76+
return;
77+
}
78+
79+
pq_sendbyte(out,'R');/* sending RELATION */
80+
81+
nspname=get_namespace_name(rel->rd_rel->relnamespace);
82+
if (nspname==NULL)
83+
elog(ERROR,"cache lookup failed for namespace %u",
84+
rel->rd_rel->relnamespace);
85+
nspnamelen=strlen(nspname)+1;
86+
87+
relname=NameStr(rel->rd_rel->relname);
88+
relnamelen=strlen(relname)+1;
89+
90+
pq_sendbyte(out,nspnamelen);/* schema name length */
91+
pq_sendbytes(out,nspname,nspnamelen);
92+
93+
pq_sendbyte(out,relnamelen);/* table name length */
94+
pq_sendbytes(out,relname,relnamelen);
9995
}
10096

10197
/*
@@ -105,21 +101,19 @@ static void
105101
pglogical_write_begin(StringInfoout,PGLogicalOutputData*data,
106102
ReorderBufferTXN*txn)
107103
{
108-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
104+
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
109105
csn_tcsn=MtmTransactionSnapshot(txn->xid);
110-
boolisRecovery=MtmIsRecoveredNode(mm->nodeId);
111106
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
112-
if (csn==INVALID_CSN&& !isRecovery) {
113-
//Assert(txn->origin_id != InvalidRepOriginId);
114-
mm->isLocal= true;
115-
}else {
116-
mm->isLocal= false;
117-
//Assert(txn->origin_id == InvalidRepOriginId || isRecovery);
118-
pq_sendbyte(out,'B');/* BEGIN */
107+
108+
if (csn==INVALID_CSN&& !isRecovery) {
109+
MtmIsFilteredTxn= true;
110+
}else {
111+
pq_sendbyte(out,'B');/* BEGIN */
119112
pq_sendint(out,MtmNodeId,4);
120113
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
121-
pq_sendint64(out,csn);
122-
}
114+
pq_sendint64(out,csn);
115+
MtmIsFilteredTxn= false;
116+
}
123117
}
124118

125119
/*
@@ -129,9 +123,11 @@ static void
129123
pglogical_write_commit(StringInfoout,PGLogicalOutputData*data,
130124
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn)
131125
{
132-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
133126
uint8flags=0;
134127

128+
if (MtmIsFilteredTxn) {
129+
return;
130+
}
135131
if (txn->xact_action==XLOG_XACT_COMMIT)
136132
flags=PGLOGICAL_COMMIT;
137133
elseif (txn->xact_action==XLOG_XACT_PREPARE)
@@ -143,18 +139,19 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
143139
else
144140
Assert(false);
145141

146-
142+
#if0
147143
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
148144
if (mm->isLocal) {
149145
return;
150146
}
151147
}else {
152148
csn_tcsn=MtmTransactionSnapshot(txn->xid);
153-
boolisRecovery=MtmIsRecoveredNode(mm->nodeId);
149+
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
154150
if (csn==INVALID_CSN&& !isRecovery) {
155151
return;
156152
}
157153
}
154+
#endif
158155

159156
pq_sendbyte(out,'C');/* sending COMMIT */
160157

@@ -185,11 +182,10 @@ static void
185182
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
186183
Relationrel,HeapTuplenewtuple)
187184
{
188-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
189-
if (!mm->isLocal) {
190-
pq_sendbyte(out,'I');/* action INSERT */
191-
pglogical_write_tuple(out,data,rel,newtuple);
192-
}
185+
if (!MtmIsFilteredTxn) {
186+
pq_sendbyte(out,'I');/* action INSERT */
187+
pglogical_write_tuple(out,data,rel,newtuple);
188+
}
193189
}
194190

195191
/*
@@ -199,32 +195,31 @@ static void
199195
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
200196
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
201197
{
202-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
203-
if (!mm->isLocal) {
204-
pq_sendbyte(out,'U');/* action UPDATE */
205-
/* FIXME support whole tuple (O tuple type) */
206-
if (oldtuple!=NULL)
207-
{
208-
pq_sendbyte(out,'K');/* old key follows */
209-
pglogical_write_tuple(out,data,rel,oldtuple);
210-
}
211-
212-
pq_sendbyte(out,'N');/* new tuple follows */
213-
pglogical_write_tuple(out,data,rel,newtuple);
214-
}
198+
if (!MtmIsFilteredTxn) {
199+
pq_sendbyte(out,'U');/* action UPDATE */
200+
/* FIXME support whole tuple (O tuple type) */
201+
if (oldtuple!=NULL)
202+
{
203+
pq_sendbyte(out,'K');/* old key follows */
204+
pglogical_write_tuple(out,data,rel,oldtuple);
205+
}
206+
207+
pq_sendbyte(out,'N');/* new tuple follows */
208+
pglogical_write_tuple(out,data,rel,newtuple);
209+
}
215210
}
211+
216212
/*
217213
* Write DELETE to the output stream.
218214
*/
219215
staticvoid
220216
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
221217
Relationrel,HeapTupleoldtuple)
222218
{
223-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
224-
if (!mm->isLocal) {
225-
pq_sendbyte(out,'D');/* action DELETE */
226-
pglogical_write_tuple(out,data,rel,oldtuple);
227-
}
219+
if (!MtmIsFilteredTxn) {
220+
pq_sendbyte(out,'D');/* action DELETE */
221+
pglogical_write_tuple(out,data,rel,oldtuple);
222+
}
228223
}
229224

230225
/*
@@ -422,16 +417,16 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
422417
PGLogicalProtoAPI*
423418
pglogical_init_api(PGLogicalProtoTypetyp)
424419
{
425-
PGLogicalProtoMM*pmm=palloc0(sizeof(PGLogicalProtoMM));
426-
PGLogicalProtoAPI*res=&pmm->api;
427-
pmm->isLocal= false;
428-
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&pmm->nodeId);
420+
PGLogicalProtoAPI*res=palloc0(sizeof(PGLogicalProtoAPI));
421+
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&MtmReplicationNodeId);
422+
elog(WARNING,"%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
429423
res->write_rel=pglogical_write_rel;
430424
res->write_begin=pglogical_write_begin;
431425
res->write_commit=pglogical_write_commit;
432426
res->write_insert=pglogical_write_insert;
433427
res->write_update=pglogical_write_update;
434428
res->write_delete=pglogical_write_delete;
429+
res->setup_hooks=MtmSetupReplicationHooks;
435430
res->write_startup_message=write_startup_message;
436431
returnres;
437432
}

‎contrib/mmts/pglogical_proto.h‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ typedef void (*pglogical_write_delete_fn)(StringInfo out, PGLogicalOutputData *d
3333

3434
typedefvoid (*write_startup_message_fn)(StringInfoout,List*msg);
3535

36+
typedefvoid (*pglogical_setup_hooks_fn)(structPGLogicalHooks*hooks);
37+
3638
typedefstructPGLogicalProtoAPI
3739
{
3840
pglogical_write_rel_fnwrite_rel;
@@ -42,7 +44,8 @@ typedef struct PGLogicalProtoAPI
4244
pglogical_write_insert_fnwrite_insert;
4345
pglogical_write_update_fnwrite_update;
4446
pglogical_write_delete_fnwrite_delete;
45-
write_startup_message_fnwrite_startup_message;
47+
pglogical_setup_hooks_fnsetup_hooks;
48+
write_startup_message_fnwrite_startup_message;
4649
}PGLogicalProtoAPI;
4750

4851

‎contrib/pglogical_output/pglogical_output.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
559559
{
560560
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
561561

562-
call_shutdown_hook(data);
562+
call_shu\tdown_hook(data);
563563

564564
pglogical_destroy_relmetacache();
565565

‎src/backend/tcop/utility.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2313,7 +2313,7 @@ CreateCommandTag(Node *parsetree)
23132313
tag="ALTER DATABASE";
23142314
break;
23152315

2316-
caseT_AlterDatabaseSetStmt:
2316+
caseT_AlterDatabaseSetStmt:
23172317
tag="ALTER DATABASE";
23182318
break;
23192319

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp