Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8920f0b

Browse files
knizhnikkelvich
authored andcommitted
Support users 2PC
1 parentb093bb1 commit8920f0b

File tree

4 files changed

+95
-13
lines changed

4 files changed

+95
-13
lines changed

‎arbiter.c

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,18 @@ static void MtmReceiver(Datum arg)
997997
/* All nodes are finished their transactions */
998998
if (ts->status==TRANSACTION_STATUS_ABORTED) {
999999
MtmWakeUpBackend(ts);
1000-
}elseif (MtmUseDtm) {
1001-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1002-
ts->nVotes=1;/* I voted myself */
1003-
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1004-
MtmSend2PCMessage(ts,MSG_PREPARE);
10051000
}else {
10061001
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1007-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1008-
MtmWakeUpBackend(ts);
1002+
if (ts->isTwoPhase) {
1003+
MtmWakeUpBackend(ts);
1004+
}elseif (MtmUseDtm) {
1005+
ts->nVotes=1;/* I voted myself */
1006+
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1007+
MtmSend2PCMessage(ts,MSG_PREPARE);
1008+
}else {
1009+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1010+
MtmWakeUpBackend(ts);
1011+
}
10091012
}
10101013
}
10111014
}

‎multimaster.c

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
129129
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
130130
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
131131
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
132+
staticvoidMtmCommitPreparedTransaction(MtmCurrentTrans*x);
132133
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
133134
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x);
134135
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -684,6 +685,9 @@ MtmXactCallback(XactEvent event, void *arg)
684685
caseXACT_EVENT_ABORT_PREPARED:
685686
MtmAbortPreparedTransaction(&MtmTx);
686687
break;
688+
caseXACT_EVENT_COMMIT_PREPARED:
689+
MtmCommitPreparedTransaction(&MtmTx);
690+
break;
687691
caseXACT_EVENT_COMMIT:
688692
MtmEndTransaction(&MtmTx, true);
689693
break;
@@ -792,6 +796,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
792796
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
793797
ts->snapshot=x->snapshot;
794798
ts->isLocal= true;
799+
ts->isTwoPhase=x->isTwoPhase;
795800
if (!found) {
796801
ts->isEnqueued= false;
797802
ts->isActive= false;
@@ -969,6 +974,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
969974
x->status=ts->status;
970975
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
971976
MtmUnlock();
977+
if (x->isTwoPhase) {
978+
MtmResetTransaction();
979+
}
972980
}
973981
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
974982
if (Mtm->inject2PCError==3) {
@@ -979,6 +987,74 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
979987
MTM_TXTRACE(x,"PostPrepareTransaction Finish");
980988
}
981989

