Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9dae3b8

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents7e32d6f +484e3d8 commit9dae3b8

File tree

6 files changed

+192
-72
lines changed

6 files changed

+192
-72
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 168 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include"catalog/indexing.h"
6565
#include"catalog/namespace.h"
6666
#include"catalog/pg_constraint_fn.h"
67+
#include"catalog/pg_proc.h"
6768
#include"pglogical_output/hooks.h"
6869
#include"parser/analyze.h"
6970
#include"parser/parse_relation.h"
@@ -256,8 +257,6 @@ bool MtmUseRDMA;
256257
boolMtmPreserveCommitOrder;
257258
boolMtmVolksWagenMode;/* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
258259

259-
TransactionIdMtmUtilityProcessedInXid;
260-
261260
staticchar*MtmConnStrs;
262261
staticchar*MtmRemoteFunctionsList;
263262
staticchar*MtmClusterName;
@@ -276,6 +275,7 @@ static bool MtmClusterLocked;
276275
staticboolMtmInsideTransaction;
277276
staticboolMtmReferee;
278277
staticboolMtmMonotonicSequences;
278+
staticvoidconst*MtmDDLStatement;
279279

280280
staticExecutorStart_hook_typePreviousExecutorStartHook;
281281
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -924,6 +924,7 @@ MtmResetTransaction()
924924
x->csn=INVALID_CSN;
925925
x->status=TRANSACTION_STATUS_UNKNOWN;
926926
x->gid[0]='\0';
927+
MtmDDLStatement=NULL;
927928
}
928929

929930
#if0
@@ -987,6 +988,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
987988
MtmCheckClusterLock();
988989
}
989990
MtmInsideTransaction= true;
991+
MtmDDLStatement=NULL;
990992
Mtm->nRunningTransactions+=1;
991993

992994
x->snapshot=MtmAssignCSN();
@@ -3461,7 +3463,7 @@ _PG_init(void)
34613463
&MtmRemoteFunctionsList,
34623464
"lo_create,lo_unlink",
34633465
PGC_USERSET,/* context */
3464-
0,/* flags */
3466+
GUC_LIST_INPUT |GUC_LIST_QUOTE,/* flags */
34653467
NULL,/* GucStringCheckHook check_hook */
34663468
MtmSetRemoteFunction,/* GucStringAssignHook assign_hook */
34673469
NULL/* GucShowHook show_hook */
@@ -4975,14 +4977,17 @@ static void MtmGucDiscard()
49754977
dlist_init(&MtmGucList);
49764978

49774979
hash_destroy(MtmGucHash);
4978-
MtmGucInit();
4980+
MtmGucHash=NULL;
49794981
}
49804982

49814983
staticinlinevoidMtmGucUpdate(constchar*key,char*value)
49824984
{
49834985
MtmGucEntry*hentry;
49844986
boolfound;
49854987

4988+
if (!MtmGucHash)
4989+
MtmGucInit();
4990+
49864991
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_ENTER,&found);
49874992
if (found)
49884993
{
@@ -4998,6 +5003,9 @@ static inline void MtmGucRemove(const char *key)
49985003
MtmGucEntry*hentry;
49995004
boolfound;
50005005

5006+
if (!MtmGucHash)
5007+
MtmGucInit();
5008+
50015009
hentry= (MtmGucEntry*)hash_search(MtmGucHash,key,HASH_FIND,&found);
50025010
if (found)
50035011
{
@@ -5056,23 +5064,19 @@ char* MtmGucSerialize(void)
50565064

50575065
serialized_gucs=makeStringInfo();
50585066

5059-
/*
5060-
* Crutch for scheduler. It sets search_path through SetConfigOption()
5061-
* so our callback do not react on that.
5062-
*/
5063-
search_path=GetConfigOption("search_path", false, true);
5064-
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
5065-
50665067
dlist_foreach(iter,&MtmGucList)
50675068
{
50685069
MtmGucEntry*cur_entry=dlist_container(MtmGucEntry,list_node,iter.cur);
50695070

5071+
if (strcmp(cur_entry->key,"search_path")==0)
5072+
continue;
5073+
50705074
appendStringInfoString(serialized_gucs,"SET ");
50715075
appendStringInfoString(serialized_gucs,cur_entry->key);
50725076
appendStringInfoString(serialized_gucs," TO ");
50735077

50745078
/* quite a crutch */
5075-
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0'||strchr(cur_entry->value,',')!=NULL)
5079+
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0')
50765080
{
50775081
appendStringInfoString(serialized_gucs,"'");
50785082
appendStringInfoString(serialized_gucs,cur_entry->value);
@@ -5085,6 +5089,13 @@ char* MtmGucSerialize(void)
50855089
appendStringInfoString(serialized_gucs,"; ");
50865090
}
50875091

5092+
/*
5093+
* Crutch for scheduler. It sets search_path through SetConfigOption()
5094+
* so our callback do not react on that.
5095+
*/
5096+
search_path=GetConfigOption("search_path", false, true);
5097+
appendStringInfo(serialized_gucs,"SET search_path TO %s; ",search_path);
5098+
50885099
returnserialized_gucs->data;
50895100
}
50905101

@@ -5142,12 +5153,60 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
51425153
MTM_LOG1("Update deadlock graph for node %d size %d",nodeId,messageSize);
51435154
}
51445155

5156+
staticboolMtmIsTempType(TypeName*typeName)
5157+
{
5158+
boolisTemp= false;
5159+
5160+
if (typeName!=NULL)
5161+
{
5162+
TypetypeTuple=LookupTypeName(NULL,typeName,NULL, false);
5163+
if (typeTuple!=NULL)
5164+
{
5165+
Form_pg_typetypeStruct= (Form_pg_type)GETSTRUCT(typeTuple);
5166+
Oidrelid=typeStruct->typrelid;
5167+
ReleaseSysCache(typeTuple);
5168+
5169+
if (relid!=InvalidOid)
5170+
{
5171+
HeapTupleclassTuple=SearchSysCache1(RELOID,relid);
5172+
Form_pg_classclassStruct= (Form_pg_class)GETSTRUCT(classTuple);
5173+
if (classStruct->relpersistence=='t')
5174+
isTemp= true;
5175+
ReleaseSysCache(classTuple);
5176+
}
5177+
}
5178+
}
5179+
returnisTemp;
5180+
}
5181+
5182+
staticboolMtmFunctionProfileDependsOnTempTable(CreateFunctionStmt*func)
5183+
{
5184+
ListCell*elem;
5185+
5186+
if (MtmIsTempType(func->returnType))
5187+
{
5188+
return true;
5189+
}
5190+
foreach (elem,func->parameters)
5191+
{
5192+
FunctionParameter*param= (FunctionParameter*)lfirst(elem);
5193+
if (MtmIsTempType(param->argType))
5194+
{
5195+
return true;
5196+
}
5197+
}
5198+
return false;
5199+
}
5200+
5201+
5202+
51455203
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
51465204
ProcessUtilityContextcontext,ParamListInfoparams,
51475205
DestReceiver*dest,char*completionTag)
51485206
{
51495207
boolskipCommand= false;
51505208
boolexecuted= false;
5209+
boolprevMyXactAccessedTempRel;
51515210

51525211
MTM_LOG2("%d: Process utility statement tag=%d, context=%d, issubtrans=%d, creating_extension=%d, query=%s",
51535212
MyProcPid,nodeTag(parsetree),context,IsSubTransaction(),creating_extension,queryString);
@@ -5229,19 +5288,24 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
52295288
break;
52305289

52315290
caseT_VacuumStmt:
5232-
skipCommand= true;
5233-
if (context==PROCESS_UTILITY_TOPLEVEL) {
5234-
MtmProcessDDLCommand(queryString, false);
5235-
MtmTx.isDistributed= false;
5236-
}elseif (MtmApplyContext!=NULL) {
5237-
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
5238-
Assert(oldContext!=MtmApplyContext);
5239-
MtmVacuumStmt= (VacuumStmt*)copyObject(parsetree);
5240-
MemoryContextSwitchTo(oldContext);
5241-
return;
5242-
}
5243-
break;
5244-
5291+
{
5292+
VacuumStmt*vacuum= (VacuumStmt*)parsetree;
5293+
skipCommand= true;
5294+
if ((vacuum->options&VACOPT_LOCAL)==0&& !MtmVolksWagenMode)
5295+
{
5296+
if (context==PROCESS_UTILITY_TOPLEVEL) {
5297+
MtmProcessDDLCommand(queryString, false);
5298+
MtmTx.isDistributed= false;
5299+
}elseif (MtmApplyContext!=NULL) {
5300+
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
5301+
Assert(oldContext!=MtmApplyContext);
5302+
MtmVacuumStmt= (VacuumStmt*)copyObject(parsetree);
5303+
MemoryContextSwitchTo(oldContext);
5304+
return;
5305+
}
5306+
}
5307+
break;
5308+
}
52455309
caseT_CreateDomainStmt:
52465310
/* Detect temp tables access */
52475311
{
@@ -5377,6 +5441,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53775441
return;
53785442
}
53795443
}
5444+
elseif (stmt->removeType==OBJECT_FUNCTION&&MtmTx.isReplicated)
5445+
{
5446+
/* Make it possible to drop functions which were not replicated */
5447+
stmt->missing_ok= true;
5448+
}
53805449
}
53815450
break;
53825451

@@ -5386,6 +5455,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53865455
CopyStmt*copyStatement= (CopyStmt*)parsetree;
53875456
skipCommand= true;
53885457
if (copyStatement->is_from) {
5458+
ListCell*opt;
53895459
RangeVar*relation=copyStatement->relation;
53905460

53915461
if (relation!=NULL)
@@ -5400,6 +5470,25 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54005470
heap_close(rel,ShareLock);
54015471
}
54025472
}
5473+
5474+
foreach(opt,copyStatement->options)
5475+
{
5476+
DefElem*elem=lfirst(opt);
5477+
if (strcmp("local",elem->defname)==0) {
5478+
MtmTx.isDistributed= false;/* Skip */
5479+
MtmTx.snapshot=INVALID_CSN;
5480+
MtmTx.containsDML= false;
5481+
break;
5482+
}
5483+
}
5484+
}
5485+
caseT_CreateFunctionStmt:
5486+
{
5487+
if (MtmTx.isReplicated)
5488+
{
5489+
// disable functiob body cehck at replica
5490+
check_function_bodies= false;
5491+
}
54035492
}
54045493
break;
54055494
}
@@ -5409,16 +5498,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54095498
break;
54105499
}
54115500

5412-
if (!skipCommand&& !MtmTx.isReplicated&&(context==PROCESS_UTILITY_TOPLEVEL||MtmUtilityProcessedInXid!=GetCurrentTransactionId()))
5501+
if (!skipCommand&& !MtmTx.isReplicated&&!MtmDDLStatement)
54135502
{
5414-
MtmUtilityProcessedInXid=GetCurrentTransactionId();
5415-
if (context==PROCESS_UTILITY_TOPLEVEL|| !ActivePortal) {
5416-
MtmProcessDDLCommand(queryString, true);
5417-
}else {
5418-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5419-
}
5503+
MTM_LOG3("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d",queryString,MtmTx.isReplicated,MtmIsLogicalReceiver);
5504+
MtmProcessDDLCommand(queryString, true);
54205505
executed= true;
5506+
MtmDDLStatement=queryString;
54215507
}
5508+
elseMTM_LOG3("Skip utility statement '%s': skip=%d, insideDDL=%d",queryString,skipCommand,MtmDDLStatement!=NULL);
5509+
5510+
prevMyXactAccessedTempRel=MyXactAccessedTempRel;
54225511

54235512
if (PreviousProcessUtilityHook!=NULL)
54245513
{
@@ -5435,18 +5524,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54355524
MTM_ELOG(ERROR,"Isolation level %s is not supported by multimaster",isoLevelStr[XactIsoLevel]);
54365525
}
54375526
#endif
5527+
/* Allow replication of functions operating on temporary tables.
5528+
* Even through temporary table doesn't exist at replica, diasabling functoin body check makes it possible to create such function at replica.
5529+
* And it can be accessed later at replica if correspondent temporary table will be created.
5530+
* But disable replication of functions returning temporary tables: such functions can not be created at replica in any case.
5531+
*/
5532+
if (IsA(parsetree,CreateFunctionStmt))
5533+
{
5534+
if (MtmFunctionProfileDependsOnTempTable((CreateFunctionStmt*)parsetree))
5535+
{
5536+
prevMyXactAccessedTempRel= true;
5537+
}
5538+
MyXactAccessedTempRel=prevMyXactAccessedTempRel;
5539+
}
54385540
if (MyXactAccessedTempRel)
54395541
{
5440-
MTM_LOG1("Xact accessed temp table, stopping replication");
5542+
MTM_LOG1("Xact accessed temp table, stopping replication of statement '%s'",queryString);
54415543
MtmTx.isDistributed= false;/* Skip */
54425544
MtmTx.snapshot=INVALID_CSN;
54435545
}
54445546

54455547
if (executed)
54465548
{
54475549
MtmFinishDDLCommand();
5550+
MtmDDLStatement=NULL;
54485551
}
5449-
if (nodeTag(parsetree)==T_CreateStmt)
5552+
if (IsA(parsetree,CreateStmt))
54505553
{
54515554
CreateStmt*create= (CreateStmt*)parsetree;
54525555
Oidrelid=RangeVarGetRelid(create->relation,NoLock, true);
@@ -5463,15 +5566,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54635566
}
54645567
}
54655568
}
5466-
if (context==PROCESS_UTILITY_TOPLEVEL) {
5467-
MtmUtilityProcessedInXid=InvalidTransactionId;
5468-
}
54695569
}
54705570

54715571
staticvoid
54725572
MtmExecutorStart(QueryDesc*queryDesc,inteflags)
54735573
{
5474-
if (!MtmTx.isReplicated&&ActivePortal)
5574+
if (!MtmTx.isReplicated&&!MtmDDLStatement)
54755575
{
54765576
ListCell*tlist;
54775577

@@ -5485,11 +5585,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
54855585
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
54865586
if (tle->expr&&IsA(tle->expr,FuncExpr))
54875587
{
5488-
if (hash_search(MtmRemoteFunctions,&((FuncExpr*)tle->expr)->funcid,HASH_FIND,NULL))
5588+
Oidfunc_oid= ((FuncExpr*)tle->expr)->funcid;
5589+
if (!hash_search(MtmRemoteFunctions,&func_oid,HASH_FIND,NULL))
54895590
{
5490-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5491-
break;
5591+
Form_pg_procfuncform;
5592+
boolis_sec_def;
5593+
HeapTuplefunc_tuple=SearchSysCache1(PROCOID,ObjectIdGetDatum(func_oid));
5594+
if (!HeapTupleIsValid(func_tuple))
5595+
elog(ERROR,"cache lookup failed for function %u",func_oid);
5596+
funcform= (Form_pg_proc)GETSTRUCT(func_tuple);
5597+
is_sec_def=funcform->prosecdef;
5598+
ReleaseSysCache(func_tuple);
5599+
elog(LOG,"Function %s security defined=%d",tle->resname,is_sec_def);
5600+
if (!is_sec_def)
5601+
{
5602+
continue;
5603+
}
54925604
}
5605+
/*
5606+
* Execute security defined functions or functions marked as remote at replicated nodes.
5607+
* Them are executed as DDL statements.
5608+
* All data modifications done inside this function are not replicated.
5609+
* As a result generated content can vary at different nodes.
5610+
*/
5611+
MtmProcessDDLCommand(queryDesc->sourceText, true);
5612+
MtmDDLStatement=queryDesc;
5613+
break;
54935614
}
54945615
}
54955616
}
@@ -5538,6 +5659,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
55385659
{
55395660
standard_ExecutorFinish(queryDesc);
55405661
}
5662+
5663+
if (MtmDDLStatement==queryDesc)
5664+
{
5665+
MtmFinishDDLCommand();
5666+
MtmDDLStatement=NULL;
5667+
}
55415668
}
55425669

55435670
staticvoidMtmSeqNextvalHook(Oidseqid,int64next)
File renamed without changes.

‎src/backend/commands/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1191,7 +1191,7 @@ ProcessCopyOptions(CopyState cstate,
11911191
errmsg("argument to option \"%s\" must be a valid encoding name",
11921192
defel->defname)));
11931193
}
1194-
else
1194+
elseif (strcmp(defel->defname,"local")!=0)
11951195
ereport(ERROR,
11961196
(errcode(ERRCODE_SYNTAX_ERROR),
11971197
errmsg("option \"%s\" not recognized",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp