@@ -416,7 +416,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
416
416
417
417
MtmLock (LW_SHARED );
418
418
if (Mtm -> status == MTM_ONLINE ) {
419
- MtmTransState * ts = hash_search (MtmXid2State ,& xid ,HASH_FIND ,NULL );
419
+ MtmTransState * ts = ( MtmTransState * ) hash_search (MtmXid2State ,& xid ,HASH_FIND ,NULL );
420
420
if (ts != NULL && !ts -> isLocal ) {
421
421
snapshot = ts -> snapshot ;
422
422
Assert (ts -> gtid .node == MtmNodeId || MtmIsRecoverySession );
@@ -806,7 +806,7 @@ static MtmTransState*
806
806
MtmCreateTransState (MtmCurrentTrans * x )
807
807
{
808
808
bool found ;
809
- MtmTransState * ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
809
+ MtmTransState * ts = ( MtmTransState * ) hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
810
810
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
811
811
ts -> snapshot = x -> snapshot ;
812
812
ts -> isLocal = true;
@@ -859,6 +859,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
859
859
/* Do not take in account bg-workers which are performing recovery */
860
860
elog (ERROR ,"Abort current transaction because this cluster node is in %s status" ,MtmNodeStatusMnem [Mtm -> status ]);
861
861
}
862
+ if (TransactionIdIsValid (x -> gtid .xid )&& BIT_CHECK (Mtm -> disabledNodeMask ,x -> gtid .node - 1 )) {
863
+ /* Coordinator of transaction is disabled: just abort transaction without any further steps */
864
+ elog (ERROR ,"Abort transaction %d because it's coordinator %d was disabled" ,x -> xid ,x -> gtid .node );
865
+ }
862
866
863
867
MtmLock (LW_EXCLUSIVE );
864
868
@@ -920,6 +924,32 @@ bool MtmWatchdog(timestamp_t now)
920
924
return allAlive ;
921
925
}
922
926
927
+ /*
928
+ * Mark transaction as precommitted
+ *
+ * gid: global identifier of the prepared transaction, used as the lookup
+ *      key in MtmGid2State (HASH_FIND, so no entry is created here).
+ *
+ * The whole operation runs between MtmLock(LW_EXCLUSIVE) and MtmUnlock(),
+ * and the lock is released on both the found and the not-found path.
+ *
+ * If no entry exists for gid, a WARNING is logged and nothing is changed.
+ * Otherwise the associated MtmTransState (tm->state) is expected to be
+ * non-NULL and still TRANSACTION_STATUS_IN_PROGRESS (both are Assert()s,
+ * so they are only checked in assert-enabled builds). Its status is then
+ * switched to TRANSACTION_STATUS_UNKNOWN, a CSN is taken from
+ * MtmAssignCSN(), MtmAdjustSubtransactions() is called for it, and
+ * MSG_PRECOMMITTED is sent through MtmSend2PCMessage().
+ *
+ * NOTE(review): the WARNING text spells "transaction" as "transaciton";
+ * left untouched here because it is runtime output.
929
+ */
930
+ void MtmPrecommitTransaction (char const * gid )
931
+ {
932
+ MtmLock (LW_EXCLUSIVE );
933
+ {
934
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State ,gid ,HASH_FIND ,NULL );
935
+ if (tm == NULL ) {
936
+ elog (WARNING ,"MtmPrecommitTransaction: transaciton '%s' is not found" ,gid );
937
+ }else {
938
+ MtmTransState * ts = tm -> state ;
939
+ Assert (ts != NULL );
940
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
941
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
942
+ ts -> csn = MtmAssignCSN ();
943
+ MtmAdjustSubtransactions (ts );
944
+ MtmSend2PCMessage (ts ,MSG_PRECOMMITTED );
945
+ }
946
+ }
947
+ MtmUnlock ();
948
+ }
949
+
950
+
951
+
952
+
923
953
924
954
static bool
925
955
MtmVotingCompleted (MtmTransState * ts )
@@ -944,7 +974,8 @@ MtmVotingCompleted(MtmTransState* ts)
944
974
return true;
945
975
}else if (MtmUseDtm ) {
946
976
ts -> votedMask = 0 ;
947
- MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
977
+ SetPrepareTransactionState (ts -> gid ,"precommitted" );
978
+ //MtmSend2PCMessage(ts, MSG_PRECOMMIT);
948
979
return false;
949
980
}else {
950
981
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -964,7 +995,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
964
995
int nConfigChanges = Mtm -> nConfigChanges ;
965
996
timestamp_t prepareTime = ts -> csn - ts -> snapshot ;
966
997
timestamp_t timeout = Max (prepareTime + MSEC_TO_USEC (MtmMin2PCTimeout ),prepareTime * MtmMax2PCRatio /100 );
967
- timestamp_t deadline = MtmGetSystemTime ()+ timeout ;
998
+ timestamp_t start = MtmGetSystemTime ();
999
+ timestamp_t deadline = start + timeout ;
968
1000
timestamp_t now ;
969
1001
970
1002
Assert (ts -> csn > ts -> snapshot );
@@ -989,7 +1021,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
989
1021
if (now > deadline ) {
990
1022
if (ts -> isPrepared ) {
991
1023
/* resend precommit message */
992
- MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
1024
+ // MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1025
+ elog (LOG ,"Distributes transaction is not committed in %ld msec" ,USEC_TO_MSEC (now - start ));
993
1026
}else {
994
1027
elog (WARNING ,"Commit of distributed transaction is canceled because of %ld msec timeout expiration" ,USEC_TO_MSEC (timeout ));
995
1028
MtmAbortTransaction (ts );
@@ -1032,15 +1065,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
1032
1065
elog (ERROR ,"ERROR INJECTION for transaction %d (%s)" ,x -> xid ,x -> gid );
1033
1066
}
1034
1067
MtmLock (LW_EXCLUSIVE );
1035
- ts = hash_search (MtmXid2State ,& x -> xid ,HASH_FIND ,NULL );
1068
+ ts = ( MtmTransState * ) hash_search (MtmXid2State ,& x -> xid ,HASH_FIND ,NULL );
1036
1069
Assert (ts != NULL );
1037
1070
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
1038
1071
if (!MtmIsCoordinator (ts )|| Mtm -> status == MTM_RECOVERY ) {
1039
1072
MTM_TXTRACE (x ,"recovery?" );
1040
1073
Assert (x -> gid [0 ]);
1041
1074
ts -> votingCompleted = true;
1042
1075
MTM_TXTRACE (x ,"recovery? 1" );
1043
- if (Mtm -> status != MTM_RECOVERY || Mtm -> recoverySlot != MtmReplicationNodeId ) {
1076
+ if (Mtm -> status != MTM_RECOVERY /* || Mtm->recoverySlot != MtmReplicationNodeId*/ ) {
1044
1077
MTM_TXTRACE (x ,"recovery? 2" );
1045
1078
MtmSend2PCMessage (ts ,MSG_PREPARED );/* send notification to coordinator */
1046
1079
if (!MtmUseDtm ) {
@@ -1097,7 +1130,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1097
1130
ts -> votedMask = 0 ;
1098
1131
ts -> procno = MyProc -> pgprocno ;
1099
1132
MTM_TXTRACE (ts ,"Coordinator sends MSG_PRECOMMIT" );
1100
- MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
1133
+ SetPrepareTransactionState (ts -> gid ,"precommitted" );
1134
+ //MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1101
1135
1102
1136
Mtm2PCVoting (x ,ts );
1103
1137
@@ -1154,7 +1188,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1154
1188
MtmTransState * ts = NULL ;
1155
1189
MtmLock (LW_EXCLUSIVE );
1156
1190
if (x -> isPrepared ) {
1157
- ts = hash_search (MtmXid2State ,& x -> xid ,HASH_FIND ,NULL );
1191
+ ts = ( MtmTransState * ) hash_search (MtmXid2State ,& x -> xid ,HASH_FIND ,NULL );
1158
1192
Assert (ts != NULL );
1159
1193
Assert (strcmp (x -> gid ,ts -> gid )== 0 );
1160
1194
}else if (x -> gid [0 ]) {
@@ -1206,7 +1240,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1206
1240
if (ts == NULL ) {
1207
1241
bool found ;
1208
1242
Assert (TransactionIdIsValid (x -> xid ));
1209
- ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
1243
+ ts = ( MtmTransState * ) hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
1210
1244
if (!found ) {
1211
1245
ts -> isEnqueued = false;
1212
1246
ts -> isActive = false;
@@ -1316,6 +1350,53 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
1316
1350
}
1317
1351
}
1318
1352
1353
+ /*
1354
+ * Restore state of recovered prepared transaction in memory
+ *
+ * Fetches the list of prepared transactions with GetPreparedTransactions()
+ * and, for every entry whose gid is not yet present in MtmGid2State,
+ * builds a fresh MtmTransState for it:
+ *   - a new xid is obtained with GetNewTransactionId(false) and used as the
+ *     key in MtmXid2State; MyPgXact->xid is reset to InvalidTransactionId
+ *     right afterwards (the author marks this as a hack);
+ *   - status becomes TRANSACTION_STATUS_UNKNOWN when the entry's state_3pc
+ *     string equals "precommitted", TRANSACTION_STATUS_IN_PROGRESS otherwise;
+ *   - gtid is set to (MtmNodeId, xid), csn to 0 and snapshot to INVALID_CSN;
+ *   - participantsMask gets one bit per node in 1..Mtm->nAllNodes, minus the
+ *     nodes in Mtm->disabledNodeMask and minus this node itself;
+ *   - the state is appended with MtmTransactionListAppend(), linked from the
+ *     MtmGid2State entry (tm->state / tm->status), Mtm->nActiveTransactions
+ *     is incremented, and MtmBroadcastPollMessage() is called for it.
+ * Entries whose gid is already in MtmGid2State are skipped. Finally the
+ * number of fetched entries is logged and the array is pfree'd if non-NULL.
+ *
+ * This function does not take the Mtm lock itself; the call site visible in
+ * MtmGetReplicationMode() invokes it after MtmLock(LW_EXCLUSIVE).
+ *
+ * NOTE(review): strcpy() into ts->gid is unbounded - assumes every stored
+ * gid fits the destination buffer; confirm against the gid size limit.
+ * NOTE(review): pxacts has no initializer and is tested after the call, so
+ * this relies on GetPreparedTransactions() always storing to its out
+ * parameter, including when it returns 0 - TODO confirm.
1355
+ */
1356
+ static void MtmLoadPreparedTransactions (void )
1357
+ {
1358
+ PreparedTransaction pxacts ;
1359
+ int n = GetPreparedTransactions (& pxacts );
1360
+ int i ;
1361
+
1362
+ for (i = 0 ;i < n ;i ++ ) {
1363
+ bool found ;
1364
+ char const * gid = pxacts [i ].gid ;
1365
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State ,gid ,HASH_ENTER ,& found );
1366
+ if (!found ) {
1367
+ TransactionId xid = GetNewTransactionId (false);
1368
+ MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State ,& xid ,HASH_ENTER ,& found );
1369
+ MTM_LOG1 ("Recover prepared transaction %s xid %d" ,gid ,xid );
1370
+ MyPgXact -> xid = InvalidTransactionId ;/* dirty hack:((( */
1371
+ Assert (!found );
1372
+ Mtm -> nActiveTransactions += 1 ;
1373
+ ts -> isEnqueued = false;
1374
+ ts -> isActive = true;
1375
+ ts -> status = strcmp (pxacts [i ].state_3pc ,"precommitted" )== 0 ?TRANSACTION_STATUS_UNKNOWN :TRANSACTION_STATUS_IN_PROGRESS ;
1376
+ ts -> isLocal = true;
1377
+ ts -> isPrepared = false;
1378
+ ts -> isPinned = false;
1379
+ ts -> snapshot = INVALID_CSN ;
1380
+ ts -> isTwoPhase = false;
1381
+ ts -> csn = 0 ;/* should be replaced with real CSN by poll result */
1382
+ ts -> gtid .node = MtmNodeId ;
1383
+ ts -> gtid .xid = xid ;
1384
+ ts -> nSubxids = 0 ;
1385
+ ts -> votingCompleted = true;
1386
+ ts -> participantsMask = (((nodemask_t )1 <<Mtm -> nAllNodes )- 1 )& ~Mtm -> disabledNodeMask & ~((nodemask_t )1 << (MtmNodeId - 1 ));
1387
+ ts -> votedMask = 0 ;
1388
+ strcpy (ts -> gid ,gid );
1389
+ MtmTransactionListAppend (ts );
1390
+ tm -> status = ts -> status ;
1391
+ tm -> state = ts ;
1392
+ MtmBroadcastPollMessage (ts );
1393
+ }
1394
+ }
1395
+ MTM_LOG1 ("Recover %d prepared transactions" ,n );
1396
+ if (pxacts ) {
1397
+ pfree (pxacts );
1398
+ }
1399
+ }
1319
1400
1320
1401
static void MtmStartRecovery ()
1321
1402
{
@@ -2051,6 +2132,7 @@ static void MtmCheckControlFile(void)
2051
2132
}
2052
2133
}
2053
2134
2135
+
2054
2136
static void MtmInitialize ()
2055
2137
{
2056
2138
bool found ;
@@ -2087,6 +2169,7 @@ static void MtmInitialize()
2087
2169
Mtm -> nConfigChanges = 0 ;
2088
2170
Mtm -> recoveryCount = 0 ;
2089
2171
Mtm -> localTablesHashLoaded = false;
2172
+ Mtm -> preparedTransactionsLoaded = false;
2090
2173
Mtm -> inject2PCError = 0 ;
2091
2174
Mtm -> sendQueue = NULL ;
2092
2175
Mtm -> freeQueue = NULL ;
@@ -2851,6 +2934,13 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2851
2934
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED ;
2852
2935
2853
2936
MtmLock (LW_EXCLUSIVE );
2937
+
2938
+ if (!Mtm -> preparedTransactionsLoaded )
2939
+ {
2940
+ MtmLoadPreparedTransactions ();
2941
+ Mtm -> preparedTransactionsLoaded = true;
2942
+ }
2943
+
2854
2944
while ((Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE )|| BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 ))
2855
2945
{
2856
2946
if (* shutdown )
@@ -3330,7 +3420,7 @@ mtm_get_csn(PG_FUNCTION_ARGS)
3330
3420
csn_t csn = INVALID_CSN ;
3331
3421
3332
3422
MtmLock (LW_SHARED );
3333
- ts = hash_search (MtmXid2State ,& xid ,HASH_FIND ,NULL );
3423
+ ts = ( MtmTransState * ) hash_search (MtmXid2State ,& xid ,HASH_FIND ,NULL );
3334
3424
if (ts != NULL ) {
3335
3425
csn = ts -> csn ;
3336
3426
}
@@ -3859,14 +3949,12 @@ static inline void MtmGucUpdate(const char *key, char *value)
3859
3949
MtmGucEntry * hentry ;
3860
3950
bool found ;
3861
3951
3862
- hentry = hash_search (MtmGucHash ,key ,HASH_FIND ,& found );
3952
+ hentry = ( MtmGucEntry * ) hash_search (MtmGucHash ,key ,HASH_ENTER ,& found );
3863
3953
if (found )
3864
3954
{
3865
3955
pfree (hentry -> value );
3866
3956
dlist_delete (& hentry -> list_node );
3867
3957
}
3868
-
3869
- hentry = hash_search (MtmGucHash ,key ,HASH_ENTER ,NULL );
3870
3958
hentry -> value = value ;
3871
3959
dlist_push_tail (& MtmGucList ,& hentry -> list_node );
3872
3960
}
@@ -3876,7 +3964,7 @@ static inline void MtmGucRemove(const char *key)
3876
3964
MtmGucEntry * hentry ;
3877
3965
bool found ;
3878
3966
3879
- hentry = hash_search (MtmGucHash ,key ,HASH_FIND ,& found );
3967
+ hentry = ( MtmGucEntry * ) hash_search (MtmGucHash ,key ,HASH_FIND ,& found );
3880
3968
if (found )
3881
3969
{
3882
3970
pfree (hentry -> value );
@@ -4465,7 +4553,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
4465
4553
}
4466
4554
4467
4555
static bool
4468
- MtmDetectGlobalDeadLockFortXid (TransactionId xid )
4556
+ MtmDetectGlobalDeadLockForXid (TransactionId xid )
4469
4557
{
4470
4558
bool hasDeadlock = false;
4471
4559
if (TransactionIdIsValid (xid )) {
@@ -4530,11 +4618,11 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
4530
4618
4531
4619
MTM_LOG1 ("Detect global deadlock for %d by backend %d" ,pgxact -> xid ,MyProcPid );
4532
4620
4533
- return MtmDetectGlobalDeadLockFortXid (pgxact -> xid );
4621
+ return MtmDetectGlobalDeadLockForXid (pgxact -> xid );
4534
4622
}
4535
4623
4536
4624
/*
 * Reads argument 0 as a 32-bit integer, treats it as a TransactionId and
 * returns, as a boolean Datum, the result of the global deadlock check for
 * that xid. In this patch view the '-' line is the old call and the '+' line
 * is its replacement: only the callee name changes
 * (MtmDetectGlobalDeadLockFortXid -> MtmDetectGlobalDeadLockForXid).
 * NOTE(review): presumably exposed as an SQL-callable function, given the
 * PG_FUNCTION_ARGS signature - confirm against the extension's SQL script.
 */
Datum mtm_check_deadlock (PG_FUNCTION_ARGS )
4537
4625
{
4538
4626
TransactionId xid = PG_GETARG_INT32 (0 );
4539
- PG_RETURN_BOOL (MtmDetectGlobalDeadLockFortXid (xid ));
4627
+ PG_RETURN_BOOL (MtmDetectGlobalDeadLockForXid (xid ));
4540
4628
}