Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit37a2e58

Browse files
committed
Do not set XMAX_INVALID for transaction which are in progress
2 parents258b5f8 +10b4690 commit37a2e58

File tree

4 files changed

+88
-60
lines changed

4 files changed

+88
-60
lines changed

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATEFUNCTIONmtm.add_node(conn_strcstring) RETURNS void
16+
CREATEFUNCTIONmtm.add_node(conn_strtext) RETURNS void
1717
AS'MODULE_PATHNAME','mtm_add_node'
1818
LANGUAGE C;
1919

‎contrib/mmts/multimaster.c‎

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include"pglogical_output/hooks.h"
6363
#include"parser/analyze.h"
6464
#include"parser/parse_relation.h"
65+
#include"tcop/pquery.h"
6566

6667
#include"multimaster.h"
6768
#include"ddd.h"
@@ -150,7 +151,7 @@ static void MtmShmemStartup(void);
150151
staticBgwPool*MtmPoolConstructor(void);
151152
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
152153
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
153-
staticboolMtmProcessDDLCommand(charconst*queryString);
154+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional);
154155

155156
MtmState*Mtm;
156157

@@ -3022,7 +3023,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
30223023
Datum
30233024
mtm_add_node(PG_FUNCTION_ARGS)
30243025
{
3025-
char*connStr=PG_GETARG_CSTRING(0);
3026+
char*connStr=text_to_cstring(PG_GETARG_TEXT_PP(0));
30263027

30273028
if (Mtm->nAllNodes==MtmMaxNodes) {
30283029
elog(ERROR,"Maximal number of nodes %d is reached",MtmMaxNodes);
@@ -3729,7 +3730,7 @@ static char * MtmGucSerialize(void)
37293730
* -------------------------------------------
37303731
*/
37313732

3732-
staticboolMtmProcessDDLCommand(charconst*queryString)
3733+
staticboolMtmProcessDDLCommand(charconst*queryString,booltransactional)
37333734
{
37343735
char*queryWithContext;
37353736
char*gucContext;
@@ -3748,7 +3749,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
37483749
}
37493750

37503751
MTM_LOG1("Sending utility: %s",queryWithContext);
3751-
LogLogicalMessage("G",queryWithContext,strlen(queryWithContext)+1, true);
3752+
if (transactional)
3753+
/* DDL */
3754+
LogLogicalMessage("D",queryWithContext,strlen(queryWithContext)+1, true);
3755+
else
3756+
/* CONCURRENT DDL */
3757+
LogLogicalMessage("C",queryWithContext,strlen(queryWithContext)+1, false);
37523758

37533759
MtmTx.containsDML= true;
37543760
return false;
@@ -3785,17 +3791,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
37853791
MTM_LOG3("%d: Process utility statement %s",MyProcPid,queryString);
37863792
switch (nodeTag(parsetree))
37873793
{
3788-
caseT_IndexStmt:
3789-
{
3790-
IndexStmt*stmt= (IndexStmt*)parsetree;
3791-
if (stmt->concurrent) {
3792-
stmt->concurrent= false;
3793-
elog(WARNING,"Disable concurrent option for index creation");
3794-
}
3795-
break;
3796-
}
3797-
3798-
caseT_TransactionStmt:
3794+
caseT_TransactionStmt:
37993795
{
38003796
TransactionStmt*stmt= (TransactionStmt*)parsetree;
38013797
switch (stmt->kind)
@@ -3893,6 +3889,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38933889
}
38943890
break;
38953891

3892+
caseT_IndexStmt:
3893+
{
3894+
IndexStmt*indexStmt= (IndexStmt*)parsetree;
3895+
if (indexStmt->concurrent&& !IsTransactionBlock())
3896+
{
3897+
skipCommand= true;
3898+
MtmProcessDDLCommand(queryString, false);
3899+
MtmTx.isDistributed= false;
3900+
}
3901+
}
3902+
break;
3903+
3904+
caseT_DropStmt:
3905+
{
3906+
DropStmt*stmt= (DropStmt*)parsetree;
3907+
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent&& !IsTransactionBlock())
3908+
{
3909+
skipCommand= true;
3910+
MtmProcessDDLCommand(queryString, false);
3911+
MtmTx.isDistributed= false;
3912+
}
3913+
}
3914+
break;
3915+
38963916
/* Copy need some special care */
38973917
caseT_CopyStmt:
38983918
{
@@ -3926,13 +3946,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39263946
if (!skipCommand&& (context==PROCESS_UTILITY_TOPLEVEL||MtmUtilityProcessedInXid!=GetCurrentTransactionId()))
39273947
MtmUtilityProcessedInXid=InvalidTransactionId;
39283948

3929-
if (context==PROCESS_UTILITY_TOPLEVEL||context==PROCESS_UTILITY_QUERY)
3930-
{
3931-
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
3932-
MtmUtilityProcessedInXid=GetCurrentTransactionId();
3933-
MtmProcessDDLCommand(queryString);
3934-
executed= true;
3935-
}
3949+
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
3950+
MtmUtilityProcessedInXid=GetCurrentTransactionId();
3951+
3952+
if (context==PROCESS_UTILITY_TOPLEVEL)
3953+
MtmProcessDDLCommand(queryString, true);
3954+
else
3955+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
3956+
3957+
executed= true;
39363958
}
39373959

39383960
if (PreviousProcessUtilityHook!=NULL)
@@ -3945,19 +3967,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39453967
standard_ProcessUtility(parsetree,queryString,context,
39463968
params,dest,completionTag);
39473969
}
3948-
39493970
if (!MtmVolksWagenMode&&MtmTx.isDistributed&&XactIsoLevel!=XACT_REPEATABLE_READ) {
39503971
elog(ERROR,"Isolation level %s is not supported by multimaster",isoLevelStr[XactIsoLevel]);
39513972
}
3952-
3973+
39533974
if (MyXactAccessedTempRel)
39543975
{
39553976
MTM_LOG1("Xact accessed temp table, stopping replication");
39563977
MtmTx.isDistributed= false;/* Skip */
39573978
MtmTx.snapshot=INVALID_CSN;
39583979
}
39593980

3960-
if (executed)
3981+
if (executed&& !skipCommand)
39613982
{
39623983
MtmFinishDDLCommand();
39633984
}

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7171

7272
staticboolprocess_remote_begin(StringInfos);
73-
staticvoidprocess_remote_transactional_message(StringInfos);
7473
staticvoidprocess_remote_message(StringInfos);
7574
staticvoidprocess_remote_commit(StringInfos);
7675
staticvoidprocess_remote_insert(StringInfos,Relationrel);
@@ -355,35 +354,43 @@ process_remote_begin(StringInfo s)
355354
}
356355

357356
staticvoid
358-
process_remote_transactional_message(StringInfos)
357+
process_remote_message(StringInfos)
359358
{
360-
intrc;
359+
charaction=pq_getmsgbyte(s);
361360
intmessageSize=pq_getmsgint(s,4);
362-
charconst*stmt=pq_getmsgbytes(s,messageSize);
361+
charconst*messageBody=pq_getmsgbytes(s,messageSize);
363362

364-
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
365-
SPI_connect();
366-
ActivePortal->sourceText=stmt;
367-
rc=SPI_execute(stmt, false,0);
368-
SPI_finish();
369-
if (rc<0)
370-
elog(ERROR,"Failed to execute utility statement %s",stmt);
363+
switch (action)
364+
{
365+
case'C':
366+
{
367+
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
368+
SetCurrentStatementStartTimestamp();
369+
StartTransactionCommand();
370+
/* intentional falldown to the next case */
371+
}
372+
case'D':
373+
{
374+
intrc;
375+
376+
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,messageBody);
377+
SPI_connect();
378+
ActivePortal->sourceText=messageBody;
379+
rc=SPI_execute(messageBody, false,0);
380+
SPI_finish();
381+
if (rc<0)
382+
elog(ERROR,"Failed to execute utility statement %s",messageBody);
383+
break;
384+
}
385+
case'L':
386+
{
387+
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
388+
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
389+
break;
390+
}
391+
}
371392

372-
//XXX: create messages for tables localization too.
373-
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
374-
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
375-
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
376-
// MtmMakeTableLocal(schema, name);
377-
// }
378-
}
379393

380-
staticvoid
381-
process_remote_message(StringInfos)
382-
{
383-
intmessageSize=pq_getmsgint(s,4);
384-
charconst*messageBody=pq_getmsgbytes(s,messageSize);
385-
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
386-
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
387394
}
388395

389396
staticvoid
@@ -1049,16 +1056,10 @@ void MtmExecutor(void* work, size_t size)
10491056
s.len=save_len;
10501057
continue;
10511058
}
1052-
case'G':
1053-
case'E':
1054-
{
1055-
process_remote_transactional_message(&s);
1056-
continue;
1057-
}
1058-
case'L':
1059+
case'M':
10591060
{
10601061
process_remote_message(&s);
1061-
break;
1062+
continue;
10621063
}
10631064
default:
10641065
elog(ERROR,"unknown action of type %c",action);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pglogical_write_message(StringInfo out,
137137
{
138138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
139139
}
140-
elseif (*prefix=='G')
140+
elseif (*prefix=='D')
141141
{
142142
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
143143
{
@@ -149,8 +149,14 @@ pglogical_write_message(StringInfo out,
149149
elseif (*prefix=='E')
150150
{
151151
DDLInProress= false;
152+
/*
153+
* we use End message only as indicator of DDL transaction finish,
154+
* so no need to send that to replicas.
155+
*/
156+
return;
152157
}
153158

159+
pq_sendbyte(out,'M');
154160
pq_sendbyte(out,*prefix);
155161
pq_sendint(out,sz,4);
156162
pq_sendbytes(out,message,sz);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp