73
73
typedef struct {
74
74
TransactionId xid ;/* local transaction ID */
75
75
GlobalTransactionId gtid ;/* global transaction ID assigned by coordinator of transaction */
76
+ bool isTwoPhase ;/* user level 2PC */
76
77
bool isReplicated ;/* transaction on replica */
77
78
bool isDistributed ;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
78
79
bool isPrepared ;/* transaction is perpared at first stage of 2PC */
@@ -719,6 +720,7 @@ MtmResetTransaction()
719
720
x -> gtid .xid = InvalidTransactionId ;
720
721
x -> isDistributed = false;
721
722
x -> isPrepared = false;
723
+ x -> isTwoPhase = false;
722
724
x -> status = TRANSACTION_STATUS_UNKNOWN ;
723
725
}
724
726
@@ -746,6 +748,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
746
748
x -> isReplicated = MtmIsLogicalReceiver ;
747
749
x -> isDistributed = MtmIsUserTransaction ();
748
750
x -> isPrepared = false;
751
+ x -> isTwoPhase = false;
749
752
x -> isTransactionBlock = IsTransactionBlock ();
750
753
/* Application name can be changed usnig PGAPPNAME environment variable */
751
754
if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 ) {
@@ -906,8 +909,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
906
909
Assert (ts != NULL );
907
910
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
908
911
if (!MtmIsCoordinator (ts )|| Mtm -> status == MTM_RECOVERY ) {
909
- bool found ;
910
- MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State ,x -> gid ,HASH_ENTER ,& found );
912
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State ,x -> gid ,HASH_ENTER ,NULL );
911
913
Assert (x -> gid [0 ]);
912
914
tm -> state = ts ;
913
915
ts -> votingCompleted = true;
@@ -925,8 +927,13 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
925
927
time_t transTimeout = Max (MSEC_TO_USEC (Mtm2PCMinTimeout ), (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100 );
926
928
int result = 0 ;
927
929
int nConfigChanges = Mtm -> nConfigChanges ;
928
-
929
930
timestamp_t start = MtmGetSystemTime ();
931
+
932
+ if (x -> isTwoPhase ) {
933
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State ,x -> gid ,HASH_ENTER ,NULL );
934
+ tm -> state = ts ;
935
+ }
936
+
930
937
/* Wait votes from all nodes until: */
931
938
while (!ts -> votingCompleted /* all nodes voted */
932
939
&& nConfigChanges == Mtm -> nConfigChanges /* configarion is changed */
@@ -982,7 +989,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
982
989
MtmLock (LW_EXCLUSIVE );
983
990
tm = (MtmTransMap * )hash_search (MtmGid2State ,x -> gid ,HASH_REMOVE ,NULL );
984
991
if (tm == NULL ) {
985
- elog (WARNING ,"Global transaciton ID%s is not found" ,x -> gid );
992
+ elog (WARNING ,"Global transaciton ID'%s' is not found" ,x -> gid );
986
993
}else {
987
994
Assert (tm -> state != NULL );
988
995
MTM_LOG1 ("Abort prepared transaction %d with gid='%s'" ,x -> xid ,x -> gid );
@@ -1265,7 +1272,7 @@ void MtmAbortTransaction(MtmTransState* ts)
1265
1272
Assert (MtmLockCount != 0 );/* should be invoked with exclsuive lock */
1266
1273
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1267
1274
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
1268
- elog (WARNING ,"Attempt to rollback already committed transaction %d (%s)" ,ts -> xid ,ts -> gid );
1275
+ elog (LOG ,"Attempt to rollback already committed transaction %d (%s)" ,ts -> xid ,ts -> gid );
1269
1276
}else {
1270
1277
MTM_LOG1 ("Rollback active transaction %d:%d (local xid %d) status %d" ,ts -> gtid .node ,ts -> gtid .xid ,ts -> xid ,ts -> status );
1271
1278
ts -> status = TRANSACTION_STATUS_ABORTED ;
@@ -3798,11 +3805,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3798
3805
}
3799
3806
break ;
3800
3807
case TRANS_STMT_PREPARE :
3801
- elog (ERROR ,"Two phase commit is not supported by multimaster" );
3802
- break ;
3803
3808
case TRANS_STMT_COMMIT_PREPARED :
3804
3809
case TRANS_STMT_ROLLBACK_PREPARED :
3805
- skipCommand = true;
3810
+ MtmTx .isTwoPhase = true;
3811
+ strcpy (MtmTx .gid ,stmt -> gid );
3806
3812
break ;
3807
3813
default :
3808
3814
break ;
@@ -3960,8 +3966,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3960
3966
standard_ProcessUtility (parsetree ,queryString ,context ,
3961
3967
params ,dest ,completionTag );
3962
3968
}
3963
-
3964
- if (MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ && ! MtmVolksWagenMode ) {
3969
+
3970
+ if (! MtmVolksWagenMode && MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3965
3971
elog (ERROR ,"Isolation level %s is not supported by multimaster" ,isoLevelStr [XactIsoLevel ]);
3966
3972
}
3967
3973
@@ -4147,7 +4153,7 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4147
4153
}
4148
4154
MtmGetGtid (xid ,& gtid );
4149
4155
hasDeadlock = MtmGraphFindLoop (& graph ,& gtid );
4150
- elog (WARNING ,"Distributed deadlock check by backend %d for %u:%u = %d" ,MyProcPid ,gtid .node ,gtid .xid ,hasDeadlock );
4156
+ elog (LOG ,"Distributed deadlock check by backend %d for %u:%u = %d" ,MyProcPid ,gtid .node ,gtid .xid ,hasDeadlock );
4151
4157
if (!hasDeadlock ) {
4152
4158
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
4153
4159
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not