Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3abe087

Browse files
knizhnikkelvich
authored andcommitted
Restore timeouts
1 parent7668c14 commit3abe087

File tree

1 file changed

+94
-77
lines changed

1 file changed

+94
-77
lines changed

‎multimaster.c

Lines changed: 94 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ int MtmTransSpillThreshold;
223223
intMtmMaxNodes;
224224
intMtmHeartbeatSendTimeout;
225225
intMtmHeartbeatRecvTimeout;
226+
intMtmMin2PCTimeout;
227+
intMtmMax2PCRatio;
226228
boolMtmUseRaftable;
227229
boolMtmUseDtm;
228230
boolMtmPreserveCommitOrder;
@@ -954,6 +956,62 @@ MtmVotingCompleted(MtmTransState* ts)
954956
||ts->status==TRANSACTION_STATUS_ABORTED;/* or transaction was aborted */
955957
}
956958

959+
staticvoid
960+
Mtm2PCVoting(MtmCurrentTrans*x,MtmTransState*ts)
961+
{
962+
intresult=0;
963+
intnConfigChanges=Mtm->nConfigChanges;
964+
timestamp_telapsed,start=MtmGetSystemTime();
965+
timestamp_tdeadline=0;
966+
/* Wait votes from all nodes until: */
967+
while (!MtmVotingCompleted(ts)
968+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
969+
{
970+
MtmUnlock();
971+
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Start");
972+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
973+
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Finish");
974+
/* Emergency bailout if postmaster has died */
975+
if (result&WL_POSTMASTER_DEATH) {
976+
proc_exit(1);
977+
}
978+
if (result&WL_LATCH_SET) {
979+
ResetLatch(&MyProc->procLatch);
980+
}
981+
elapsed=MtmGetSystemTime()-start;
982+
MtmLock(LW_EXCLUSIVE);
983+
if (deadline==0&&ts->votedMask!=0) {
984+
deadline=Max(MSEC_TO_USEC(MtmMin2PCTimeout),elapsed*MtmMax2PCRatio/100);
985+
}else {
986+
if (ts->isPrepared) {
987+
/* reset precommit message */
988+
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
989+
}else {
990+
if (elapsed>deadline) {
991+
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(elapsed));
992+
MtmAbortTransaction(ts);
993+
}
994+
}
995+
}
996+
}
997+
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
998+
if (ts->isPrepared) {
999+
// GetNewTransactionId(false); /* force increment of transaction counter */
1000+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1001+
elog(WARNING,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1002+
x->isSuspended= true;
1003+
}else {
1004+
if (Mtm->status!=MTM_ONLINE) {
1005+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1006+
}elseif (nConfigChanges!=Mtm->nConfigChanges) {
1007+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1008+
}
1009+
MtmAbortTransaction(ts);
1010+
}
1011+
}
1012+
x->status=ts->status;
1013+
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
1014+
}
9571015

9581016
staticvoid
9591017
MtmPostPrepareTransaction(MtmCurrentTrans*x)
@@ -987,42 +1045,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9871045
MtmUnlock();
9881046
MtmResetTransaction();
9891047
}else {
990-
intresult=0;
991-
intnConfigChanges=Mtm->nConfigChanges;
992-
/* Wait votes from all nodes until: */
993-
while (!MtmVotingCompleted(ts)
994-
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
995-
{
996-
MtmUnlock();
997-
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Start");
998-
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
999-
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Finish");
1000-
/* Emergency bailout if postmaster has died */
1001-
if (result&WL_POSTMASTER_DEATH) {
1002-
proc_exit(1);
1003-
}
1004-
if (result&WL_LATCH_SET) {
1005-
ResetLatch(&MyProc->procLatch);
1006-
}
1007-
MtmLock(LW_EXCLUSIVE);
1008-
}
1009-
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1010-
if (ts->isPrepared) {
1011-
// GetNewTransactionId(false); /* force increment of transaction counter */
1012-
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013-
elog(WARNING,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1014-
x->isSuspended= true;
1015-
}else {
1016-
if (Mtm->status!=MTM_ONLINE) {
1017-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1018-
}else {
1019-
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1020-
}
1021-
MtmAbortTransaction(ts);
1022-
}
1023-
}
1024-
x->status=ts->status;
1025-
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
1048+
Mtm2PCVoting(x,ts);
10261049
MtmUnlock();
10271050
if (x->isTwoPhase) {
10281051
MtmResetTransaction();
@@ -1051,9 +1074,6 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10511074
if (tm==NULL) {
10521075
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
10531076
}else {
1054-
intresult=0;
1055-
intnConfigChanges=Mtm->nConfigChanges;
1056-
10571077
Assert(tm->state!=NULL);
10581078
MTM_LOG3("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
10591079
ts=tm->state;
@@ -1065,44 +1085,11 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10651085
ts->procno=MyProc->pgprocno;
10661086
MTM_TXTRACE(ts,"Coordinator sends MSG_PRECOMMIT");
10671087
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1068-
1069-
/* Wait votes from all nodes until: */
1070-
while (!MtmVotingCompleted(ts)
1071-
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
1072-
{
1073-
MtmUnlock();
1074-
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
1075-
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
1076-
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Finish");
1077-
/* Emergency bailout if postmaster has died */
1078-
if (result&WL_POSTMASTER_DEATH) {
1079-
proc_exit(1);
1080-
}
1081-
MtmLock(LW_EXCLUSIVE);
1082-
if (result&WL_LATCH_SET) {
1083-
MTM_LOG3("Latch signaled at %ld",MtmGetSystemTime());
1084-
ResetLatch(&MyProc->procLatch);
1085-
}
1086-
}
1087-
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1088-
if (ts->isPrepared) {
1089-
// GetNewTransactionId(false); /* force increment of transaction counter */
1090-
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091-
elog(WARNING,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1092-
x->isSuspended= true;
1093-
}else {
1094-
if (Mtm->status!=MTM_ONLINE) {
1095-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1096-
}else {
1097-
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1098-
}
1099-
MtmAbortTransaction(ts);
1100-
}
1101-
}
1102-
x->status=ts->status;
1088+
1089+
Mtm2PCVoting(x,ts);
1090+
11031091
x->xid=ts->xid;
11041092
x->isPrepared= true;
1105-
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
11061093
}
11071094
MtmUnlock();
11081095
}
@@ -1202,7 +1189,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021189
* Send notification only if ABORT happens during transaction processing at replicas,
12031190
* do not send notification if ABORT is received from master
12041191
*/
1205-
MTM_LOG2("%d: send ABORT notification for transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
1192+
MTM_LOG1("%d: send ABORT notification for transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
12061193
if (ts==NULL) {
12071194
boolfound;
12081195
Assert(TransactionIdIsValid(x->xid));
@@ -1277,7 +1264,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12771264
inti;
12781265
for (i=0;i<Mtm->nAllNodes;i++)
12791266
{
1280-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
1267+
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask,i))
12811268
{
12821269
Assert(TransactionIdIsValid(ts->xids[i]));
12831270
msg.node=i+1;
@@ -2645,6 +2632,36 @@ _PG_init(void)
26452632
NULL
26462633
);
26472634

2635+
DefineCustomIntVariable(
2636+
"multimaster.min_2pc_timeout",
2637+
"Minimal timeout between receiving PREPARED message from nodes participated in transaction to coordinator (milliseconds)",
2638+
NULL,
2639+
&MtmMin2PCTimeout,
2640+
2000,/* 2 seconds */
2641+
1,
2642+
INT_MAX,
2643+
PGC_BACKEND,
2644+
0,
2645+
NULL,
2646+
NULL,
2647+
NULL
2648+
);
2649+
2650+
DefineCustomIntVariable(
2651+
"multimaster.max_2pc_ratio",
2652+
"Maximal ratio (in percents) between prepare time at different nodes: if T is time of preparing transaction at some node, then transaction can be aborted if prepared responce was not received in T*MtmMax2PCRatio/100",
2653+
NULL,
2654+
&MtmMax2PCRatio,
2655+
200,/* 2 times */
2656+
1,
2657+
INT_MAX,
2658+
PGC_BACKEND,
2659+
0,
2660+
NULL,
2661+
NULL,
2662+
NULL
2663+
);
2664+
26482665
DefineCustomIntVariable(
26492666
"multimaster.queue_size",
26502667
"Multimaster queue size",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp