@@ -223,6 +223,8 @@ int MtmTransSpillThreshold;
223223int MtmMaxNodes ;
224224int MtmHeartbeatSendTimeout ;
225225int MtmHeartbeatRecvTimeout ;
226+ int MtmMin2PCTimeout ;
227+ int MtmMax2PCRatio ;
226228bool MtmUseRaftable ;
227229bool MtmUseDtm ;
228230bool MtmPreserveCommitOrder ;
@@ -954,6 +956,62 @@ MtmVotingCompleted(MtmTransState* ts)
954956|| ts -> status == TRANSACTION_STATUS_ABORTED ;/* or transaction was aborted */
955957}
956958
959+ static void
960+ Mtm2PCVoting (MtmCurrentTrans * x ,MtmTransState * ts )
961+ {
962+ int result = 0 ;
963+ int nConfigChanges = Mtm -> nConfigChanges ;
964+ timestamp_t elapsed ,start = MtmGetSystemTime ();
965+ timestamp_t deadline = 0 ;
966+ /* Wait votes from all nodes until: */
967+ while (!MtmVotingCompleted (ts )
968+ && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
969+ {
970+ MtmUnlock ();
971+ MTM_TXTRACE (x ,"PostPrepareTransaction WaitLatch Start" );
972+ result = WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,MtmHeartbeatRecvTimeout );
973+ MTM_TXTRACE (x ,"PostPrepareTransaction WaitLatch Finish" );
974+ /* Emergency bailout if postmaster has died */
975+ if (result & WL_POSTMASTER_DEATH ) {
976+ proc_exit (1 );
977+ }
978+ if (result & WL_LATCH_SET ) {
979+ ResetLatch (& MyProc -> procLatch );
980+ }
981+ elapsed = MtmGetSystemTime ()- start ;
982+ MtmLock (LW_EXCLUSIVE );
983+ if (deadline == 0 && ts -> votedMask != 0 ) {
984+ deadline = Max (MSEC_TO_USEC (MtmMin2PCTimeout ),elapsed * MtmMax2PCRatio /100 );
985+ }else {
986+ if (ts -> isPrepared ) {
987+ /* reset precommit message */
988+ MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
989+ }else {
990+ if (elapsed > deadline ) {
991+ elog (WARNING ,"Commit of distributed transaction is canceled because of %ld msec timeout expiration" ,USEC_TO_MSEC (elapsed ));
992+ MtmAbortTransaction (ts );
993+ }
994+ }
995+ }
996+ }
997+ if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
998+ if (ts -> isPrepared ) {
999+ // GetNewTransactionId(false); /* force increment of transaction counter */
1000+ // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1001+ elog (WARNING ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1002+ x -> isSuspended = true;
1003+ }else {
1004+ if (Mtm -> status != MTM_ONLINE ) {
1005+ elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1006+ }else if (nConfigChanges != Mtm -> nConfigChanges ) {
1007+ elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1008+ }
1009+ MtmAbortTransaction (ts );
1010+ }
1011+ }
1012+ x -> status = ts -> status ;
1013+ MTM_LOG3 ("%d: Result of vote: %d" ,MyProcPid ,ts -> status );
1014+ }
9571015
9581016static void
9591017MtmPostPrepareTransaction (MtmCurrentTrans * x )
@@ -987,42 +1045,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9871045MtmUnlock ();
9881046MtmResetTransaction ();
9891047}else {
990- int result = 0 ;
991- int nConfigChanges = Mtm -> nConfigChanges ;
992- /* Wait votes from all nodes until: */
993- while (!MtmVotingCompleted (ts )
994- && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
995- {
996- MtmUnlock ();
997- MTM_TXTRACE (x ,"PostPrepareTransaction WaitLatch Start" );
998- result = WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,MtmHeartbeatRecvTimeout );
999- MTM_TXTRACE (x ,"PostPrepareTransaction WaitLatch Finish" );
1000- /* Emergency bailout if postmaster has died */
1001- if (result & WL_POSTMASTER_DEATH ) {
1002- proc_exit (1 );
1003- }
1004- if (result & WL_LATCH_SET ) {
1005- ResetLatch (& MyProc -> procLatch );
1006- }
1007- MtmLock (LW_EXCLUSIVE );
1008- }
1009- if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1010- if (ts -> isPrepared ) {
1011- // GetNewTransactionId(false); /* force increment of transaction counter */
1012- // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013- elog (WARNING ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1014- x -> isSuspended = true;
1015- }else {
1016- if (Mtm -> status != MTM_ONLINE ) {
1017- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1018- }else {
1019- elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1020- }
1021- MtmAbortTransaction (ts );
1022- }
1023- }
1024- x -> status = ts -> status ;
1025- MTM_LOG3 ("%d: Result of vote: %d" ,MyProcPid ,ts -> status );
1048+ Mtm2PCVoting (x ,ts );
10261049MtmUnlock ();
10271050if (x -> isTwoPhase ) {
10281051MtmResetTransaction ();
@@ -1051,9 +1074,6 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10511074if (tm == NULL ) {
10521075elog (WARNING ,"Global transaciton ID '%s' is not found" ,x -> gid );
10531076}else {
1054- int result = 0 ;
1055- int nConfigChanges = Mtm -> nConfigChanges ;
1056-
10571077Assert (tm -> state != NULL );
10581078MTM_LOG3 ("Commit prepared transaction %d with gid='%s'" ,x -> xid ,x -> gid );
10591079ts = tm -> state ;
@@ -1065,44 +1085,11 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10651085ts -> procno = MyProc -> pgprocno ;
10661086MTM_TXTRACE (ts ,"Coordinator sends MSG_PRECOMMIT" );
10671087MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
1068-
1069- /* Wait votes from all nodes until: */
1070- while (!MtmVotingCompleted (ts )
1071- && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
1072- {
1073- MtmUnlock ();
1074- MTM_TXTRACE (x ,"CommitPreparedTransaction WaitLatch Start" );
1075- result = WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,MtmHeartbeatRecvTimeout );
1076- MTM_TXTRACE (x ,"CommitPreparedTransaction WaitLatch Finish" );
1077- /* Emergency bailout if postmaster has died */
1078- if (result & WL_POSTMASTER_DEATH ) {
1079- proc_exit (1 );
1080- }
1081- MtmLock (LW_EXCLUSIVE );
1082- if (result & WL_LATCH_SET ) {
1083- MTM_LOG3 ("Latch signaled at %ld" ,MtmGetSystemTime ());
1084- ResetLatch (& MyProc -> procLatch );
1085- }
1086- }
1087- if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1088- if (ts -> isPrepared ) {
1089- // GetNewTransactionId(false); /* force increment of transaction counter */
1090- // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091- elog (WARNING ,"Commit of distributed transaction %s is suspended because node is switched to %s mode" ,ts -> gid ,MtmNodeStatusMnem [Mtm -> status ]);
1092- x -> isSuspended = true;
1093- }else {
1094- if (Mtm -> status != MTM_ONLINE ) {
1095- elog (WARNING ,"Commit of distributed transaction is canceled because node is switched to %s mode" ,MtmNodeStatusMnem [Mtm -> status ]);
1096- }else {
1097- elog (WARNING ,"Commit of distributed transaction is canceled because cluster configuration was changed" );
1098- }
1099- MtmAbortTransaction (ts );
1100- }
1101- }
1102- x -> status = ts -> status ;
1088+
1089+ Mtm2PCVoting (x ,ts );
1090+
11031091x -> xid = ts -> xid ;
11041092x -> isPrepared = true;
1105- MTM_LOG3 ("%d: Result of vote: %d" ,MyProcPid ,ts -> status );
11061093}
11071094MtmUnlock ();
11081095}
@@ -1202,7 +1189,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021189 * Send notification only if ABORT happens during transaction processing at replicas,
12031190 * do not send notification if ABORT is received from master
12041191 */
1205- MTM_LOG2 ("%d: send ABORT notification for transaction %d to coordinator %d" ,MyProcPid ,x -> gtid .xid ,x -> gtid .node );
1192+ MTM_LOG1 ("%d: send ABORT notification for transaction %d to coordinator %d" ,MyProcPid ,x -> gtid .xid ,x -> gtid .node );
12061193if (ts == NULL ) {
12071194bool found ;
12081195Assert (TransactionIdIsValid (x -> xid ));
@@ -1277,7 +1264,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12771264int i ;
12781265for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
12791266{
1280- if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask ,i ))
1267+ if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ ts -> votedMask ,i ))
12811268{
12821269Assert (TransactionIdIsValid (ts -> xids [i ]));
12831270msg .node = i + 1 ;
@@ -2645,6 +2632,36 @@ _PG_init(void)
26452632NULL
26462633);
26472634
2635+ DefineCustomIntVariable (
2636+ "multimaster.min_2pc_timeout" ,
2637+ "Minimal timeout between receiving PREPARED message from nodes participated in transaction to coordinator (milliseconds)" ,
2638+ NULL ,
2639+ & MtmMin2PCTimeout ,
2640+ 2000 ,/* 2 seconds */
2641+ 1 ,
2642+ INT_MAX ,
2643+ PGC_BACKEND ,
2644+ 0 ,
2645+ NULL ,
2646+ NULL ,
2647+ NULL
2648+ );
2649+
2650+ DefineCustomIntVariable (
2651+ "multimaster.max_2pc_ratio" ,
2652+ "Maximal ratio (in percents) between prepare time at different nodes: if T is time of preparing transaction at some node, then transaction can be aborted if prepared responce was not received in T*MtmMax2PCRatio/100" ,
2653+ NULL ,
2654+ & MtmMax2PCRatio ,
2655+ 200 ,/* 2 times */
2656+ 1 ,
2657+ INT_MAX ,
2658+ PGC_BACKEND ,
2659+ 0 ,
2660+ NULL ,
2661+ NULL ,
2662+ NULL
2663+ );
2664+
26482665DefineCustomIntVariable (
26492666"multimaster.queue_size" ,
26502667"Multimaster queue size" ,