Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit32d4d32

Browse files
committed
Correctly process concurrent logical messages
1 parentcfbf8e4 commit32d4d32

File tree

2 files changed

+32
-25
lines changed

2 files changed

+32
-25
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static void MtmShmemStartup(void);
156156
staticBgwPool*MtmPoolConstructor(void);
157157
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
158158
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
159-
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional);
159+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional,boolcontextFree);
160160

161161
MtmState*Mtm;
162162

@@ -3825,33 +3825,31 @@ static char * MtmGucSerialize(void)
38253825
* -------------------------------------------
38263826
*/
38273827

3828-
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional)
3828+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional,boolcontextFree)
38293829
{
3830-
char*queryWithContext;
3830+
char*queryWithContext= (char*)queryString;
38313831
char*gucContext;
38323832

3833-
/* Append global GUC to utility stmt. */
3834-
gucContext=MtmGucSerialize();
3835-
if (gucContext)
3836-
{
3837-
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3838-
strcpy(queryWithContext,gucContext);
3839-
strcat(queryWithContext,queryString);
3840-
}
3841-
else
3842-
{
3843-
queryWithContext= (char*)queryString;
3833+
if (!contextFree) {
3834+
/* Append global GUC to utility stmt. */
3835+
gucContext=MtmGucSerialize();
3836+
if (gucContext)
3837+
{
3838+
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3839+
strcpy(queryWithContext,gucContext);
3840+
strcat(queryWithContext,queryString);
3841+
}
38443842
}
38453843

38463844
MTM_LOG3("Sending utility: %s",queryWithContext);
3847-
if (transactional)
3845+
if (transactional) {
38483846
/* DDL */
38493847
LogLogicalMessage("D",queryWithContext,strlen(queryWithContext)+1, true);
3850-
else
3848+
MtmTx.containsDML= true;
3849+
}else {
38513850
/* CONCURRENT DDL */
3852-
LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false);
3853-
3854-
MtmTx.containsDML= true;
3851+
XLogFlush(LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false));
3852+
}
38553853
return false;
38563854
}
38573855

@@ -3930,7 +3928,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39303928
caseT_UnlistenStmt:
39313929
caseT_LoadStmt:
39323930
caseT_ClusterStmt:
3933-
caseT_VacuumStmt:
39343931
caseT_VariableShowStmt:
39353932
caseT_ReassignOwnedStmt:
39363933
caseT_LockStmt:
@@ -3939,6 +3936,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39393936
skipCommand= true;
39403937
break;
39413938

3939+
caseT_VacuumStmt:
3940+
context=PROCESS_UTILITY_TOPLEVEL;
3941+
MtmProcessDDLCommand(queryString, false, true);
3942+
MtmTx.isDistributed= false;
3943+
skipCommand= true;
3944+
break;
3945+
39423946
caseT_CreateDomainStmt:
39433947
{
39443948
CreateDomainStmt*stmt= (CreateDomainStmt*)parsetree;
@@ -4030,7 +4034,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40304034
if (indexStmt->concurrent&& !IsTransactionBlock()&& !MtmTx.isReplicated)
40314035
{
40324036
skipCommand= true;
4033-
MtmProcessDDLCommand(queryString, false);
4037+
MtmProcessDDLCommand(queryString, false, false);
40344038
MtmTx.isDistributed= false;
40354039
}
40364040
}
@@ -4042,7 +4046,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40424046
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent&& !IsTransactionBlock()&& !MtmTx.isReplicated)
40434047
{
40444048
skipCommand= true;
4045-
MtmProcessDDLCommand(queryString, false);
4049+
MtmProcessDDLCommand(queryString, false, false);
40464050
MtmTx.isDistributed= false;
40474051
}
40484052
}
@@ -4085,9 +4089,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40854089
MtmUtilityProcessedInXid=GetCurrentTransactionId();
40864090

40874091
if (context==PROCESS_UTILITY_TOPLEVEL)
4088-
MtmProcessDDLCommand(queryString, true);
4092+
MtmProcessDDLCommand(queryString, true, false);
40894093
else
4090-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4094+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
40914095

40924096
executed= true;
40934097
}
@@ -4143,7 +4147,7 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
41434147
}
41444148

41454149
if (ddl_generating_call&& !MtmTx.isReplicated)
4146-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4150+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
41474151

41484152
if (PreviousExecutorStartHook!=NULL)
41494153
PreviousExecutorStartHook(queryDesc,eflags);

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ pglogical_receiver_main(Datum main_arg)
528528
if (stmt[0]=='M'&&stmt[1]=='L') {
529529
MTM_LOG3("Process deadlock message from %d",nodeId);
530530
MtmExecutor(stmt,rc-hdr_len);
531+
}elseif (stmt[0]=='M'&&stmt[1]=='C') {
532+
MTM_LOG3("Process concurrent DDL message from %d",nodeId);
533+
MtmExecutor(stmt,rc-hdr_len);
531534
}else {
532535
ByteBufferAppend(&buf,stmt,rc-hdr_len);
533536
if (stmt[0]=='C')/* commit */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp