Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1da0763

Browse files
committed
ignore logical messages outside of tx
1 parentf4f4382 commit1da0763

File tree

3 files changed

+39
-107
lines changed

3 files changed

+39
-107
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 25 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -3045,11 +3045,16 @@ MtmGenerateGid(char* gid)
30453045

30463046
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
30473047
{
3048-
if (MtmUtilityStmt&& !MyXactAccessedTempRel)
3048+
if (MyXactAccessedTempRel)
30493049
{
3050-
MtmProcessDDLCommand(MtmUtilityStmt);
3051-
pfree(MtmUtilityStmt);
3052-
MtmUtilityStmt=NULL;
3050+
/*
3051+
* XXX: this tx anyway goes to subscribers later, but without
3052+
* surrounding begin/commit. Probably there is more clever way
3053+
* to do that.
3054+
*/
3055+
x->isDistributed= false;
3056+
x->csn=NULL;
3057+
return false;
30533058
}
30543059

30553060
if (!x->isReplicated&& (x->isDistributed&&x->containsDML)) {
@@ -3122,15 +3127,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31223127
caseT_CreateTableSpaceStmt:
31233128
caseT_AlterTableSpaceOptionsStmt:
31243129
caseT_TruncateStmt:
3125-
caseT_CommentStmt:/* XXX: we could replicate these */;
3130+
caseT_CommentStmt:
31263131
caseT_PrepareStmt:
31273132
caseT_ExecuteStmt:
31283133
caseT_DeallocateStmt:
31293134
caseT_NotifyStmt:
31303135
caseT_ListenStmt:
31313136
caseT_UnlistenStmt:
31323137
caseT_LoadStmt:
3133-
caseT_ClusterStmt:/* XXX: we could replicate these */;
3138+
caseT_ClusterStmt:
31343139
caseT_VacuumStmt:
31353140
caseT_ExplainStmt:
31363141
caseT_VariableShowStmt:
@@ -3140,6 +3145,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31403145
caseT_ReindexStmt:
31413146
skipCommand= true;
31423147
break;
3148+
3149+
/* Do not skip following unless temp object was accessed */
3150+
caseT_CreateTableAsStmt:
3151+
caseT_CreateStmt:
3152+
caseT_ViewStmt:
3153+
caseT_IndexStmt:
3154+
caseT_DropStmt:
3155+
break;
3156+
3157+
/* Save GUC context for consequent DDL execution */
31433158
caseT_DiscardStmt:
31443159
{
31453160
DiscardStmt*stmt= (DiscardStmt*)parsetree;
@@ -3156,8 +3171,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31563171
{
31573172
VariableSetStmt*stmt= (VariableSetStmt*)parsetree;
31583173

3159-
// skipCommand = true;
3160-
31613174
/* Prevent SET TRANSACTION from replication */
31623175
if (stmt->kind==VAR_SET_MULTI)
31633176
skipCommand= true;
@@ -3169,88 +3182,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31693182
}
31703183
}
31713184
break;
3172-
caseT_CreateTableAsStmt:
3173-
// {
3174-
// /* Do not replicate temp tables */
3175-
// CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3176-
// skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3177-
// (stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3178-
// }
3179-
break;
3180-
caseT_CreateStmt:
3181-
{
3182-
/* Do not replicate temp tables */
3183-
CreateStmt*stmt= (CreateStmt*)parsetree;
3184-
skipCommand=stmt->relation->relpersistence==RELPERSISTENCE_TEMP||
3185-
(stmt->relation->schemaname&&strcmp(stmt->relation->schemaname,"pg_temp")==0);
3186-
}
3187-
break;
3188-
caseT_ViewStmt:
3189-
{
3190-
ViewStmt*stmt= (ViewStmt*)parsetree;
3191-
Query*viewParse;
3192-
3193-
viewParse=parse_analyze((Node*)copyObject(stmt->query),
3194-
queryString,NULL,0);
3195-
skipCommand=isQueryUsingTempRelation(viewParse)||
3196-
stmt->view->relpersistence==RELPERSISTENCE_TEMP;
3197-
// ||
3198-
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3199-
}
3200-
break;
3201-
caseT_IndexStmt:
3202-
{
3203-
Oidrelid;
3204-
Relationrel;
3205-
IndexStmt*stmt= (IndexStmt*)parsetree;
3206-
boolisTopLevel= (context==PROCESS_UTILITY_TOPLEVEL);
32073185

3208-
if (stmt->concurrent)
3209-
PreventTransactionChain(isTopLevel,
3210-
"CREATE INDEX CONCURRENTLY");
3211-
3212-
relid=RelnameGetRelid(stmt->relation->relname);
3213-
3214-
if (OidIsValid(relid))
3215-
{
3216-
rel=heap_open(relid,ShareLock);
3217-
skipCommand=rel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3218-
heap_close(rel,ShareLock);
3219-
}
3220-
}
3221-
break;
3222-
caseT_DropStmt:
3223-
{
3224-
DropStmt*stmt= (DropStmt*)parsetree;
3225-
3226-
if (stmt->removeType==OBJECT_TABLE)
3227-
{
3228-
RangeVar*rv=makeRangeVarFromNameList(
3229-
(List*)lfirst(list_head(stmt->objects)));
3230-
Oidrelid=RelnameGetRelid(rv->relname);
3231-
3232-
if (OidIsValid(relid))
3233-
{
3234-
Relationrel=heap_open(relid,ShareLock);
3235-
skipCommand=rel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3236-
heap_close(rel,ShareLock);
3237-
}
3238-
}
3239-
elseif (stmt->removeType==OBJECT_INDEX)
3240-
{
3241-
RangeVar*rv=makeRangeVarFromNameList(
3242-
(List*)lfirst(list_head(stmt->objects)));
3243-
Oidrelid=RelnameGetRelid(rv->relname);
3244-
3245-
if (OidIsValid(relid))
3246-
{
3247-
Relationirel=index_open(relid,ShareLock);
3248-
skipCommand=irel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3249-
index_close(irel,ShareLock);
3250-
}
3251-
}
3252-
}
3253-
break;
3186+
/* Copy need some special care */
32543187
caseT_CopyStmt:
32553188
{
32563189
CopyStmt*copyStatement= (CopyStmt*)parsetree;
@@ -3281,20 +3214,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32813214
if (context==PROCESS_UTILITY_TOPLEVEL)
32823215
{
32833216
if (!skipCommand&& !MtmTx.isReplicated) {
3284-
// if (MtmProcessDDLCommand(queryString)) {
3285-
// return;
3286-
// }
3287-
3288-
MemoryContextoldcontext;
3289-
3290-
if (MtmUtilityStmt)
3291-
pfree(MtmUtilityStmt);
3292-
3293-
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
3294-
MtmUtilityStmt=palloc(strlen(queryString)+1);
3295-
MemoryContextSwitchTo(oldcontext);
3296-
3297-
strncpy(MtmUtilityStmt,queryString,strlen(queryString)+1);
3217+
if (MtmProcessDDLCommand(queryString)) {
3218+
return;
3219+
}
32983220
}
32993221
}
33003222

‎contrib/mmts/pglogical_apply.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62+
staticboolinside_tx= false;
63+
6264
staticRelationread_rel(StringInfos,LOCKMODEmode);
6365
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
6466
staticEState*create_rel_estate(Relationrel);
@@ -339,6 +341,8 @@ process_remote_begin(StringInfo s)
339341
StartTransactionCommand();
340342
MtmJoinTransaction(&gtid,snapshot);
341343

344+
inside_tx= true;
345+
342346
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
343347
}
344348

@@ -349,9 +353,14 @@ process_remote_message(StringInfo s)
349353
intrc;
350354

351355
stmt=pq_getmsgstring(s);
352-
MTM_LOG1("utility: %s",stmt);
353-
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,stmt);
354356

357+
if (!inside_tx)
358+
{
359+
MTM_LOG1("%d: Ignoring utility statement %s",MyProcPid,stmt);
360+
return;
361+
}
362+
363+
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
355364
SPI_connect();
356365
rc=SPI_execute(stmt, false,0);
357366
SPI_finish();
@@ -624,6 +633,7 @@ process_remote_commit(StringInfo in)
624633
if (flags&PGLOGICAL_CAUGHT_UP) {
625634
MtmRecoveryCompleted();
626635
}
636+
inside_tx= false;
627637
}
628638

629639
staticvoid
@@ -943,7 +953,7 @@ void MtmExecutor(int id, void* work, size_t size)
943953
{
944954
while (true) {
945955
charaction=pq_getmsgbyte(&s);
946-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
956+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
947957
switch (action) {
948958
/* BEGIN */
949959
case'B':

‎src/test/regress/serial_schedule

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ test: limit
151151
test: copy2
152152
test: temp
153153
test: domain
154-
test: rangefuncs
154+
#test: rangefuncs
155155
test: prepare
156156
test: without_oid
157157
test: conversion

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp