@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289
289
elog (ERROR ,"Arbiter failed to resolve host '%s' by name" ,host );
290
290
}
291
291
292
- Retry :
293
-
292
+ Retry :
294
293
while (1 ) {
295
294
int rc = -1 ;
296
295
@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384
383
385
384
386
385
static bool MtmSendToNode (int node ,void const * buf ,int size )
387
- {
388
- while (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
389
- elog (WARNING ,"Arbiter failed to write to node %d: %d" ,node + 1 ,errno );
390
- if (sockets [node ] >=0 ) {
386
+ {
387
+ while (true) {
388
+ if (sockets [node ] >=0 && BIT_CHECK (Mtm -> reconnectMask ,node )) {
389
+ elog (WARNING ,"Arbiter is forced to reconnect to node %d" ,node + 1 );
390
+ BIT_CLEAR (Mtm -> reconnectMask ,node );
391
391
close (sockets [node ]);
392
+ sockets [node ]= -1 ;
392
393
}
393
- sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ].con .hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
394
- if (sockets [node ]< 0 ) {
395
- MtmOnNodeDisconnect (node + 1 );
396
- return false;
394
+ if (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
395
+ if (sockets [node ] >=0 ) {
396
+ elog (WARNING ,"Arbiter failed to write to node %d: %d" ,node + 1 ,errno );
397
+ close (sockets [node ]);
398
+ }
399
+ sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ].con .hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
400
+ if (sockets [node ]< 0 ) {
401
+ MtmOnNodeDisconnect (node + 1 );
402
+ return false;
403
+ }
404
+ MTM_TRACE ("Arbiter restablished connection with node %d\n" ,node + 1 );
405
+ }else {
406
+ return true;
397
407
}
398
- elog (NOTICE ,"Arbiter restablish connection with node %d" ,node + 1 );
399
408
}
400
- return true;
401
409
}
402
410
403
411
static int MtmReadFromNode (int node ,void * buf ,int buf_size )
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477
485
478
486
sockets [MtmNodeId - 1 ]= gateway ;
479
487
MtmRegisterSocket (gateway ,MtmNodeId - 1 );
480
-
481
- for (i = 0 ;i < MtmNodes - 1 ;i ++ ) {
482
- MtmAcceptOneConnection ();
483
- }
484
488
}
485
489
486
490
@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693
697
msg -> node ,Mtm -> disabledNodeMask ,msg -> disabledNodeMask );
694
698
ts -> status = TRANSACTION_STATUS_ABORTED ;
695
699
MtmAdjustSubtransactions (ts );
700
+ Mtm -> nActiveTransactions -= 1 ;
696
701
}
697
702
698
703
if (++ ts -> nVotes == Mtm -> nNodes ) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712
717
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
713
718
ts -> status = TRANSACTION_STATUS_ABORTED ;
714
719
MtmAdjustSubtransactions (ts );
720
+ Mtm -> nActiveTransactions -= 1 ;
715
721
}
716
722
if (++ ts -> nVotes == Mtm -> nNodes ) {
717
723
MtmWakeUpBackend (ts );