Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb4a0f07

Browse files
committed
Improve DDL handling
1 parent7f077cf commitb4a0f07

File tree

2 files changed

+68
-51
lines changed

2 files changed

+68
-51
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include"catalog/indexing.h"
6565
#include"catalog/namespace.h"
6666
#include"catalog/pg_constraint_fn.h"
67+
#include"catalog/pg_proc.h"
6768
#include"pglogical_output/hooks.h"
6869
#include"parser/analyze.h"
6970
#include"parser/parse_relation.h"
@@ -255,8 +256,6 @@ bool MtmUseDtm;
255256
boolMtmPreserveCommitOrder;
256257
boolMtmVolksWagenMode;/* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257258

258-
TransactionIdMtmUtilityProcessedInXid;
259-
260259
staticchar*MtmConnStrs;
261260
staticchar*MtmRemoteFunctionsList;
262261
staticchar*MtmClusterName;
@@ -275,6 +274,7 @@ static bool MtmClusterLocked;
275274
staticboolMtmInsideTransaction;
276275
staticboolMtmReferee;
277276
staticboolMtmMonotonicSequences;
277+
staticvoidconst*MtmDDLStatement;
278278

279279
staticExecutorStart_hook_typePreviousExecutorStartHook;
280280
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -923,6 +923,7 @@ MtmResetTransaction()
923923
x->csn=INVALID_CSN;
924924
x->status=TRANSACTION_STATUS_UNKNOWN;
925925
x->gid[0]='\0';
926+
MtmDDLStatement=NULL;
926927
}
927928

928929
#if0
@@ -986,6 +987,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
986987
MtmCheckClusterLock();
987988
}
988989
MtmInsideTransaction= true;
990+
MtmDDLStatement=NULL;
989991
Mtm->nRunningTransactions+=1;
990992

991993
x->snapshot=MtmAssignCSN();
@@ -3447,7 +3449,7 @@ _PG_init(void)
34473449
&MtmRemoteFunctionsList,
34483450
"lo_create,lo_unlink",
34493451
PGC_USERSET,/* context */
3450-
0,/* flags */
3452+
GUC_LIST_INPUT |GUC_LIST_QUOTE,/* flags */
34513453
NULL,/* GucStringCheckHook check_hook */
34523454
MtmSetRemoteFunction,/* GucStringAssignHook assign_hook */
34533455
NULL/* GucShowHook show_hook */
@@ -4961,14 +4963,17 @@ static void MtmGucDiscard()
49614963
dlist_init(&MtmGucList);
49624964

49634965
hash_destroy(MtmGucHash);
4964-
MtmGucInit();
4966+
MtmGucHash=NULL;
49654967
}
49664968

49674969
staticinlinevoidMtmGucUpdate(constchar*key,char*value)
49684970
{
49694971
MtmGucEntry*hentry;
49704972
boolfound;
49714973

4974+
if (!MtmGucHash)
4975+
MtmGucInit();
4976+
49724977
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_ENTER,&found);
49734978
if (found)
49744979
{
@@ -4984,6 +4989,9 @@ static inline void MtmGucRemove(const char *key)
49844989
MtmGucEntry*hentry;
49854990
boolfound;
49864991

4992+
if (!MtmGucHash)
4993+
MtmGucInit();
4994+
49874995
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_FIND,&found);
49884996
if (found)
49894997
{
@@ -5042,23 +5050,19 @@ char* MtmGucSerialize(void)
50425050

50435051
serialized_gucs=makeStringInfo();
50445052

5045-
/*
5046-
* Crutch for scheduler. It sets search_path through SetConfigOption()
5047-
* so our callback do not react on that.
5048-
*/
5049-
search_path=GetConfigOption("search_path", false, true);
5050-
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
5051-
50525053
dlist_foreach(iter,&MtmGucList)
50535054
{
50545055
MtmGucEntry*cur_entry=dlist_container(MtmGucEntry,list_node,iter.cur);
50555056

5057+
if (strcmp(cur_entry->key,"search_path")==0)
5058+
continue;
5059+
50565060
appendStringInfoString(serialized_gucs,"SET ");
50575061
appendStringInfoString(serialized_gucs,cur_entry->key);
50585062
appendStringInfoString(serialized_gucs," TO ");
50595063

50605064
/* quite a crutch */
5061-
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0'||strchr(cur_entry->value,',')!=NULL)
5065+
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0')
50625066
{
50635067
appendStringInfoString(serialized_gucs,"'");
50645068
appendStringInfoString(serialized_gucs,cur_entry->value);
@@ -5071,6 +5075,13 @@ char* MtmGucSerialize(void)
50715075
appendStringInfoString(serialized_gucs,"; ");
50725076
}
50735077

5078+
/*
5079+
* Crutch for scheduler. It sets search_path through SetConfigOption()
5080+
* so our callback do not react on that.
5081+
*/
5082+
search_path=GetConfigOption("search_path", false, true);
5083+
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
5084+
50745085
returnserialized_gucs->data;
50755086
}
50765087

@@ -5363,6 +5374,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53635374
return;
53645375
}
53655376
}
5377+
elseif (stmt->removeType==OBJECT_FUNCTION&&MtmTx.isReplicated)
5378+
{
5379+
/* Make it possible to drop functions which were not replicated */
5380+
stmt->missing_ok= true;
5381+
}
53665382
}
53675383
break;
53685384

@@ -5395,16 +5411,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53955411
break;
53965412
}
53975413

5398-
if (!skipCommand&& !MtmTx.isReplicated&&(context==PROCESS_UTILITY_TOPLEVEL||MtmUtilityProcessedInXid!=GetCurrentTransactionId()))
5414+
if (!skipCommand&& !MtmTx.isReplicated&&!MtmDDLStatement)
53995415
{
5400-
MtmUtilityProcessedInXid=GetCurrentTransactionId();
5401-
if (context==PROCESS_UTILITY_TOPLEVEL|| !ActivePortal) {
5402-
MtmProcessDDLCommand(queryString, true);
5403-
}else {
5404-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5405-
}
5416+
MTM_LOG3("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d",queryString,MtmTx.isReplicated,MtmIsLogicalReceiver);
5417+
MtmProcessDDLCommand(queryString, true);
54065418
executed= true;
5419+
MtmDDLStatement=queryString;
54075420
}
5421+
elseMTM_LOG3("Skip utility statement '%s': skip=%d, insideDDL=%d",queryString,skipCommand,MtmDDLStatement!=NULL);
54085422

54095423
if (PreviousProcessUtilityHook!=NULL)
54105424
{
@@ -5423,16 +5437,17 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54235437
#endif
54245438
if (MyXactAccessedTempRel)
54255439
{
5426-
MTM_LOG1("Xact accessed temp table, stopping replication");
5440+
MTM_LOG1("Xact accessed temp table, stopping replication of statement '%s'",queryString);
54275441
MtmTx.isDistributed= false;/* Skip */
54285442
MtmTx.snapshot=INVALID_CSN;
54295443
}
54305444

54315445
if (executed)
54325446
{
54335447
MtmFinishDDLCommand();
5448+
MtmDDLStatement=NULL;
54345449
}
5435-
if (nodeTag(parsetree)==T_CreateStmt)
5450+
if (IsA(parsetree,CreateStmt))
54365451
{
54375452
CreateStmt*create= (CreateStmt*)parsetree;
54385453
Oidrelid=RangeVarGetRelid(create->relation,NoLock, true);
@@ -5449,15 +5464,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54495464
}
54505465
}
54515466
}
5452-
if (context==PROCESS_UTILITY_TOPLEVEL) {
5453-
MtmUtilityProcessedInXid=InvalidTransactionId;
5454-
}
54555467
}
54565468

54575469
staticvoid
54585470
MtmExecutorStart(QueryDesc*queryDesc,inteflags)
54595471
{
5460-
if (!MtmTx.isReplicated&&ActivePortal)
5472+
if (!MtmTx.isReplicated&&!MtmDDLStatement)
54615473
{
54625474
ListCell*tlist;
54635475

@@ -5471,11 +5483,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
54715483
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
54725484
if (tle->expr&&IsA(tle->expr,FuncExpr))
54735485
{
5474-
if (hash_search(MtmRemoteFunctions,&((FuncExpr*)tle->expr)->funcid,HASH_FIND,NULL))
5486+
Oidfunc_oid= ((FuncExpr*)tle->expr)->funcid;
5487+
if (!hash_search(MtmRemoteFunctions,&func_oid,HASH_FIND,NULL))
54755488
{
5476-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5477-
break;
5489+
Form_pg_procfuncform;
5490+
boolis_sec_def;
5491+
HeapTuplefunc_tuple=SearchSysCache1(PROCOID,ObjectIdGetDatum(func_oid));
5492+
if (!HeapTupleIsValid(func_tuple))
5493+
elog(ERROR,"cache lookup failed for function %u",func_oid);
5494+
funcform= (Form_pg_proc)GETSTRUCT(func_tuple);
5495+
is_sec_def=funcform->prosecdef;
5496+
ReleaseSysCache(func_tuple);
5497+
elog(LOG,"Function %s security defined=%d",tle->resname,is_sec_def);
5498+
if (!is_sec_def)
5499+
{
5500+
continue;
5501+
}
54785502
}
5503+
/*
5504+
* Execute security defined functions or functions marked as remote at replicated nodes.
5505+
* Them are executed as DDL statements.
5506+
* All data modifications done inside this function are not replicated.
5507+
* As a result generated content can vary at different nodes.
5508+
*/
5509+
MtmProcessDDLCommand(queryDesc->sourceText, true);
5510+
MtmDDLStatement=queryDesc;
5511+
break;
54795512
}
54805513
}
54815514
}
@@ -5524,6 +5557,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
55245557
{
55255558
standard_ExecutorFinish(queryDesc);
55265559
}
5560+
5561+
if (MtmDDLStatement==queryDesc)
5562+
{
5563+
MtmFinishDDLCommand();
5564+
MtmDDLStatement=NULL;
5565+
}
55275566
}
55285567

55295568
staticvoidMtmSeqNextvalHook(Oidseqid,int64next)

‎src/backend/commands/functioncmds.c

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,17 +1114,13 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
11141114
* Note: this is also used for aggregate deletion, since the OIDs of
11151115
* both functions and aggregates point to pg_proc.
11161116
*/
1117+
externboolMyXactAccessedTempRel;
11171118
void
11181119
RemoveFunctionById(OidfuncOid)
11191120
{
11201121
Relationrelation;
11211122
HeapTupletup;
11221123
boolisagg;
1123-
Oidlanguage_oid;
1124-
HeapTuplelanguageTuple;
1125-
Form_pg_languagelanguageStruct;
1126-
OidlanguageValidator;
1127-
boolsave_check_function_bodies;
11281124

11291125
/*
11301126
* Delete the pg_proc tuple.
@@ -1137,24 +1133,6 @@ RemoveFunctionById(Oid funcOid)
11371133

11381134
isagg= ((Form_pg_proc)GETSTRUCT(tup))->proisagg;
11391135

1140-
/*
1141-
* MTM-CRUTCH: We need to know wheteher our function had
1142-
* accessed temp relation or not. So validate function body
1143-
* again -- that will set MyXactAccessedTempRel.
1144-
*/
1145-
save_check_function_bodies=check_function_bodies;
1146-
check_function_bodies= false;
1147-
language_oid= ((Form_pg_proc)GETSTRUCT(tup))->prolang;
1148-
languageTuple=SearchSysCache1(LANGOID,language_oid);
1149-
languageStruct= (Form_pg_language)GETSTRUCT(languageTuple);
1150-
languageValidator=languageStruct->lanvalidator;
1151-
ifOidIsValid(languageValidator) {
1152-
/* Language validator is optional feature of language */
1153-
OidFunctionCall1(languageValidator,ObjectIdGetDatum(funcOid));
1154-
}
1155-
ReleaseSysCache(languageTuple);
1156-
check_function_bodies=save_check_function_bodies;
1157-
11581136
simple_heap_delete(relation,&tup->t_self);
11591137

11601138
ReleaseSysCache(tup);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp