Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc431162

Browse files
committed
ignore logical messages outside of tx
1 parent5a20755 commitc431162

File tree

2 files changed

+38
-111
lines changed

2 files changed

+38
-111
lines changed

‎multimaster.c

Lines changed: 25 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,11 +3172,16 @@ MtmGenerateGid(char* gid)
31723172

31733173
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
31743174
{
3175-
if (MtmUtilityStmt&& !MyXactAccessedTempRel)
3175+
if (MyXactAccessedTempRel)
31763176
{
3177-
MtmProcessDDLCommand(MtmUtilityStmt);
3178-
pfree(MtmUtilityStmt);
3179-
MtmUtilityStmt=NULL;
3177+
/*
3178+
* XXX: this tx anyway goes to subscribers later, but without
3179+
* surrounding begin/commit. Probably there is more clever way
3180+
* to do that.
3181+
*/
3182+
x->isDistributed= false;
3183+
x->csn=NULL;
3184+
return false;
31803185
}
31813186

31823187
if (!x->isReplicated&& (x->isDistributed&&x->containsDML)) {
@@ -3245,15 +3250,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32453250
caseT_CreateTableSpaceStmt:
32463251
caseT_AlterTableSpaceOptionsStmt:
32473252
caseT_TruncateStmt:
3248-
caseT_CommentStmt:/* XXX: we could replicate these */;
3253+
caseT_CommentStmt:
32493254
caseT_PrepareStmt:
32503255
caseT_ExecuteStmt:
32513256
caseT_DeallocateStmt:
32523257
caseT_NotifyStmt:
32533258
caseT_ListenStmt:
32543259
caseT_UnlistenStmt:
32553260
caseT_LoadStmt:
3256-
caseT_ClusterStmt:/* XXX: we could replicate these */;
3261+
caseT_ClusterStmt:
32573262
caseT_VacuumStmt:
32583263
caseT_ExplainStmt:
32593264
caseT_VariableShowStmt:
@@ -3263,6 +3268,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32633268
caseT_ReindexStmt:
32643269
skipCommand= true;
32653270
break;
3271+
3272+
/* Do not skip following unless temp object was accessed */
3273+
caseT_CreateTableAsStmt:
3274+
caseT_CreateStmt:
3275+
caseT_ViewStmt:
3276+
caseT_IndexStmt:
3277+
caseT_DropStmt:
3278+
break;
3279+
3280+
/* Save GUC context for consequent DDL execution */
32663281
caseT_DiscardStmt:
32673282
{
32683283
DiscardStmt*stmt= (DiscardStmt*)parsetree;
@@ -3279,8 +3294,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32793294
{
32803295
VariableSetStmt*stmt= (VariableSetStmt*)parsetree;
32813296

3282-
// skipCommand = true;
3283-
32843297
/* Prevent SET TRANSACTION from replication */
32853298
if (stmt->kind==VAR_SET_MULTI)
32863299
skipCommand= true;
@@ -3292,88 +3305,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32923305
}
32933306
}
32943307
break;
3295-
caseT_CreateTableAsStmt:
3296-
// {
3297-
// /* Do not replicate temp tables */
3298-
// CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3299-
// skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3300-
// (stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3301-
// }
3302-
break;
3303-
caseT_CreateStmt:
3304-
{
3305-
/* Do not replicate temp tables */
3306-
CreateStmt*stmt= (CreateStmt*)parsetree;
3307-
skipCommand=stmt->relation->relpersistence==RELPERSISTENCE_TEMP||
3308-
(stmt->relation->schemaname&&strcmp(stmt->relation->schemaname,"pg_temp")==0);
3309-
}
3310-
break;
3311-
caseT_ViewStmt:
3312-
{
3313-
ViewStmt*stmt= (ViewStmt*)parsetree;
3314-
Query*viewParse;
3315-
3316-
viewParse=parse_analyze((Node*)copyObject(stmt->query),
3317-
queryString,NULL,0);
3318-
skipCommand=isQueryUsingTempRelation(viewParse)||
3319-
stmt->view->relpersistence==RELPERSISTENCE_TEMP;
3320-
// ||
3321-
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3322-
}
3323-
break;
3324-
caseT_IndexStmt:
3325-
{
3326-
Oidrelid;
3327-
Relationrel;
3328-
IndexStmt*stmt= (IndexStmt*)parsetree;
3329-
boolisTopLevel= (context==PROCESS_UTILITY_TOPLEVEL);
33303308

3331-
if (stmt->concurrent)
3332-
PreventTransactionChain(isTopLevel,
3333-
"CREATE INDEX CONCURRENTLY");
3334-
3335-
relid=RelnameGetRelid(stmt->relation->relname);
3336-
3337-
if (OidIsValid(relid))
3338-
{
3339-
rel=heap_open(relid,ShareLock);
3340-
skipCommand=rel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3341-
heap_close(rel,ShareLock);
3342-
}
3343-
}
3344-
break;
3345-
caseT_DropStmt:
3346-
{
3347-
DropStmt*stmt= (DropStmt*)parsetree;
3348-
3349-
if (stmt->removeType==OBJECT_TABLE)
3350-
{
3351-
RangeVar*rv=makeRangeVarFromNameList(
3352-
(List*)lfirst(list_head(stmt->objects)));
3353-
Oidrelid=RelnameGetRelid(rv->relname);
3354-
3355-
if (OidIsValid(relid))
3356-
{
3357-
Relationrel=heap_open(relid,ShareLock);
3358-
skipCommand=rel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3359-
heap_close(rel,ShareLock);
3360-
}
3361-
}
3362-
elseif (stmt->removeType==OBJECT_INDEX)
3363-
{
3364-
RangeVar*rv=makeRangeVarFromNameList(
3365-
(List*)lfirst(list_head(stmt->objects)));
3366-
Oidrelid=RelnameGetRelid(rv->relname);
3367-
3368-
if (OidIsValid(relid))
3369-
{
3370-
Relationirel=index_open(relid,ShareLock);
3371-
skipCommand=irel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
3372-
index_close(irel,ShareLock);
3373-
}
3374-
}
3375-
}
3376-
break;
3309+
/* Copy need some special care */
33773310
caseT_CopyStmt:
33783311
{
33793312
CopyStmt*copyStatement= (CopyStmt*)parsetree;
@@ -3404,20 +3337,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
34043337
if (context==PROCESS_UTILITY_TOPLEVEL)
34053338
{
34063339
if (!skipCommand&& !MtmTx.isReplicated) {
3407-
// if (MtmProcessDDLCommand(queryString)) {
3408-
// return;
3409-
// }
3410-
3411-
MemoryContextoldcontext;
3412-
3413-
if (MtmUtilityStmt)
3414-
pfree(MtmUtilityStmt);
3415-
3416-
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
3417-
MtmUtilityStmt=palloc(strlen(queryString)+1);
3418-
MemoryContextSwitchTo(oldcontext);
3419-
3420-
strncpy(MtmUtilityStmt,queryString,strlen(queryString)+1);
3340+
if (MtmProcessDDLCommand(queryString)) {
3341+
return;
3342+
}
34213343
}
34223344
}
34233345

‎pglogical_apply.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62+
staticboolinside_tx= false;
63+
6264
staticRelationread_rel(StringInfos,LOCKMODEmode);
6365
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
6466
staticEState*create_rel_estate(Relationrel);
@@ -339,6 +341,8 @@ process_remote_begin(StringInfo s)
339341
StartTransactionCommand();
340342
MtmJoinTransaction(&gtid,snapshot);
341343

344+
inside_tx= true;
345+
342346
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
343347
}
344348

@@ -349,9 +353,14 @@ process_remote_message(StringInfo s)
349353
intrc;
350354

351355
stmt=pq_getmsgstring(s);
352-
MTM_LOG1("utility: %s",stmt);
353-
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,stmt);
354356

357+
if (!inside_tx)
358+
{
359+
MTM_LOG1("%d: Ignoring utility statement %s",MyProcPid,stmt);
360+
return;
361+
}
362+
363+
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
355364
SPI_connect();
356365
rc=SPI_execute(stmt, false,0);
357366
SPI_finish();
@@ -631,6 +640,7 @@ process_remote_commit(StringInfo in)
631640
if (flags&PGLOGICAL_CAUGHT_UP) {
632641
MtmRecoveryCompleted();
633642
}
643+
inside_tx= false;
634644
}
635645

636646
staticvoid
@@ -950,12 +960,7 @@ void MtmExecutor(int id, void* work, size_t size)
950960
{
951961
while (true) {
952962
charaction=pq_getmsgbyte(&s);
953-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
954-
#if0
955-
if (Mtm->status==MTM_RECOVERY) {
956-
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
957-
}
958-
#endif
963+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
959964
switch (action) {
960965
/* BEGIN */
961966
case'B':

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp