Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit42fd4af

Browse files
committed
Add pglogical_hooks.c
1 parent6296fba commit42fd4af

File tree

1 file changed

+232
-0
lines changed

1 file changed

+232
-0
lines changed
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
#include"postgres.h"
2+
3+
#include"access/xact.h"
4+
5+
#include"catalog/pg_proc.h"
6+
#include"catalog/pg_type.h"
7+
8+
#include"replication/origin.h"
9+
10+
#include"parser/parse_func.h"
11+
12+
#include"utils/acl.h"
13+
#include"utils/lsyscache.h"
14+
15+
#include"miscadmin.h"
16+
17+
#include"pglogical_hooks.h"
18+
#include"pglogical_output.h"
19+
20+
/*
21+
* Returns Oid of the hooks function specified in funcname.
22+
*
23+
* Error is thrown if function doesn't exist or doen't return correct datatype
24+
* or is volatile.
25+
*/
26+
staticOid
27+
get_hooks_function_oid(List*funcname)
28+
{
29+
Oidfuncid;
30+
Oidfuncargtypes[1];
31+
32+
funcargtypes[0]=INTERNALOID;
33+
34+
/* find the the function */
35+
funcid=LookupFuncName(funcname,1,funcargtypes, false);
36+
37+
/* Validate that the function returns void */
38+
if (get_func_rettype(funcid)!=VOIDOID)
39+
{
40+
ereport(ERROR,
41+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
42+
errmsg("function %s must return void",
43+
NameListToString(funcname))));
44+
}
45+
46+
if (func_volatile(funcid)==PROVOLATILE_VOLATILE)
47+
{
48+
ereport(ERROR,
49+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
50+
errmsg("function %s must not be VOLATILE",
51+
NameListToString(funcname))));
52+
}
53+
54+
if (pg_proc_aclcheck(funcid,GetUserId(),ACL_EXECUTE)!=ACLCHECK_OK)
55+
{
56+
constchar*username;
57+
#ifPG_VERSION_NUM >=90500
58+
username=GetUserNameFromId(GetUserId(), false);
59+
#else
60+
username=GetUserNameFromId(GetUserId());
61+
#endif
62+
ereport(ERROR,
63+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
64+
errmsg("current user %s does not have permission to call function %s",
65+
username,NameListToString(funcname))));
66+
}
67+
68+
returnfuncid;
69+
}
70+
71+
/*
72+
* If a hook setup function was specified in the startup parameters, look it up
73+
* in the catalogs, check permissions, call it, and store the resulting hook
74+
* info struct.
75+
*/
76+
void
77+
load_hooks(PGLogicalOutputData*data)
78+
{
79+
Oidhooks_func;
80+
MemoryContextold_ctxt;
81+
booltxn_started= false;
82+
83+
if (!IsTransactionState())
84+
{
85+
txn_started= true;
86+
StartTransactionCommand();
87+
}
88+
89+
if (data->hooks_setup_funcname!=NIL)
90+
{
91+
hooks_func=get_hooks_function_oid(data->hooks_setup_funcname);
92+
93+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
94+
(void)OidFunctionCall1(hooks_func,PointerGetDatum(&data->hooks));
95+
MemoryContextSwitchTo(old_ctxt);
96+
97+
elog(DEBUG3,"pglogical_output: Loaded hooks from function %u. Hooks are: \n"
98+
"\tstartup_hook: %p\n"
99+
"\tshutdown_hook: %p\n"
100+
"\trow_filter_hook: %p\n"
101+
"\ttxn_filter_hook: %p\n"
102+
"\thooks_private_data: %p\n",
103+
hooks_func,
104+
data->hooks.startup_hook,
105+
data->hooks.shutdown_hook,
106+
data->hooks.row_filter_hook,
107+
data->hooks.txn_filter_hook,
108+
data->hooks.hooks_private_data);
109+
}
110+
111+
if (txn_started)
112+
CommitTransactionCommand();
113+
}
114+
115+
void
116+
call_startup_hook(PGLogicalOutputData*data,List*plugin_params)
117+
{
118+
structPGLogicalStartupHookArgsargs;
119+
MemoryContextold_ctxt;
120+
121+
if (data->hooks.startup_hook!=NULL)
122+
{
123+
booltx_started= false;
124+
125+
args.private_data=data->hooks.hooks_private_data;
126+
args.in_params=plugin_params;
127+
args.out_params=NIL;
128+
129+
elog(DEBUG3,"calling pglogical startup hook");
130+
131+
if (!IsTransactionState())
132+
{
133+
tx_started= true;
134+
StartTransactionCommand();
135+
}
136+
137+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
138+
(void) (*data->hooks.startup_hook)(&args);
139+
MemoryContextSwitchTo(old_ctxt);
140+
141+
if (tx_started)
142+
CommitTransactionCommand();
143+
144+
data->extra_startup_params=args.out_params;
145+
/* The startup hook might change the private data seg */
146+
data->hooks.hooks_private_data=args.private_data;
147+
148+
elog(DEBUG3,"called pglogical startup hook");
149+
}
150+
}
151+
152+
void
153+
call_shutdown_hook(PGLogicalOutputData*data)
154+
{
155+
structPGLogicalShutdownHookArgsargs;
156+
MemoryContextold_ctxt;
157+
158+
if (data->hooks.shutdown_hook!=NULL)
159+
{
160+
args.private_data=data->hooks.hooks_private_data;
161+
162+
elog(DEBUG3,"calling pglogical shutdown hook");
163+
164+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
165+
(void) (*data->hooks.shutdown_hook)(&args);
166+
MemoryContextSwitchTo(old_ctxt);
167+
168+
data->hooks.hooks_private_data=args.private_data;
169+
170+
elog(DEBUG3,"called pglogical shutdown hook");
171+
}
172+
}
173+
174+
/*
175+
* Decide if the individual change should be filtered out by
176+
* calling a client-provided hook.
177+
*/
178+
bool
179+
call_row_filter_hook(PGLogicalOutputData*data,ReorderBufferTXN*txn,
180+
Relationrel,ReorderBufferChange*change)
181+
{
182+
structPGLogicalRowFilterArgshook_args;
183+
MemoryContextold_ctxt;
184+
boolret= true;
185+
186+
if (data->hooks.row_filter_hook!=NULL)
187+
{
188+
hook_args.change_type=change->action;
189+
hook_args.private_data=data->hooks.hooks_private_data;
190+
hook_args.changed_rel=rel;
191+
192+
elog(DEBUG3,"calling pglogical row filter hook");
193+
194+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
195+
ret= (*data->hooks.row_filter_hook)(&hook_args);
196+
MemoryContextSwitchTo(old_ctxt);
197+
198+
/* Filter hooks shouldn't change the private data ptr */
199+
Assert(data->hooks.hooks_private_data==hook_args.private_data);
200+
201+
elog(DEBUG3,"called pglogical row filter hook, returned %d", (int)ret);
202+
}
203+
204+
returnret;
205+
}
206+
207+
bool
208+
call_txn_filter_hook(PGLogicalOutputData*data,RepOriginIdtxn_origin)
209+
{
210+
structPGLogicalTxnFilterArgshook_args;
211+
boolret= true;
212+
MemoryContextold_ctxt;
213+
214+
if (data->hooks.txn_filter_hook!=NULL)
215+
{
216+
hook_args.private_data=data->hooks.hooks_private_data;
217+
hook_args.origin_id=txn_origin;
218+
219+
elog(DEBUG3,"calling pglogical txn filter hook");
220+
221+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
222+
ret= (*data->hooks.txn_filter_hook)(&hook_args);
223+
MemoryContextSwitchTo(old_ctxt);
224+
225+
/* Filter hooks shouldn't change the private data ptr */
226+
Assert(data->hooks.hooks_private_data==hook_args.private_data);
227+
228+
elog(DEBUG3,"called pglogical txn filter hook, returned %d", (int)ret);
229+
}
230+
231+
returnret;
232+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp