Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite57ee93

Browse files
committed
Handle CREATE/DROP INDEX CONCURRENTLY without 2PC
1 parentcf4758e commite57ee93

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ static void MtmShmemStartup(void);
149149
staticBgwPool*MtmPoolConstructor(void);
150150
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
151151
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
152-
staticboolMtmProcessDDLCommand(charconst*queryString);
152+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional);
153153

154154
MtmState*Mtm;
155155

@@ -3721,7 +3721,7 @@ static char * MtmGucSerialize(void)
37213721
* -------------------------------------------
37223722
*/
37233723

3724-
staticboolMtmProcessDDLCommand(charconst*queryString)
3724+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional)
37253725
{
37263726
char*queryWithContext;
37273727
char*gucContext;
@@ -3740,7 +3740,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
37403740
}
37413741

37423742
MTM_LOG1("Sending utility: %s",queryWithContext);
3743-
LogLogicalMessage("G",queryWithContext,strlen(queryWithContext)+1, true);
3743+
if (transactional)
3744+
/* DDL */
3745+
LogLogicalMessage("D",queryWithContext,strlen(queryWithContext)+1, true);
3746+
else
3747+
/* CONCURRENT DDL */
3748+
LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false);
37443749

37453750
MtmTx.containsDML= true;
37463751
return false;
@@ -3876,6 +3881,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38763881
}
38773882
break;
38783883

3884+
caseT_IndexStmt:
3885+
{
3886+
IndexStmt*indexStmt= (IndexStmt*)parsetree;
3887+
if (indexStmt->concurrent&& !IsTransactionBlock())
3888+
{
3889+
skipCommand= true;
3890+
MtmProcessDDLCommand(queryString, false);
3891+
MtmTx.isDistributed= false;
3892+
}
3893+
}
3894+
break;
3895+
3896+
caseT_DropStmt:
3897+
{
3898+
DropStmt*stmt= (DropStmt*)parsetree;
3899+
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent&& !IsTransactionBlock())
3900+
{
3901+
skipCommand= true;
3902+
MtmProcessDDLCommand(queryString, false);
3903+
MtmTx.isDistributed= false;
3904+
}
3905+
}
3906+
break;
3907+
38793908
/* Copy need some special care */
38803909
caseT_CopyStmt:
38813910
{
@@ -3913,7 +3942,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39133942
{
39143943
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
39153944
MtmUtilityProcessedInXid=GetCurrentTransactionId();
3916-
MtmProcessDDLCommand(queryString);
3945+
MtmProcessDDLCommand(queryString, true);
39173946
executed= true;
39183947
}
39193948
}
@@ -3936,7 +3965,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39363965
MtmTx.snapshot=INVALID_CSN;
39373966
}
39383967

3939-
if (executed)
3968+
if (executed&& !skipCommand)
39403969
{
39413970
MtmFinishDDLCommand();
39423971
}

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7171

7272
staticboolprocess_remote_begin(StringInfos);
73-
staticvoidprocess_remote_transactional_message(StringInfos);
7473
staticvoidprocess_remote_message(StringInfos);
7574
staticvoidprocess_remote_commit(StringInfos);
7675
staticvoidprocess_remote_insert(StringInfos,Relationrel);
@@ -355,35 +354,43 @@ process_remote_begin(StringInfo s)
355354
}
356355

357356
staticvoid
358-
process_remote_transactional_message(StringInfos)
357+
process_remote_message(StringInfos)
359358
{
360-
intrc;
359+
charaction=pq_getmsgbyte(s);
361360
intmessageSize=pq_getmsgint(s,4);
362-
charconst*stmt=pq_getmsgbytes(s,messageSize);
361+
charconst*messageBody=pq_getmsgbytes(s,messageSize);
363362

364-
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
365-
SPI_connect();
366-
ActivePortal->sourceText=stmt;
367-
rc=SPI_execute(stmt, false,0);
368-
SPI_finish();
369-
if (rc<0)
370-
elog(ERROR,"Failed to execute utility statement %s",stmt);
363+
switch (action)
364+
{
365+
case'C':
366+
{
367+
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
368+
SetCurrentStatementStartTimestamp();
369+
StartTransactionCommand();
370+
/* intentional falldown to the next case */
371+
}
372+
case'D':
373+
{
374+
intrc;
375+
376+
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,messageBody);
377+
SPI_connect();
378+
ActivePortal->sourceText=messageBody;
379+
rc=SPI_execute(messageBody, false,0);
380+
SPI_finish();
381+
if (rc<0)
382+
elog(ERROR,"Failed to execute utility statement %s",messageBody);
383+
break;
384+
}
385+
case'L':
386+
{
387+
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
388+
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
389+
break;
390+
}
391+
}
371392

372-
//XXX: create messages for tables localization too.
373-
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
374-
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
375-
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
376-
// MtmMakeTableLocal(schema, name);
377-
// }
378-
}
379393

380-
staticvoid
381-
process_remote_message(StringInfos)
382-
{
383-
intmessageSize=pq_getmsgint(s,4);
384-
charconst*messageBody=pq_getmsgbytes(s,messageSize);
385-
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
386-
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
387394
}
388395

389396
staticvoid
@@ -1049,16 +1056,10 @@ void MtmExecutor(void* work, size_t size)
10491056
s.len=save_len;
10501057
continue;
10511058
}
1052-
case'G':
1053-
case'E':
1054-
{
1055-
process_remote_transactional_message(&s);
1056-
continue;
1057-
}
1058-
case'L':
1059+
case'M':
10591060
{
10601061
process_remote_message(&s);
1061-
break;
1062+
continue;
10621063
}
10631064
default:
10641065
elog(ERROR,"unknown action of type %c",action);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pglogical_write_message(StringInfo out,
137137
{
138138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
139139
}
140-
elseif (*prefix=='G')
140+
elseif (*prefix=='D')
141141
{
142142
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
143143
{
@@ -149,8 +149,14 @@ pglogical_write_message(StringInfo out,
149149
elseif (*prefix=='E')
150150
{
151151
DDLInProress= false;
152+
/*
153+
* we use End message only as indicator of DDL transaction finish,
154+
* so no need to send that to replicas.
155+
*/
156+
return;
152157
}
153158

159+
pq_sendbyte(out,'M');
154160
pq_sendbyte(out,*prefix);
155161
pq_sendint(out,sz,4);
156162
pq_sendbytes(out,message,sz);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp