Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7c17559

Browse files
committed
Handle CREATE/DROP INDEX CONCURRENTLY without 2PC
1 parent9914ce5 commit7c17559

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

‎multimaster.c

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ static void MtmShmemStartup(void);
148148
staticBgwPool*MtmPoolConstructor(void);
149149
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
150150
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
151-
staticboolMtmProcessDDLCommand(charconst*queryString);
151+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional);
152152

153153
MtmState*Mtm;
154154

@@ -3721,7 +3721,7 @@ static char * MtmGucSerialize(void)
37213721
* -------------------------------------------
37223722
*/
37233723

3724-
staticboolMtmProcessDDLCommand(charconst*queryString)
3724+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional)
37253725
{
37263726
char*queryWithContext;
37273727
char*gucContext;
@@ -3740,7 +3740,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
37403740
}
37413741

37423742
MTM_LOG1("Sending utility: %s",queryWithContext);
3743-
LogLogicalMessage("G",queryWithContext,strlen(queryWithContext)+1, true);
3743+
if (transactional)
3744+
/* DDL */
3745+
LogLogicalMessage("D",queryWithContext,strlen(queryWithContext)+1, true);
3746+
else
3747+
/* CONCURRENT DDL */
3748+
LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false);
37443749

37453750
MtmTx.containsDML= true;
37463751
return false;
@@ -3886,6 +3891,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38863891
}
38873892
break;
38883893

3894+
caseT_IndexStmt:
3895+
{
3896+
IndexStmt*indexStmt= (IndexStmt*)parsetree;
3897+
if (indexStmt->concurrent&& !IsTransactionBlock())
3898+
{
3899+
skipCommand= true;
3900+
MtmProcessDDLCommand(queryString, false);
3901+
MtmTx.isDistributed= false;
3902+
}
3903+
}
3904+
break;
3905+
3906+
caseT_DropStmt:
3907+
{
3908+
DropStmt*stmt= (DropStmt*)parsetree;
3909+
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent&& !IsTransactionBlock())
3910+
{
3911+
skipCommand= true;
3912+
MtmProcessDDLCommand(queryString, false);
3913+
MtmTx.isDistributed= false;
3914+
}
3915+
}
3916+
break;
3917+
38893918
/* Copy need some special care */
38903919
caseT_CopyStmt:
38913920
{
@@ -3923,7 +3952,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39233952
{
39243953
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
39253954
MtmUtilityProcessedInXid=GetCurrentTransactionId();
3926-
MtmProcessDDLCommand(queryString);
3955+
MtmProcessDDLCommand(queryString, true);
39273956
executed= true;
39283957
}
39293958
}
@@ -3950,7 +3979,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39503979
MtmTx.snapshot=INVALID_CSN;
39513980
}
39523981

3953-
if (executed)
3982+
if (executed&& !skipCommand)
39543983
{
39553984
MtmFinishDDLCommand();
39563985
}

‎pglogical_apply.c

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7171

7272
staticboolprocess_remote_begin(StringInfos);
73-
staticvoidprocess_remote_transactional_message(StringInfos);
7473
staticvoidprocess_remote_message(StringInfos);
7574
staticvoidprocess_remote_commit(StringInfos);
7675
staticvoidprocess_remote_insert(StringInfos,Relationrel);
@@ -355,35 +354,43 @@ process_remote_begin(StringInfo s)
355354
}
356355

357356
staticvoid
358-
process_remote_transactional_message(StringInfos)
357+
process_remote_message(StringInfos)
359358
{
360-
intrc;
359+
charaction=pq_getmsgbyte(s);
361360
intmessageSize=pq_getmsgint(s,4);
362-
charconst*stmt=pq_getmsgbytes(s,messageSize);
361+
charconst*messageBody=pq_getmsgbytes(s,messageSize);
363362

364-
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
365-
SPI_connect();
366-
ActivePortal->sourceText=stmt;
367-
rc=SPI_execute(stmt, false,0);
368-
SPI_finish();
369-
if (rc<0)
370-
elog(ERROR,"Failed to execute utility statement %s",stmt);
363+
switch (action)
364+
{
365+
case'C':
366+
{
367+
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
368+
SetCurrentStatementStartTimestamp();
369+
StartTransactionCommand();
370+
/* intentional falldown to the next case */
371+
}
372+
case'D':
373+
{
374+
intrc;
375+
376+
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,messageBody);
377+
SPI_connect();
378+
ActivePortal->sourceText=messageBody;
379+
rc=SPI_execute(messageBody, false,0);
380+
SPI_finish();
381+
if (rc<0)
382+
elog(ERROR,"Failed to execute utility statement %s",messageBody);
383+
break;
384+
}
385+
case'L':
386+
{
387+
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
388+
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
389+
break;
390+
}
391+
}
371392

372-
//XXX: create messages for tables localization too.
373-
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
374-
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
375-
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
376-
// MtmMakeTableLocal(schema, name);
377-
// }
378-
}
379393

380-
staticvoid
381-
process_remote_message(StringInfos)
382-
{
383-
intmessageSize=pq_getmsgint(s,4);
384-
charconst*messageBody=pq_getmsgbytes(s,messageSize);
385-
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
386-
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
387394
}
388395

389396
staticvoid
@@ -1049,16 +1056,10 @@ void MtmExecutor(void* work, size_t size)
10491056
s.len=save_len;
10501057
continue;
10511058
}
1052-
case'G':
1053-
case'E':
1054-
{
1055-
process_remote_transactional_message(&s);
1056-
continue;
1057-
}
1058-
case'L':
1059+
case'M':
10591060
{
10601061
process_remote_message(&s);
1061-
break;
1062+
continue;
10621063
}
10631064
default:
10641065
elog(ERROR,"unknown action of type %c",action);

‎pglogical_proto.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pglogical_write_message(StringInfo out,
137137
{
138138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
139139
}
140-
elseif (*prefix=='G')
140+
elseif (*prefix=='D')
141141
{
142142
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
143143
{
@@ -149,8 +149,14 @@ pglogical_write_message(StringInfo out,
149149
elseif (*prefix=='E')
150150
{
151151
DDLInProress= false;
152+
/*
153+
* we use End message only as indicator of DDL transaction finish,
154+
* so no need to send that to replicas.
155+
*/
156+
return;
152157
}
153158

159+
pq_sendbyte(out,'M');
154160
pq_sendbyte(out,*prefix);
155161
pq_sendint(out,sz,4);
156162
pq_sendbytes(out,message,sz);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp