Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit27adf95

Browse files
knizhnikkelvich
authored andcommitted
Improve DDL handling
1 parent23e0701 commit27adf95

File tree

1 file changed

+67
-28
lines changed

1 file changed

+67
-28
lines changed

‎multimaster.c

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include"catalog/indexing.h"
6565
#include"catalog/namespace.h"
6666
#include"catalog/pg_constraint_fn.h"
67+
#include"catalog/pg_proc.h"
6768
#include"pglogical_output/hooks.h"
6869
#include"parser/analyze.h"
6970
#include"parser/parse_relation.h"
@@ -258,8 +259,6 @@ bool MtmPreserveCommitOrder;
258259
boolMtmVolksWagenMode;/* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
259260
boolMtmMajorNode;
260261

261-
TransactionIdMtmUtilityProcessedInXid;
262-
263262
staticchar*MtmConnStrs;
264263
staticchar*MtmRemoteFunctionsList;
265264
staticchar*MtmClusterName;
@@ -277,6 +276,7 @@ static bool MtmClusterLocked;
277276
staticboolMtmInsideTransaction;
278277
staticboolMtmReferee;
279278
staticboolMtmMonotonicSequences;
279+
staticvoidconst*MtmDDLStatement;
280280

281281
staticExecutorStart_hook_typePreviousExecutorStartHook;
282282
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -923,6 +923,7 @@ MtmResetTransaction()
923923
x->csn=INVALID_CSN;
924924
x->status=TRANSACTION_STATUS_UNKNOWN;
925925
x->gid[0]='\0';
926+
MtmDDLStatement=NULL;
926927
}
927928

928929
#if0
@@ -986,6 +987,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
986987
MtmCheckClusterLock();
987988
}
988989
MtmInsideTransaction= true;
990+
MtmDDLStatement=NULL;
989991
Mtm->nRunningTransactions+=1;
990992

991993
x->snapshot=MtmAssignCSN();
@@ -3117,7 +3119,7 @@ _PG_init(void)
31173119
&MtmRemoteFunctionsList,
31183120
"lo_create,lo_unlink",
31193121
PGC_USERSET,/* context */
3120-
0,/* flags */
3122+
GUC_LIST_INPUT |GUC_LIST_QUOTE,/* flags */
31213123
NULL,/* GucStringCheckHook check_hook */
31223124
MtmSetRemoteFunction,/* GucStringAssignHook assign_hook */
31233125
NULL/* GucShowHook show_hook */
@@ -4630,14 +4632,17 @@ static void MtmGucDiscard()
46304632
dlist_init(&MtmGucList);
46314633

46324634
hash_destroy(MtmGucHash);
4633-
MtmGucInit();
4635+
MtmGucHash=NULL;
46344636
}
46354637

46364638
staticinlinevoidMtmGucUpdate(constchar*key,char*value)
46374639
{
46384640
MtmGucEntry*hentry;
46394641
boolfound;
46404642

4643+
if (!MtmGucHash)
4644+
MtmGucInit();
4645+
46414646
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_ENTER,&found);
46424647
if (found)
46434648
{
@@ -4653,6 +4658,9 @@ static inline void MtmGucRemove(const char *key)
46534658
MtmGucEntry*hentry;
46544659
boolfound;
46554660

4661+
if (!MtmGucHash)
4662+
MtmGucInit();
4663+
46564664
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_FIND,&found);
46574665
if (found)
46584666
{
@@ -4711,23 +4719,19 @@ char* MtmGucSerialize(void)
47114719

47124720
serialized_gucs=makeStringInfo();
47134721

4714-
/*
4715-
* Crutch for scheduler. It sets search_path through SetConfigOption()
4716-
* so our callback do not react on that.
4717-
*/
4718-
search_path=GetConfigOption("search_path", false, true);
4719-
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
4720-
47214722
dlist_foreach(iter,&MtmGucList)
47224723
{
47234724
MtmGucEntry*cur_entry=dlist_container(MtmGucEntry,list_node,iter.cur);
47244725

4726+
if (strcmp(cur_entry->key,"search_path")==0)
4727+
continue;
4728+
47254729
appendStringInfoString(serialized_gucs,"SET ");
47264730
appendStringInfoString(serialized_gucs,cur_entry->key);
47274731
appendStringInfoString(serialized_gucs," TO ");
47284732

47294733
/* quite a crutch */
4730-
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0'||strchr(cur_entry->value,',')!=NULL)
4734+
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0')
47314735
{
47324736
appendStringInfoString(serialized_gucs,"'");
47334737
appendStringInfoString(serialized_gucs,cur_entry->value);
@@ -4740,6 +4744,13 @@ char* MtmGucSerialize(void)
47404744
appendStringInfoString(serialized_gucs,"; ");
47414745
}
47424746

4747+
/*
4748+
* Crutch for scheduler. It sets search_path through SetConfigOption()
4749+
* so our callback do not react on that.
4750+
*/
4751+
search_path=GetConfigOption("search_path", false, true);
4752+
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
4753+
47434754
returnserialized_gucs->data;
47444755
}
47454756

@@ -5032,6 +5043,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50325043
return;
50335044
}
50345045
}
5046+
elseif (stmt->removeType==OBJECT_FUNCTION&&MtmTx.isReplicated)
5047+
{
5048+
/* Make it possible to drop functions which were not replicated */
5049+
stmt->missing_ok= true;
5050+
}
50355051
}
50365052
break;
50375053

@@ -5064,16 +5080,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50645080
break;
50655081
}
50665082

5067-
if (!skipCommand&& !MtmTx.isReplicated&&(context==PROCESS_UTILITY_TOPLEVEL||MtmUtilityProcessedInXid!=GetCurrentTransactionId()))
5083+
if (!skipCommand&& !MtmTx.isReplicated&&!MtmDDLStatement)
50685084
{
5069-
MtmUtilityProcessedInXid=GetCurrentTransactionId();
5070-
if (context==PROCESS_UTILITY_TOPLEVEL|| !ActivePortal) {
5071-
MtmProcessDDLCommand(queryString, true);
5072-
}else {
5073-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5074-
}
5085+
MTM_LOG3("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d",queryString,MtmTx.isReplicated,MtmIsLogicalReceiver);
5086+
MtmProcessDDLCommand(queryString, true);
50755087
executed= true;
5088+
MtmDDLStatement=queryString;
50765089
}
5090+
elseMTM_LOG3("Skip utility statement '%s': skip=%d, insideDDL=%d",queryString,skipCommand,MtmDDLStatement!=NULL);
50775091

50785092
if (PreviousProcessUtilityHook!=NULL)
50795093
{
@@ -5092,16 +5106,17 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50925106
#endif
50935107
if (MyXactAccessedTempRel)
50945108
{
5095-
MTM_LOG1("Xact accessed temp table, stopping replication");
5109+
MTM_LOG1("Xact accessed temp table, stopping replication of statement '%s'",queryString);
50965110
MtmTx.isDistributed= false;/* Skip */
50975111
MtmTx.snapshot=INVALID_CSN;
50985112
}
50995113

51005114
if (executed)
51015115
{
51025116
MtmFinishDDLCommand();
5117+
MtmDDLStatement=NULL;
51035118
}
5104-
if (nodeTag(parsetree)==T_CreateStmt)
5119+
if (IsA(parsetree,CreateStmt))
51055120
{
51065121
CreateStmt*create= (CreateStmt*)parsetree;
51075122
Oidrelid=RangeVarGetRelid(create->relation,NoLock, true);
@@ -5118,15 +5133,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51185133
}
51195134
}
51205135
}
5121-
if (context==PROCESS_UTILITY_TOPLEVEL) {
5122-
MtmUtilityProcessedInXid=InvalidTransactionId;
5123-
}
51245136
}
51255137

51265138
staticvoid
51275139
MtmExecutorStart(QueryDesc*queryDesc,inteflags)
51285140
{
5129-
if (!MtmTx.isReplicated&&ActivePortal)
5141+
if (!MtmTx.isReplicated&&!MtmDDLStatement)
51305142
{
51315143
ListCell*tlist;
51325144

@@ -5140,11 +5152,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
51405152
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
51415153
if (tle->expr&&IsA(tle->expr,FuncExpr))
51425154
{
5143-
if (hash_search(MtmRemoteFunctions,&((FuncExpr*)tle->expr)->funcid,HASH_FIND,NULL))
5155+
Oidfunc_oid= ((FuncExpr*)tle->expr)->funcid;
5156+
if (!hash_search(MtmRemoteFunctions,&func_oid,HASH_FIND,NULL))
51445157
{
5145-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5146-
break;
5158+
Form_pg_procfuncform;
5159+
boolis_sec_def;
5160+
HeapTuplefunc_tuple=SearchSysCache1(PROCOID,ObjectIdGetDatum(func_oid));
5161+
if (!HeapTupleIsValid(func_tuple))
5162+
elog(ERROR,"cache lookup failed for function %u",func_oid);
5163+
funcform= (Form_pg_proc)GETSTRUCT(func_tuple);
5164+
is_sec_def=funcform->prosecdef;
5165+
ReleaseSysCache(func_tuple);
5166+
elog(LOG,"Function %s security defined=%d",tle->resname,is_sec_def);
5167+
if (!is_sec_def)
5168+
{
5169+
continue;
5170+
}
51475171
}
5172+
/*
5173+
* Execute security defined functions or functions marked as remote at replicated nodes.
5174+
* Them are executed as DDL statements.
5175+
* All data modifications done inside this function are not replicated.
5176+
* As a result generated content can vary at different nodes.
5177+
*/
5178+
MtmProcessDDLCommand(queryDesc->sourceText, true);
5179+
MtmDDLStatement=queryDesc;
5180+
break;
51485181
}
51495182
}
51505183
}
@@ -5193,6 +5226,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
51935226
{
51945227
standard_ExecutorFinish(queryDesc);
51955228
}
5229+
5230+
if (MtmDDLStatement==queryDesc)
5231+
{
5232+
MtmFinishDDLCommand();
5233+
MtmDDLStatement=NULL;
5234+
}
51965235
}
51975236

51985237
staticvoidMtmSeqNextvalHook(Oidseqid,int64next)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp