Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit32297b0

Browse files
knizhnikkelvich
authored andcommitted
Set logical decoding hooks
1 parent0fcf740 commit32297b0

File tree

6 files changed

+131
-75
lines changed

6 files changed

+131
-75
lines changed

‎multimaster.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include"nodes/makefuncs.h"
5454
#include"access/htup_details.h"
5555
#include"catalog/indexing.h"
56+
#include"pglogical_output/hooks.h"
5657

5758
#include"multimaster.h"
5859
#include"ddd.h"
@@ -161,6 +162,7 @@ bool MtmDoReplication;
161162
char*MtmDatabaseName;
162163

163164
intMtmNodeId;
165+
intMtmReplicationNodeId;
164166
intMtmArbiterPort;
165167
intMtmNodes;
166168
intMtmConnectAttempts;
@@ -1637,6 +1639,27 @@ void MtmDropNode(int nodeId, bool dropSlot)
16371639
}
16381640
}
16391641

1642+
staticvoid
1643+
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
1644+
{
1645+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1646+
}
1647+
1648+
staticbool
1649+
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
1650+
{
1651+
elog(WARNING,"MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d",args->origin_id,MtmReplicationNodeId);
1652+
returnargs->origin_id==InvalidRepOriginId||MtmIsRecoveredNode(MtmReplicationNodeId);
1653+
}
1654+
1655+
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
1656+
{
1657+
hooks->shutdown_hook=MtmReplicationShutdownHook;
1658+
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
1659+
}
1660+
1661+
1662+
16401663
/*
16411664
* -------------------------------------------
16421665
* SQL API functions
@@ -1988,16 +2011,42 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19882011
caseT_ClosePortalStmt:
19892012
caseT_FetchStmt:
19902013
caseT_DoStmt:
2014+
caseT_CreateTableSpaceStmt:
2015+
caseT_DropTableSpaceStmt:
2016+
caseT_AlterTableSpaceOptionsStmt:
2017+
caseT_TruncateStmt:
2018+
caseT_CommentStmt:/* XXX: we could replicate these */;
19912019
caseT_CopyStmt:
19922020
caseT_PrepareStmt:
19932021
caseT_ExecuteStmt:
2022+
caseT_DeallocateStmt:
2023+
caseT_GrantStmt:/* XXX: we could replicate some of these these */;
2024+
caseT_GrantRoleStmt:
2025+
caseT_AlterDatabaseStmt:
2026+
caseT_AlterDatabaseSetStmt:
19942027
caseT_NotifyStmt:
19952028
caseT_ListenStmt:
19962029
caseT_UnlistenStmt:
19972030
caseT_LoadStmt:
2031+
caseT_ClusterStmt:/* XXX: we could replicate these */;
2032+
caseT_VacuumStmt:
2033+
caseT_ExplainStmt:
2034+
caseT_AlterSystemStmt:
19982035
caseT_VariableSetStmt:
19992036
caseT_VariableShowStmt:
2000-
skipCommand= true;
2037+
caseT_DiscardStmt:
2038+
caseT_CreateEventTrigStmt:
2039+
caseT_AlterEventTrigStmt:
2040+
caseT_CreateRoleStmt:
2041+
caseT_AlterRoleStmt:
2042+
caseT_AlterRoleSetStmt:
2043+
caseT_DropRoleStmt:
2044+
caseT_ReassignOwnedStmt:
2045+
caseT_LockStmt:
2046+
caseT_ConstraintsSetStmt:
2047+
caseT_CheckPointStmt:
2048+
caseT_ReindexStmt:
2049+
skipCommand= true;
20012050
break;
20022051
default:
20032052
skipCommand= false;

‎multimaster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include"bgwpool.h"
66
#include"bkb.h"
77

8+
#include"pglogical_output/hooks.h"
9+
810
#defineMTM_TUPLE_TRACE(fmt, ...)
911
/*
1012
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -149,6 +151,7 @@ extern char const* const MtmNodeStatusMnem[];
149151
externMtmState*Mtm;
150152

151153
externintMtmNodeId;
154+
externintMtmReplicationNodeId;
152155
externintMtmNodes;
153156
externintMtmArbiterPort;
154157
externchar*MtmDatabaseName;
@@ -192,5 +195,5 @@ extern bool MtmIsRecoveredNode(int nodeId);
192195
externvoidMtmRefreshClusterStatus(boolnowait);
193196
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
194197
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
195-
198+
externvoidMtmSetupReplicationHooks(structPGLogicalHooks*hooks);
196199
#endif

‎pglogical_hooks.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ load_hooks(PGLogicalOutputData *data)
106106
data->hooks.row_filter_hook,
107107
data->hooks.txn_filter_hook,
108108
data->hooks.hooks_private_data);
109+
}
110+
elseif (data->api->setup_hooks)
111+
{
112+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
113+
(*data->api->setup_hooks)(&data->hooks);
114+
MemoryContextSwitchTo(old_ctxt);
109115
}
110116

111117
if (txn_started)

‎pglogical_output.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
343343
data->forward_changeset_origins= false;
344344
}
345345

346-
if (data->hooks_setup_funcname!=NIL)
346+
if (data->hooks_setup_funcname!=NIL||data->api->setup_hooks)
347347
{
348348

349349
data->hooks_mctxt=AllocSetContextCreate(ctx->context,

‎pglogical_proto.c

Lines changed: 66 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,7 @@
3737

3838
#include"multimaster.h"
3939

40-
typedefstructPGLogicalProtoMM
41-
{
42-
PGLogicalProtoAPIapi;
43-
intnodeId;
44-
boolisLocal;
45-
}PGLogicalProtoMM;
40+
staticboolMtmIsFilteredTxn;
4641

4742
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4843

@@ -72,30 +67,31 @@ static char decide_datum_transfer(Form_pg_attribute att,
7267
staticvoid
7368
pglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel)
7469
{
75-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
76-
if (!mm->isLocal) {
77-
constchar*nspname;
78-
uint8nspnamelen;
79-
constchar*relname;
80-
uint8relnamelen;
81-
82-
pq_sendbyte(out,'R');/* sending RELATION */
83-
84-
nspname=get_namespace_name(rel->rd_rel->relnamespace);
85-
if (nspname==NULL)
86-
elog(ERROR,"cache lookup failed for namespace %u",
87-
rel->rd_rel->relnamespace);
88-
nspnamelen=strlen(nspname)+1;
89-
90-
relname=NameStr(rel->rd_rel->relname);
91-
relnamelen=strlen(relname)+1;
92-
93-
pq_sendbyte(out,nspnamelen);/* schema name length */
94-
pq_sendbytes(out,nspname,nspnamelen);
95-
96-
pq_sendbyte(out,relnamelen);/* table name length */
97-
pq_sendbytes(out,relname,relnamelen);
98-
}
70+
constchar*nspname;
71+
uint8nspnamelen;
72+
constchar*relname;
73+
uint8relnamelen;
74+
75+
if (MtmIsFilteredTxn) {
76+
return;
77+
}
78+
79+
pq_sendbyte(out,'R');/* sending RELATION */
80+
81+
nspname=get_namespace_name(rel->rd_rel->relnamespace);
82+
if (nspname==NULL)
83+
elog(ERROR,"cache lookup failed for namespace %u",
84+
rel->rd_rel->relnamespace);
85+
nspnamelen=strlen(nspname)+1;
86+
87+
relname=NameStr(rel->rd_rel->relname);
88+
relnamelen=strlen(relname)+1;
89+
90+
pq_sendbyte(out,nspnamelen);/* schema name length */
91+
pq_sendbytes(out,nspname,nspnamelen);
92+
93+
pq_sendbyte(out,relnamelen);/* table name length */
94+
pq_sendbytes(out,relname,relnamelen);
9995
}
10096

10197
/*
@@ -105,21 +101,19 @@ static void
105101
pglogical_write_begin(StringInfoout,PGLogicalOutputData*data,
106102
ReorderBufferTXN*txn)
107103
{
108-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
104+
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
109105
csn_tcsn=MtmTransactionSnapshot(txn->xid);
110-
boolisRecovery=MtmIsRecoveredNode(mm->nodeId);
111106
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
112-
if (csn==INVALID_CSN&& !isRecovery) {
113-
//Assert(txn->origin_id != InvalidRepOriginId);
114-
mm->isLocal= true;
115-
}else {
116-
mm->isLocal= false;
117-
//Assert(txn->origin_id == InvalidRepOriginId || isRecovery);
118-
pq_sendbyte(out,'B');/* BEGIN */
107+
108+
if (csn==INVALID_CSN&& !isRecovery) {
109+
MtmIsFilteredTxn= true;
110+
}else {
111+
pq_sendbyte(out,'B');/* BEGIN */
119112
pq_sendint(out,MtmNodeId,4);
120113
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
121-
pq_sendint64(out,csn);
122-
}
114+
pq_sendint64(out,csn);
115+
MtmIsFilteredTxn= false;
116+
}
123117
}
124118

125119
/*
@@ -129,9 +123,11 @@ static void
129123
pglogical_write_commit(StringInfoout,PGLogicalOutputData*data,
130124
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn)
131125
{
132-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
133126
uint8flags=0;
134127

128+
if (MtmIsFilteredTxn) {
129+
return;
130+
}
135131
if (txn->xact_action==XLOG_XACT_COMMIT)
136132
flags=PGLOGICAL_COMMIT;
137133
elseif (txn->xact_action==XLOG_XACT_PREPARE)
@@ -143,18 +139,19 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
143139
else
144140
Assert(false);
145141

146-
142+
#if0
147143
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
148144
if (mm->isLocal) {
149145
return;
150146
}
151147
}else {
152148
csn_tcsn=MtmTransactionSnapshot(txn->xid);
153-
boolisRecovery=MtmIsRecoveredNode(mm->nodeId);
149+
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
154150
if (csn==INVALID_CSN&& !isRecovery) {
155151
return;
156152
}
157153
}
154+
#endif
158155

159156
pq_sendbyte(out,'C');/* sending COMMIT */
160157

@@ -185,11 +182,10 @@ static void
185182
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
186183
Relationrel,HeapTuplenewtuple)
187184
{
188-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
189-
if (!mm->isLocal) {
190-
pq_sendbyte(out,'I');/* action INSERT */
191-
pglogical_write_tuple(out,data,rel,newtuple);
192-
}
185+
if (!MtmIsFilteredTxn) {
186+
pq_sendbyte(out,'I');/* action INSERT */
187+
pglogical_write_tuple(out,data,rel,newtuple);
188+
}
193189
}
194190

195191
/*
@@ -199,32 +195,31 @@ static void
199195
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
200196
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
201197
{
202-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
203-
if (!mm->isLocal) {
204-
pq_sendbyte(out,'U');/* action UPDATE */
205-
/* FIXME support whole tuple (O tuple type) */
206-
if (oldtuple!=NULL)
207-
{
208-
pq_sendbyte(out,'K');/* old key follows */
209-
pglogical_write_tuple(out,data,rel,oldtuple);
210-
}
211-
212-
pq_sendbyte(out,'N');/* new tuple follows */
213-
pglogical_write_tuple(out,data,rel,newtuple);
214-
}
198+
if (!MtmIsFilteredTxn) {
199+
pq_sendbyte(out,'U');/* action UPDATE */
200+
/* FIXME support whole tuple (O tuple type) */
201+
if (oldtuple!=NULL)
202+
{
203+
pq_sendbyte(out,'K');/* old key follows */
204+
pglogical_write_tuple(out,data,rel,oldtuple);
205+
}
206+
207+
pq_sendbyte(out,'N');/* new tuple follows */
208+
pglogical_write_tuple(out,data,rel,newtuple);
209+
}
215210
}
211+
216212
/*
217213
* Write DELETE to the output stream.
218214
*/
219215
staticvoid
220216
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
221217
Relationrel,HeapTupleoldtuple)
222218
{
223-
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
224-
if (!mm->isLocal) {
225-
pq_sendbyte(out,'D');/* action DELETE */
226-
pglogical_write_tuple(out,data,rel,oldtuple);
227-
}
219+
if (!MtmIsFilteredTxn) {
220+
pq_sendbyte(out,'D');/* action DELETE */
221+
pglogical_write_tuple(out,data,rel,oldtuple);
222+
}
228223
}
229224

230225
/*
@@ -422,16 +417,16 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
422417
PGLogicalProtoAPI*
423418
pglogical_init_api(PGLogicalProtoTypetyp)
424419
{
425-
PGLogicalProtoMM*pmm=palloc0(sizeof(PGLogicalProtoMM));
426-
PGLogicalProtoAPI*res=&pmm->api;
427-
pmm->isLocal= false;
428-
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&pmm->nodeId);
420+
PGLogicalProtoAPI*res=palloc0(sizeof(PGLogicalProtoAPI));
421+
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&MtmReplicationNodeId);
422+
elog(WARNING,"%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
429423
res->write_rel=pglogical_write_rel;
430424
res->write_begin=pglogical_write_begin;
431425
res->write_commit=pglogical_write_commit;
432426
res->write_insert=pglogical_write_insert;
433427
res->write_update=pglogical_write_update;
434428
res->write_delete=pglogical_write_delete;
429+
res->setup_hooks=MtmSetupReplicationHooks;
435430
res->write_startup_message=write_startup_message;
436431
returnres;
437432
}

‎pglogical_proto.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ typedef void (*pglogical_write_delete_fn)(StringInfo out, PGLogicalOutputData *d
3333

3434
typedefvoid (*write_startup_message_fn)(StringInfoout,List*msg);
3535

36+
typedefvoid (*pglogical_setup_hooks_fn)(structPGLogicalHooks*hooks);
37+
3638
typedefstructPGLogicalProtoAPI
3739
{
3840
pglogical_write_rel_fnwrite_rel;
@@ -42,7 +44,8 @@ typedef struct PGLogicalProtoAPI
4244
pglogical_write_insert_fnwrite_insert;
4345
pglogical_write_update_fnwrite_update;
4446
pglogical_write_delete_fnwrite_delete;
45-
write_startup_message_fnwrite_startup_message;
47+
pglogical_setup_hooks_fnsetup_hooks;
48+
write_startup_message_fnwrite_startup_message;
4649
}PGLogicalProtoAPI;
4750

4851

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp