Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8072031

Browse files
committed
Support users 2PC
1 parent9958c15 commit8072031

File tree

4 files changed

+95
-13
lines changed

4 files changed

+95
-13
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,18 @@ static void MtmReceiver(Datum arg)
997997
/* All nodes are finished their transactions */
998998
if (ts->status==TRANSACTION_STATUS_ABORTED) {
999999
MtmWakeUpBackend(ts);
1000-
}elseif (MtmUseDtm) {
1001-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1002-
ts->nVotes=1;/* I voted myself */
1003-
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1004-
MtmSend2PCMessage(ts,MSG_PREPARE);
10051000
}else {
10061001
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1007-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1008-
MtmWakeUpBackend(ts);
1002+
if (ts->isTwoPhase) {
1003+
MtmWakeUpBackend(ts);
1004+
}elseif (MtmUseDtm) {
1005+
ts->nVotes=1;/* I voted myself */
1006+
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1007+
MtmSend2PCMessage(ts,MSG_PREPARE);
1008+
}else {
1009+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1010+
MtmWakeUpBackend(ts);
1011+
}
10091012
}
10101013
}
10111014
}

‎contrib/mmts/multimaster.c‎

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
130130
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
131131
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
132132
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
133+
staticvoidMtmCommitPreparedTransaction(MtmCurrentTrans*x);
133134
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
134135
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x);
135136
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -685,6 +686,9 @@ MtmXactCallback(XactEvent event, void *arg)
685686
caseXACT_EVENT_ABORT_PREPARED:
686687
MtmAbortPreparedTransaction(&MtmTx);
687688
break;
689+
caseXACT_EVENT_COMMIT_PREPARED:
690+
MtmCommitPreparedTransaction(&MtmTx);
691+
break;
688692
caseXACT_EVENT_COMMIT:
689693
MtmEndTransaction(&MtmTx, true);
690694
break;
@@ -793,6 +797,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
793797
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
794798
ts->snapshot=x->snapshot;
795799
ts->isLocal= true;
800+
ts->isTwoPhase=x->isTwoPhase;
796801
if (!found) {
797802
ts->isEnqueued= false;
798803
ts->isActive= false;
@@ -970,6 +975,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
970975
x->status=ts->status;
971976
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
972977
MtmUnlock();
978+
if (x->isTwoPhase) {
979+
MtmResetTransaction();
980+
}
973981
}
974982
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
975983
if (Mtm->inject2PCError==3) {
@@ -980,6 +988,74 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980988
MTM_TXTRACE(x,"PostPrepareTransaction Finish");
981989
}
982990

991+
staticvoid
992+
MtmCommitPreparedTransaction(MtmCurrentTrans*x)
993+
{
994+
MtmTransMap*tm;
995+
MtmTransState*ts;
996+
997+
if (Mtm->status==MTM_RECOVERY||x->isReplicated||x->isPrepared) {/* Ignore auto-2PC originated by multimaster */
998+
return;
999+
}
1000+
MtmLock(LW_EXCLUSIVE);
1001+
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_FIND,NULL);
1002+
if (tm==NULL) {
1003+
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
1004+
}else {
1005+
time_ttransTimeout=MSEC_TO_USEC(Mtm2PCMinTimeout);
1006+
intnConfigChanges=Mtm->nConfigChanges;
1007+
timestamp_tstart=MtmGetSystemTime();
1008+
intresult=0;
1009+
1010+
Assert(tm->state!=NULL);
1011+
MTM_LOG1("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
1012+
ts=tm->state;
1013+
1014+
Assert(MtmIsCoordinator(ts));
1015+
1016+
ts->votingCompleted= false;
1017+
ts->nVotes=1;/* I voted myself */
1018+
ts->procno=MyProc->pgprocno;
1019+
MTM_TXTRACE(ts,"Coordinator sends MSG_PREPARE");
1020+
MtmSend2PCMessage(ts,MSG_PREPARE);
1021+
1022+
/* Wait votes from all nodes until: */
1023+
while (!ts->votingCompleted/* all nodes voted */
1024+
&&nConfigChanges==Mtm->nConfigChanges/* configarion is changed */
1025+
&&Mtm->status==MTM_ONLINE/* node is not online */
1026+
&&ts->status!=TRANSACTION_STATUS_ABORTED/* transaction is aborted */
1027+
&&start+transTimeout >=MtmGetSystemTime())/* timeout is expired */
1028+
{
1029+
MtmUnlock();
1030+
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
1031+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
1032+
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Finish");
1033+
/* Emergency bailout if postmaster has died */
1034+
if (result&WL_POSTMASTER_DEATH) {
1035+
proc_exit(1);
1036+
}
1037+
if (result&WL_LATCH_SET) {
1038+
MTM_LOG3("Latch signaled at %ld",MtmGetSystemTime());
1039+
ResetLatch(&MyProc->procLatch);
1040+
}
1041+
MtmLock(LW_EXCLUSIVE);
1042+
}
1043+
if (ts->status!=TRANSACTION_STATUS_ABORTED&& (!ts->votingCompleted||nConfigChanges!=Mtm->nConfigChanges)) {
1044+
if (nConfigChanges!=Mtm->nConfigChanges) {
1045+
elog(WARNING,"Transaction %d (%s) is aborted because cluster configuration is changed during commit",x->xid,x->gid);
1046+
}else {
1047+
elog(WARNING,"Transaction %d (%s) is aborted because of %d msec timeout expiration, prepare time %d msec",
1048+
x->xid,x->gid, (int)USEC_TO_MSEC(transTimeout), (int)USEC_TO_MSEC(ts->csn-x->snapshot));
1049+
}
1050+
MtmAbortTransaction(ts);
1051+
}
1052+
x->status=ts->status;
1053+
x->xid=ts->xid;
1054+
x->isPrepared= true;
1055+
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
1056+
}
1057+
MtmUnlock();
1058+
}
9831059

9841060
staticvoid
9851061
MtmAbortPreparedTransaction(MtmCurrentTrans*x)
@@ -1009,9 +1085,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
10091085
staticvoid
10101086
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
10111087
{
1012-
MTM_LOG3("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s",
1013-
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->gid,commit ?"commit" :"abort");
1014-
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)) {
1088+
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1089+
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->isTwoPhase,x->gid,commit ?"commit" :"abort");
1090+
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)&& !x->isTwoPhase) {
10151091
MtmTransState*ts=NULL;
10161092
MtmLock(LW_EXCLUSIVE);
10171093
if (x->isPrepared) {
@@ -3820,9 +3896,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38203896
}
38213897
break;
38223898
caseTRANS_STMT_PREPARE:
3899+
MtmTx.isTwoPhase= true;
3900+
strcpy(MtmTx.gid,stmt->gid);
3901+
break;
3902+
/* nobreak */
38233903
caseTRANS_STMT_COMMIT_PREPARED:
38243904
caseTRANS_STMT_ROLLBACK_PREPARED:
3825-
MtmTx.isTwoPhase= true;
3905+
Assert(!MtmTx.isTwoPhase);
38263906
strcpy(MtmTx.gid,stmt->gid);
38273907
break;
38283908
default:

‎contrib/mmts/multimaster.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ typedef struct MtmTransState
213213
boolisLocal;/* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
214214
boolisEnqueued;/* Transaction is inserted in queue */
215215
boolisActive;/* Transaction is active */
216+
boolisTwoPhase;/* user level 2PC */
216217
nodemask_tparticipantsMask;/* Mask of nodes involved in transaction */
217218
nodemask_tvotedMask;/* Mask of voted nodes */
218219
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
else
187187
Assert(false);
188188

189-
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
190-
191189
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
192190
// if (MtmIsFilteredTxn) {
193191
// Assert(MtmTransactionRecords == 0);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp