Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7bd4516

Browse files
committed
Ressurect support of concurrent DDL statements
1 parentc6ea4e6 commit7bd4516

File tree

6 files changed

+25
-19
lines changed

6 files changed

+25
-19
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
133133
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
134134
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
135135
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
136-
staticvoidMtmCommitPreparedTransaction(MtmCurrentTrans*x);
136+
staticvoidMtmPreCommitPreparedTransaction(MtmCurrentTrans*x);
137137
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
138138
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x);
139139
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -705,8 +705,8 @@ MtmXactCallback(XactEvent event, void *arg)
705705
caseXACT_EVENT_ABORT_PREPARED:
706706
MtmAbortPreparedTransaction(&MtmTx);
707707
break;
708-
caseXACT_EVENT_COMMIT_PREPARED:
709-
MtmCommitPreparedTransaction(&MtmTx);
708+
caseXACT_EVENT_PRE_COMMIT_PREPARED:
709+
MtmPreCommitPreparedTransaction(&MtmTx);
710710
break;
711711
caseXACT_EVENT_COMMIT:
712712
MtmEndTransaction(&MtmTx, true);
@@ -1149,7 +1149,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
11491149
}
11501150

11511151
staticvoid
1152-
MtmCommitPreparedTransaction(MtmCurrentTrans*x)
1152+
MtmPreCommitPreparedTransaction(MtmCurrentTrans*x)
11531153
{
11541154
MtmTransMap*tm;
11551155
MtmTransState*ts;
@@ -1181,6 +1181,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11811181
Mtm2PCVoting(x,ts);
11821182

11831183
x->xid=ts->xid;
1184+
x->csn=ts->csn;
11841185
x->isPrepared= true;
11851186
}
11861187
MtmUnlock();
@@ -3529,6 +3530,7 @@ bool MtmFilterTransaction(char* record, int size)
35293530
}
35303531
restart_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
35313532
if (Mtm->nodes[origin_node-1].restartLSN<restart_lsn) {
3533+
Assert(restart_lsn!=InvalidXLogRecPtr);
35323534
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,restart_lsn);
35333535
Mtm->nodes[origin_node-1].restartLSN=restart_lsn;
35343536
}else {
@@ -4442,19 +4444,14 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
44424444
}
44434445
MTM_LOG3("Sending utility: %s",queryString);
44444446
if (transactional) {
4445-
/* DDL */
4447+
/*TransactionalDDL */
44464448
LogLogicalMessage("D",queryString,strlen(queryString)+1, true);
44474449
MtmTx.containsDML= true;
4448-
}else {
4449-
char*gucCtx=MtmGucSerialize();
4450-
return;
4451-
if (*gucCtx) {
4452-
queryString=psprintf("%s; %s",gucCtx,queryString);
4453-
}
4454-
/* CONCURRENT DDL */
4450+
}else {
4451+
MTM_LOG1("Execute concurrent DDL: %s",queryString);
4452+
/* Concurrent DDL */
44554453
XLogFlush(LogLogicalMessage("C",queryString,strlen(queryString)+1, false));
44564454
}
4457-
return;
44584455
}
44594456

44604457
staticvoidMtmFinishDDLCommand()
@@ -4542,7 +4539,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45424539

45434540
caseT_VacuumStmt:
45444541
skipCommand= true;
4545-
#if0
45464542
if (context==PROCESS_UTILITY_TOPLEVEL) {
45474543
MtmProcessDDLCommand(queryString, false);
45484544
MtmTx.isDistributed= false;
@@ -4553,7 +4549,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45534549
MemoryContextSwitchTo(oldContext);
45544550
return;
45554551
}
4556-
#endif
45574552
break;
45584553

45594554
caseT_CreateDomainStmt:

‎contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ process_remote_message(StringInfo s)
384384
{
385385
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
386386
SetCurrentStatementStartTimestamp();
387+
MtmResetTransaction();
387388
StartTransactionCommand();
388389
standalone= true;
389390
/* intentional falldown to the next case */
@@ -413,7 +414,6 @@ process_remote_message(StringInfo s)
413414
false, false,
414415
NULL,
415416
NULL);
416-
417417
/* Run parse analysis ... */
418418
MtmIndexStmt=transformIndexStmt(relid,MtmIndexStmt,messageBody);
419419

‎contrib/mmts/pglogical_receiver.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,13 @@ pglogical_receiver_main(Datum main_arg)
543543
MtmSpillToFile(spill_file,buf.data,buf.used);
544544
ByteBufferReset(&buf);
545545
}
546-
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A')) {
546+
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C')) {
547547
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
548-
MtmExecutor(stmt,rc-hdr_len);
548+
if (stmt[1]=='C') {/* concurrent DDL */
549+
MtmExecute(stmt,rc-hdr_len);
550+
}else {
551+
MtmExecutor(stmt,rc-hdr_len);
552+
}
549553
}else {
550554
ByteBufferAppend(&buf,stmt,rc-hdr_len);
551555
if (stmt[0]=='C')/* commit */

‎src/backend/access/transam/twophase.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,6 +1555,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
15551555
SharedInvalidationMessage*invalmsgs;
15561556
inti;
15571557

1558+
1559+
if (isCommit)
1560+
{
1561+
CallXactCallbacks(XACT_EVENT_PRE_COMMIT_PREPARED);
1562+
}
1563+
15581564
/*
15591565
* Validate the GID, and lock the GXACT to ensure that two backends do not
15601566
* try to commit the same GID at once.

‎src/backend/replication/logical/decode.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
746746
xl_xact_parsed_abort*parsed,TransactionIdxid)
747747
{
748748
inti;
749-
XLogRecPtrorigin_lsn=InvalidXLogRecPtr;
749+
XLogRecPtrorigin_lsn=parsed->origin_lsn;
750750
XLogRecPtrcommit_time=InvalidXLogRecPtr;
751751
RepOriginIdorigin_id=XLogRecGetOrigin(buf->record);
752752

‎src/include/access/xact.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ typedef enum
9898
XACT_EVENT_PARALLEL_PRE_COMMIT,
9999
XACT_EVENT_PRE_PREPARE,
100100
XACT_EVENT_POST_PREPARE,
101+
XACT_EVENT_PRE_COMMIT_PREPARED,
101102
XACT_EVENT_COMMIT_PREPARED,
102103
XACT_EVENT_ABORT_PREPARED,
103104
XACT_EVENT_COMMIT_COMMAND

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp