Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite0896c1

Browse files
knizhnikkelvich
authored andcommitted
Ressurect support of concurrent DDL statements
1 parent1f46538 commite0896c1

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

‎multimaster.c

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
133133
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
134134
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
135135
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
136-
staticvoidMtmCommitPreparedTransaction(MtmCurrentTrans*x);
136+
staticvoidMtmPreCommitPreparedTransaction(MtmCurrentTrans*x);
137137
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
138138
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x);
139139
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -705,8 +705,8 @@ MtmXactCallback(XactEvent event, void *arg)
705705
caseXACT_EVENT_ABORT_PREPARED:
706706
MtmAbortPreparedTransaction(&MtmTx);
707707
break;
708-
caseXACT_EVENT_COMMIT_PREPARED:
709-
MtmCommitPreparedTransaction(&MtmTx);
708+
caseXACT_EVENT_PRE_COMMIT_PREPARED:
709+
MtmPreCommitPreparedTransaction(&MtmTx);
710710
break;
711711
caseXACT_EVENT_COMMIT:
712712
MtmEndTransaction(&MtmTx, true);
@@ -1149,7 +1149,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
11491149
}
11501150

11511151
staticvoid
1152-
MtmCommitPreparedTransaction(MtmCurrentTrans*x)
1152+
MtmPreCommitPreparedTransaction(MtmCurrentTrans*x)
11531153
{
11541154
MtmTransMap*tm;
11551155
MtmTransState*ts;
@@ -1181,6 +1181,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11811181
Mtm2PCVoting(x,ts);
11821182

11831183
x->xid=ts->xid;
1184+
x->csn=ts->csn;
11841185
x->isPrepared= true;
11851186
}
11861187
MtmUnlock();
@@ -3529,6 +3530,7 @@ bool MtmFilterTransaction(char* record, int size)
35293530
}
35303531
restart_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
35313532
if (Mtm->nodes[origin_node-1].restartLSN<restart_lsn) {
3533+
Assert(restart_lsn!=InvalidXLogRecPtr);
35323534
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,restart_lsn);
35333535
Mtm->nodes[origin_node-1].restartLSN=restart_lsn;
35343536
}else {
@@ -4442,19 +4444,14 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
44424444
}
44434445
MTM_LOG3("Sending utility: %s",queryString);
44444446
if (transactional) {
4445-
/* DDL */
4447+
/*TransactionalDDL */
44464448
LogLogicalMessage("D",queryString,strlen(queryString)+1, true);
44474449
MtmTx.containsDML= true;
4448-
}else {
4449-
char*gucCtx=MtmGucSerialize();
4450-
return;
4451-
if (*gucCtx) {
4452-
queryString=psprintf("%s; %s",gucCtx,queryString);
4453-
}
4454-
/* CONCURRENT DDL */
4450+
}else {
4451+
MTM_LOG1("Execute concurrent DDL: %s",queryString);
4452+
/* Concurrent DDL */
44554453
XLogFlush(LogLogicalMessage("C",queryString,strlen(queryString)+1, false));
44564454
}
4457-
return;
44584455
}
44594456

44604457
staticvoidMtmFinishDDLCommand()
@@ -4542,7 +4539,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45424539

45434540
caseT_VacuumStmt:
45444541
skipCommand= true;
4545-
#if0
45464542
if (context==PROCESS_UTILITY_TOPLEVEL) {
45474543
MtmProcessDDLCommand(queryString, false);
45484544
MtmTx.isDistributed= false;
@@ -4553,7 +4549,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45534549
MemoryContextSwitchTo(oldContext);
45544550
return;
45554551
}
4556-
#endif
45574552
break;
45584553

45594554
caseT_CreateDomainStmt:

‎pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ process_remote_message(StringInfo s)
384384
{
385385
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
386386
SetCurrentStatementStartTimestamp();
387+
MtmResetTransaction();
387388
StartTransactionCommand();
388389
standalone= true;
389390
/* intentional falldown to the next case */
@@ -413,7 +414,6 @@ process_remote_message(StringInfo s)
413414
false, false,
414415
NULL,
415416
NULL);
416-
417417
/* Run parse analysis ... */
418418
MtmIndexStmt=transformIndexStmt(relid,MtmIndexStmt,messageBody);
419419

‎pglogical_receiver.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,13 @@ pglogical_receiver_main(Datum main_arg)
543543
MtmSpillToFile(spill_file,buf.data,buf.used);
544544
ByteBufferReset(&buf);
545545
}
546-
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A')) {
546+
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C')) {
547547
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
548-
MtmExecutor(stmt,rc-hdr_len);
548+
if (stmt[1]=='C') {/* concurrent DDL */
549+
MtmExecute(stmt,rc-hdr_len);
550+
}else {
551+
MtmExecutor(stmt,rc-hdr_len);
552+
}
549553
}else {
550554
ByteBufferAppend(&buf,stmt,rc-hdr_len);
551555
if (stmt[0]=='C')/* commit */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp