@@ -243,6 +243,7 @@ void MtmLock(LWLockMode mode)
243
243
#else
244
244
LWLockAcquire ((LWLockId )& Mtm -> locks [MTM_STATE_LOCK_ID ],mode );
245
245
#endif
246
+ Mtm -> lastLockHolder = MyProcPid ;
246
247
}
247
248
248
249
void MtmUnlock (void )
@@ -252,6 +253,7 @@ void MtmUnlock(void)
252
253
#else
253
254
LWLockRelease ((LWLockId )& Mtm -> locks [MTM_STATE_LOCK_ID ]);
254
255
#endif
256
+ Mtm -> lastLockHolder = 0 ;
255
257
}
256
258
257
259
void MtmLockNode (int nodeId )
@@ -550,16 +552,20 @@ MtmAdjustOldestXid(TransactionId xid)
550
552
551
553
static void MtmTransactionListAppend (MtmTransState * ts )
552
554
{
553
- ts -> next = NULL ;
554
- ts -> nSubxids = 0 ;
555
- * Mtm -> transListTail = ts ;
556
- Mtm -> transListTail = & ts -> next ;
555
+ if (!ts -> isEnqueued ) {
556
+ ts -> isEnqueued = true;
557
+ ts -> next = NULL ;
558
+ ts -> nSubxids = 0 ;
559
+ * Mtm -> transListTail = ts ;
560
+ Mtm -> transListTail = & ts -> next ;
561
+ }
557
562
}
558
563
559
564
static void MtmTransactionListInsertAfter (MtmTransState * after ,MtmTransState * ts )
560
565
{
561
566
ts -> next = after -> next ;
562
567
after -> next = ts ;
568
+ ts -> isEnqueued = true;
563
569
if (Mtm -> transListTail == & after -> next ) {
564
570
Mtm -> transListTail = & ts -> next ;
565
571
}
@@ -700,6 +706,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
700
706
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
701
707
ts -> snapshot = x -> snapshot ;
702
708
ts -> isLocal = true;
709
+ if (!found ) {
710
+ ts -> isEnqueued = false;
711
+ }
703
712
if (TransactionIdIsValid (x -> gtid .xid )) {
704
713
Assert (x -> gtid .node != MtmNodeId );
705
714
ts -> gtid = x -> gtid ;
@@ -833,6 +842,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
833
842
Assert (x -> gid [0 ]);
834
843
tm -> state = ts ;
835
844
ts -> votingCompleted = true;
845
+ if (!found ) {
846
+ ts -> isEnqueued = false;
847
+ }
836
848
if (Mtm -> status != MTM_RECOVERY ) {
837
849
MtmSendNotificationMessage (ts ,MSG_READY );/* send notification to coordinator */
838
850
if (!MtmUseDtm ) {
@@ -945,8 +957,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
945
957
*/
946
958
MTM_LOG1 ("%d: send ABORT notification abort transaction %d to coordinator %d" ,MyProcPid ,x -> gtid .xid ,x -> gtid .node );
947
959
if (ts == NULL ) {
960
+ bool found ;
948
961
Assert (TransactionIdIsValid (x -> xid ));
949
- ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,NULL );
962
+ ts = hash_search (MtmXid2State ,& x -> xid ,HASH_ENTER ,& found );
963
+ if (!found ) {
964
+ ts -> isEnqueued = false;
965
+ }
950
966
ts -> status = TRANSACTION_STATUS_ABORTED ;
951
967
ts -> isLocal = true;
952
968
ts -> snapshot = x -> snapshot ;
@@ -1364,7 +1380,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1364
1380
*/
1365
1381
bool MtmRefreshClusterStatus (bool nowait )
1366
1382
{
1367
- nodemask_t mask ,clique ,disabled , enabled ;
1383
+ nodemask_t mask ,clique ,disabled ;
1368
1384
nodemask_t matrix [MAX_NODES ];
1369
1385
MtmTransState * ts ;
1370
1386
int clique_size ;
@@ -1391,28 +1407,29 @@ bool MtmRefreshClusterStatus(bool nowait)
1391
1407
MTM_LOG1 ("Find clique %lx, disabledNodeMask %lx" , (long )clique , (long )Mtm -> disabledNodeMask );
1392
1408
MtmLock (LW_EXCLUSIVE );
1393
1409
disabled = ~clique & (((nodemask_t )1 <<Mtm -> nAllNodes )- 1 )& ~Mtm -> disabledNodeMask ;/* new disabled nodes mask */
1394
- enabled = clique & Mtm -> disabledNodeMask ;/* new enabled nodes mask */
1395
1410
1396
1411
for (i = 0 ,mask = disabled ;mask != 0 ;i ++ ,mask >>=1 ) {
1397
1412
if (mask & 1 ) {
1398
1413
MtmDisableNode (i + 1 );
1399
1414
}
1400
- }
1401
-
1415
+ }
1416
+ #if 0 /* Do not enable nodes here: them will be enabled after completion of recovery */
1417
+ enabled = clique & Mtm -> disabledNodeMask ;/* new enabled nodes mask */
1402
1418
for (i = 0 ,mask = enabled ;mask != 0 ;i ++ ,mask >>=1 ) {
1403
1419
if (mask & 1 ) {
1404
1420
MtmEnableNode (i + 1 );
1405
1421
}
1406
1422
}
1407
- if (disabled |enabled ) {
1423
+ #endif
1424
+ if (disabled ) {
1408
1425
MtmCheckQuorum ();
1409
1426
}
1410
1427
/* Interrupt voting for active transaction and abort them */
1411
1428
for (ts = Mtm -> transListHead ;ts != NULL ;ts = ts -> next ) {
1412
1429
MTM_LOG3 ("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d" ,
1413
1430
ts -> gid ,ts -> gtid .node ,ts -> xid ,ts -> status ,ts -> gtid .xid );
1414
1431
if (MtmIsCoordinator (ts )) {
1415
- if (!ts -> votingCompleted && ( disabled | enabled ) != 0 && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1432
+ if (!ts -> votingCompleted && disabled != 0 && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1416
1433
MtmAbortTransaction (ts );
1417
1434
MtmWakeUpBackend (ts );
1418
1435
}
@@ -2222,6 +2239,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
2222
2239
{
2223
2240
if (nodeId <=0 || nodeId > Mtm -> nLiveNodes )
2224
2241
{
2242
+ MtmUnlock ();
2225
2243
elog (ERROR ,"NodeID %d is out of range [1,%d]" ,nodeId ,Mtm -> nLiveNodes );
2226
2244
}
2227
2245
MtmDisableNode (nodeId );
@@ -2287,6 +2305,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2287
2305
MtmEnableNode (MtmReplicationNodeId );
2288
2306
MtmCheckQuorum ();
2289
2307
}else {
2308
+ MtmUnlock ();
2290
2309
elog (ERROR ,"Disabled node %d tries to reconnect without recovery" ,MtmReplicationNodeId );
2291
2310
}
2292
2311
}else {