Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8fefcf6

Browse files
knizhnikkelvich
authored andcommitted
Support 2PC in multimaster
1 parentd3a8cdc commit8fefcf6

File tree

2 files changed

+85
-6
lines changed

2 files changed

+85
-6
lines changed

‎multimaster.c

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ typedef struct {
6161
boolisReplicated;/* transaction on replica */
6262
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6363
boolcontainsDML;/* transaction contains DML statements */
64-
csn_tsnapshot;/* transaction snaphsot */
64+
boolisPrepared;/* transaction is prepared as part of 2PC */
65+
csn_tsnapshot;/* transaction snaphsot */
6566
}MtmCurrentTrans;
6667

6768
/* #define USE_SPINLOCK 1 */
@@ -94,6 +95,8 @@ static void MtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
9495
staticvoidMtmInitialize(void);
9596
staticvoidMtmXactCallback(XactEventevent,void*arg);
9697
staticvoidMtmBeginTransaction(MtmCurrentTrans*x);
98+
staticvoidMtmPrecommitTransaction(MtmCurrentTrans*x);
99+
staticboolMtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids);
97100
staticvoidMtmPrepareTransaction(MtmCurrentTrans*x);
98101
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
99102
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -143,6 +146,7 @@ static int MtmWorkers;
143146
staticintMtmVacuumDelay;
144147
staticintMtmMinRecoveryLag;
145148
staticintMtmMaxRecoveryLag;
149+
staticboolMtmUse2PC;
146150

147151
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
148152
staticProcessUtility_hook_typePreviousProcessUtilityHook;
@@ -467,6 +471,9 @@ MtmXactCallback(XactEvent event, void *arg)
467471
MtmBeginTransaction(&dtmTx);
468472
break;
469473
caseXACT_EVENT_PRE_COMMIT:
474+
MtmPrecommitTransaction(&dtmTx);
475+
break;
476+
caseXACT_EVENT_PREPARE:
470477
MtmPrepareTransaction(&dtmTx);
471478
break;
472479
caseXACT_EVENT_COMMIT:
@@ -498,6 +505,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
498505
x->isReplicated= false;
499506
x->isDistributed=MtmIsUserTransaction();
500507
x->containsDML= false;
508+
x->isPrepared= false;
501509
x->snapshot=MtmAssignCSN();
502510
x->gtid.xid=InvalidTransactionId;
503511
MtmUnlock();
@@ -561,10 +569,11 @@ MtmCheckClusterLock()
561569
}
562570

563571
/*
572+
* This functions is called as pre-commit callback.
564573
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
565574
* before commit
566575
*/
567-
staticvoidMtmPrepareTransaction(MtmCurrentTrans*x)
576+
staticvoidMtmPrecommitTransaction(MtmCurrentTrans*x)
568577
{
569578
MtmTransState*ts;
570579
inti;
@@ -608,6 +617,20 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
608617
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",getpid(),x->xid,ts->csn);
609618
}
610619

620+
staticvoid
621+
MtmPrepareTransaction(MtmCurrentTrans*x)
622+
{
623+
TransactionId*subxids;
624+
intnSubxids;
625+
MtmPrecommitTransaction(x);
626+
x->isPrepared= true;
627+
nSubxids=xactGetCommittedChildren(&subxids);
628+
if (!MtmCommitTransaction(x->xid,nSubxids,subxids))
629+
{
630+
elog(ERROR,"Commit of transaction %d is rejected by DTM",x->xid);
631+
}
632+
}
633+
611634
/**
612635
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
613636
* WAL overflow
@@ -755,7 +778,7 @@ static void
755778
MtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
756779
{
757780
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n",getpid(),xid,dtmTx.xid,status,dtmTx.isDistributed);
758-
if (xid==dtmTx.xid&&dtmTx.isDistributed)
781+
if (xid==dtmTx.xid&&dtmTx.isDistributed&& !dtmTx.isPrepared)
759782
{
760783
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML||dtm->status==MTM_RECOVERY)
761784
{
@@ -812,6 +835,18 @@ _PG_init(void)
812835
if (!process_shared_preload_libraries_in_progress)
813836
return;
814837

838+
DefineCustomBoolVariable(
839+
"multimaster.use_2pc",
840+
"Use two phase commit",
841+
"Replace normal commit with two phase commit",
842+
&MtmUse2PC,
843+
false,
844+
PGC_BACKEND,
845+
0,
846+
NULL,
847+
NULL,
848+
NULL
849+
);
815850
DefineCustomIntVariable(
816851
"multimaster.min_recovery_lag",
817852
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed",
@@ -1313,13 +1348,54 @@ static bool MtmProcessDDLCommand(char const* queryString)
13131348
return false;
13141349
}
13151350

1351+
/*
1352+
* Genenerate global transaction identifier for two-pahse commit.
1353+
* It should be unique for all nodes
1354+
*/
1355+
staticchar*
1356+
MtmGenerateGid()
1357+
{
1358+
staticintlocalCount;
1359+
returnpsprintf("GID-%d-%d-%d",MtmNodeId,MyProcPid,++localCount);
1360+
}
1361+
13161362
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
13171363
ProcessUtilityContextcontext,ParamListInfoparams,
13181364
DestReceiver*dest,char*completionTag)
13191365
{
13201366
boolskipCommand;
13211367
switch (nodeTag(parsetree))
13221368
{
1369+
caseT_TransactionStmt:
1370+
{
1371+
TransactionStmt*stmt= (TransactionStmt*)parsetree;
1372+
switch (stmt->kind)
1373+
{
1374+
caseTRANS_STMT_COMMIT:
1375+
if (MtmUse2PC) {
1376+
char*gid=MtmGenerateGid();
1377+
if (!PrepareTransactionBlock(gid))
1378+
{
1379+
/* report unsuccessful commit in completionTag */
1380+
if (completionTag) {
1381+
strcpy(completionTag,"ROLLBACK");
1382+
}
1383+
/* ??? Should we do explicit rollback */
1384+
}else {
1385+
FinishPreparedTransaction(gid, true);
1386+
}
1387+
return;
1388+
}
1389+
break;
1390+
caseTRANS_STMT_PREPARE:
1391+
caseTRANS_STMT_COMMIT_PREPARED:
1392+
caseTRANS_STMT_ROLLBACK_PREPARED:
1393+
elog(ERROR,"Two phase commit is not supported by multimaster");
1394+
default:
1395+
break;
1396+
}
1397+
}
1398+
/* no break */
13231399
caseT_PlannedStmt:
13241400
caseT_ClosePortalStmt:
13251401
caseT_FetchStmt:
@@ -1333,7 +1409,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
13331409
caseT_LoadStmt:
13341410
caseT_VariableSetStmt:
13351411
caseT_VariableShowStmt:
1336-
caseT_TransactionStmt:
13371412
skipCommand= true;
13381413
break;
13391414
default:

‎pglogical_receiver.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ pglogical_receiver_main(Datum main_arg)
209209
boolinsideTrans= false;
210210
#endif
211211
ByteBufferbuf;
212-
XLogRecPtroriginStartPos;
212+
XLogRecPtroriginStartPos=0;
213+
RepOriginIdoriginId;
213214

214215
/* Register functions for SIGTERM/SIGHUP management */
215216
pqsignal(SIGHUP,receiver_raw_sighup);
@@ -266,7 +267,10 @@ pglogical_receiver_main(Datum main_arg)
266267
resetPQExpBuffer(query);
267268
}
268269
/* Start logical replication at specified position */
269-
originStartPos=replorigin_session_get_progress(false);
270+
originId=replorigin_by_name(args->receiver_slot, true);
271+
if (originId!=InvalidRepOriginId) {
272+
originStartPos=replorigin_get_progress(originId, false);
273+
}
270274
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
271275
args->receiver_slot,
272276
(uint32) (originStartPos >>32),

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp