@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289289elog (ERROR ,"Arbiter failed to resolve host '%s' by name" ,host );
290290}
291291
292- Retry :
293-
292+ Retry :
294293while (1 ) {
295294int rc = -1 ;
296295
@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384383
385384
386385static bool MtmSendToNode (int node ,void const * buf ,int size )
387- {
388- while (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
389- elog (WARNING ,"Arbiter failed to write to node %d: %d" ,node + 1 ,errno );
390- if (sockets [node ] >=0 ) {
386+ {
387+ while (true) {
388+ if (sockets [node ] >=0 && BIT_CHECK (Mtm -> reconnectMask ,node )) {
389+ elog (WARNING ,"Arbiter is forced to reconnect to node %d" ,node + 1 );
390+ BIT_CLEAR (Mtm -> reconnectMask ,node );
391391close (sockets [node ]);
392+ sockets [node ]= -1 ;
392393}
393- sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ].con .hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
394- if (sockets [node ]< 0 ) {
395- MtmOnNodeDisconnect (node + 1 );
396- return false;
394+ if (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
395+ if (sockets [node ] >=0 ) {
396+ elog (WARNING ,"Arbiter failed to write to node %d: %d" ,node + 1 ,errno );
397+ close (sockets [node ]);
398+ }
399+ sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ].con .hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
400+ if (sockets [node ]< 0 ) {
401+ MtmOnNodeDisconnect (node + 1 );
402+ return false;
403+ }
404+ MTM_TRACE ("Arbiter restablished connection with node %d\n" ,node + 1 );
405+ }else {
406+ return true;
397407}
398- elog (NOTICE ,"Arbiter restablish connection with node %d" ,node + 1 );
399408}
400- return true;
401409}
402410
403411static int MtmReadFromNode (int node ,void * buf ,int buf_size )
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477485
478486sockets [MtmNodeId - 1 ]= gateway ;
479487MtmRegisterSocket (gateway ,MtmNodeId - 1 );
480-
481- for (i = 0 ;i < MtmNodes - 1 ;i ++ ) {
482- MtmAcceptOneConnection ();
483- }
484488}
485489
486490
@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693697msg -> node ,Mtm -> disabledNodeMask ,msg -> disabledNodeMask );
694698ts -> status = TRANSACTION_STATUS_ABORTED ;
695699MtmAdjustSubtransactions (ts );
700+ Mtm -> nActiveTransactions -= 1 ;
696701}
697702
698703if (++ ts -> nVotes == Mtm -> nNodes ) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712717Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
713718ts -> status = TRANSACTION_STATUS_ABORTED ;
714719MtmAdjustSubtransactions (ts );
720+ Mtm -> nActiveTransactions -= 1 ;
715721}
716722if (++ ts -> nVotes == Mtm -> nNodes ) {
717723MtmWakeUpBackend (ts );