990+
staticvoid
991+
MtmCommitPreparedTransaction(MtmCurrentTrans*x)
992+
{
993+
MtmTransMap*tm;
994+
MtmTransState*ts;
995+
996+
if (Mtm->status==MTM_RECOVERY||x->isReplicated||x->isPrepared) {/* Ignore auto-2PC originated by multimaster */
997+
return;
998+
}
999+
MtmLock(LW_EXCLUSIVE);
1000+
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_FIND,NULL);
1001+
if (tm==NULL) {
1002+
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
1003+
}else {
1004+
time_ttransTimeout=MSEC_TO_USEC(Mtm2PCMinTimeout);
1005+
intnConfigChanges=Mtm->nConfigChanges;
1006+
timestamp_tstart=MtmGetSystemTime();
1007+
intresult=0;
1008+
1009+
Assert(tm->state!=NULL);
1010+
MTM_LOG1("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
1011+
ts=tm->state;
1012+
1013+
Assert(MtmIsCoordinator(ts));
1014+
1015+
ts->votingCompleted= false;
1016+
ts->nVotes=1;/* I voted myself */
1017+
ts->procno=MyProc->pgprocno;
1018+
MTM_TXTRACE(ts,"Coordinator sends MSG_PREPARE");
1019+
MtmSend2PCMessage(ts,MSG_PREPARE);
1020+
1021+
/* Wait votes from all nodes until: */
1022+
while (!ts->votingCompleted/* all nodes voted */
1023+
&&nConfigChanges==Mtm->nConfigChanges/* configarion is changed */
1024+
&&Mtm->status==MTM_ONLINE/* node is not online */
1025+
&&ts->status!=TRANSACTION_STATUS_ABORTED/* transaction is aborted */
1026+
&&start+transTimeout >=MtmGetSystemTime())/* timeout is expired */
1027+
{
1028+
MtmUnlock();
1029+
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
1030+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
1031+
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Finish");
1032+
/* Emergency bailout if postmaster has died */
1033+
if (result&WL_POSTMASTER_DEATH) {
1034+
proc_exit(1);
1035+
}
1036+
if (result&WL_LATCH_SET) {
1037+
MTM_LOG3("Latch signaled at %ld",MtmGetSystemTime());
1038+
ResetLatch(&MyProc->procLatch);
1039+
}
1040+
MtmLock(LW_EXCLUSIVE);
1041+
}
1042+
if (ts->status!=TRANSACTION_STATUS_ABORTED&& (!ts->votingCompleted||nConfigChanges!=Mtm->nConfigChanges)) {
1043+
if (nConfigChanges!=Mtm->nConfigChanges) {
1044+
elog(WARNING,"Transaction %d (%s) is aborted because cluster configuration is changed during commit",x->xid,x->gid);
1045+
}else {
1046+
elog(WARNING,"Transaction %d (%s) is aborted because of %d msec timeout expiration, prepare time %d msec",
1047+
x->xid,x->gid, (int)USEC_TO_MSEC(transTimeout), (int)USEC_TO_MSEC(ts->csn-x->snapshot));
1048+
}
1049+
MtmAbortTransaction(ts);
1050+
}
1051+
x->status=ts->status;
1052+
x->xid=ts->xid;
1053+
x->isPrepared= true;
1054+
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
1055+
}
1056+
MtmUnlock();
1057+
}
9821058

9831059
staticvoid
9841060
MtmAbortPreparedTransaction(MtmCurrentTrans*x)
@@ -1008,9 +1084,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
10081084
staticvoid
10091085
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
10101086
{
1011-
MTM_LOG3("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s",
1012-
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->gid,commit ?"commit" :"abort");
1013-
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)) {
1087+
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1088+
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->isTwoPhase,x->gid,commit ?"commit" :"abort");
1089+
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)&& !x->isTwoPhase) {
10141090
MtmTransState*ts=NULL;
10151091
MtmLock(LW_EXCLUSIVE);
10161092
if (x->isPrepared) {
@@ -3819,9 +3895,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38193895
}
38203896
break;
38213897
caseTRANS_STMT_PREPARE:
3898+
MtmTx.isTwoPhase= true;
3899+
strcpy(MtmTx.gid,stmt->gid);
3900+
break;
3901+
/* nobreak */
38223902
caseTRANS_STMT_COMMIT_PREPARED:
38233903
caseTRANS_STMT_ROLLBACK_PREPARED:
3824-
MtmTx.isTwoPhase= true;
3904+
Assert(!MtmTx.isTwoPhase);
38253905
strcpy(MtmTx.gid,stmt->gid);
38263906
break;
38273907
default:

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ typedef struct MtmTransState
213213
boolisLocal;/* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
214214
boolisEnqueued;/* Transaction is inserted in queue */
215215
boolisActive;/* Transaction is active */
216+
boolisTwoPhase;/* user level 2PC */
216217
nodemask_tparticipantsMask;/* Mask of nodes involved in transaction */
217218
nodemask_tvotedMask;/* Mask of voted nodes */
218219
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */

‎pglogical_proto.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
else
187187
Assert(false);
188188

189-
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
190-
191189
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
192190
// if (MtmIsFilteredTxn) {
193191
// Assert(MtmTransactionRecords == 0);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp