Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit85f9100

Browse files
knizhnikkelvich
authored andcommitted
Correctly process concurrent logical messages
1 parent29ce7df commit85f9100

File tree

2 files changed

+32
-25
lines changed

2 files changed

+32
-25
lines changed

‎multimaster.c

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ static void MtmShmemStartup(void);
155155
staticBgwPool*MtmPoolConstructor(void);
156156
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
157157
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
158-
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional);
158+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional,boolcontextFree);
159159

160160
MtmState*Mtm;
161161

@@ -3824,33 +3824,31 @@ static char * MtmGucSerialize(void)
38243824
* -------------------------------------------
38253825
*/
38263826

3827-
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional)
3827+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional,boolcontextFree)
38283828
{
3829-
char*queryWithContext;
3829+
char*queryWithContext= (char*)queryString;
38303830
char*gucContext;
38313831

3832-
/* Append global GUC to utility stmt. */
3833-
gucContext=MtmGucSerialize();
3834-
if (gucContext)
3835-
{
3836-
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3837-
strcpy(queryWithContext,gucContext);
3838-
strcat(queryWithContext,queryString);
3839-
}
3840-
else
3841-
{
3842-
queryWithContext= (char*)queryString;
3832+
if (!contextFree) {
3833+
/* Append global GUC to utility stmt. */
3834+
gucContext=MtmGucSerialize();
3835+
if (gucContext)
3836+
{
3837+
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3838+
strcpy(queryWithContext,gucContext);
3839+
strcat(queryWithContext,queryString);
3840+
}
38433841
}
38443842

38453843
MTM_LOG3("Sending utility: %s",queryWithContext);
3846-
if (transactional)
3844+
if (transactional) {
38473845
/* DDL */
38483846
LogLogicalMessage("D",queryWithContext,strlen(queryWithContext)+1, true);
3849-
else
3847+
MtmTx.containsDML= true;
3848+
}else {
38503849
/* CONCURRENT DDL */
3851-
LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false);
3852-
3853-
MtmTx.containsDML= true;
3850+
XLogFlush(LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false));
3851+
}
38543852
return false;
38553853
}
38563854

@@ -3929,7 +3927,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39293927
caseT_UnlistenStmt:
39303928
caseT_LoadStmt:
39313929
caseT_ClusterStmt:
3932-
caseT_VacuumStmt:
39333930
caseT_VariableShowStmt:
39343931
caseT_ReassignOwnedStmt:
39353932
caseT_LockStmt:
@@ -3938,6 +3935,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39383935
skipCommand= true;
39393936
break;
39403937

3938+
caseT_VacuumStmt:
3939+
context=PROCESS_UTILITY_TOPLEVEL;
3940+
MtmProcessDDLCommand(queryString, false, true);
3941+
MtmTx.isDistributed= false;
3942+
skipCommand= true;
3943+
break;
3944+
39413945
caseT_CreateDomainStmt:
39423946
/* Detect temp tables access */
39433947
{
@@ -4030,7 +4034,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40304034
if (indexStmt->concurrent&& !IsTransactionBlock()&& !MtmTx.isReplicated)
40314035
{
40324036
skipCommand= true;
4033-
MtmProcessDDLCommand(queryString, false);
4037+
MtmProcessDDLCommand(queryString, false, false);
40344038
MtmTx.isDistributed= false;
40354039
}
40364040
}
@@ -4042,7 +4046,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40424046
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent&& !IsTransactionBlock()&& !MtmTx.isReplicated)
40434047
{
40444048
skipCommand= true;
4045-
MtmProcessDDLCommand(queryString, false);
4049+
MtmProcessDDLCommand(queryString, false, false);
40464050
MtmTx.isDistributed= false;
40474051
}
40484052
}
@@ -4085,9 +4089,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40854089
MtmUtilityProcessedInXid=GetCurrentTransactionId();
40864090

40874091
if (context==PROCESS_UTILITY_TOPLEVEL)
4088-
MtmProcessDDLCommand(queryString, true);
4092+
MtmProcessDDLCommand(queryString, true, false);
40894093
else
4090-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4094+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
40914095

40924096
executed= true;
40934097
}
@@ -4144,7 +4148,7 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
41444148
}
41454149

41464150
if (ddl_generating_call&& !MtmTx.isReplicated)
4147-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4151+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
41484152

41494153
if (PreviousExecutorStartHook!=NULL)
41504154
PreviousExecutorStartHook(queryDesc,eflags);

‎pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ pglogical_receiver_main(Datum main_arg)
528528
if (stmt[0]=='M'&&stmt[1]=='L') {
529529
MTM_LOG3("Process deadlock message from %d",nodeId);
530530
MtmExecutor(stmt,rc-hdr_len);
531+
}elseif (stmt[0]=='M'&&stmt[1]=='C') {
532+
MTM_LOG3("Process concurrent DDL message from %d",nodeId);
533+
MtmExecutor(stmt,rc-hdr_len);
531534
}else {
532535
ByteBufferAppend(&buf,stmt,rc-hdr_len);
533536
if (stmt[0]=='C')/* commit */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp