@@ -247,14 +247,13 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
247
247
while (size != 0 ) {
248
248
int rc = MtmWaitSocket (sd , true,MtmHeartbeatSendTimeout );
249
249
if (rc == 1 ) {
250
- int n = send (sd ,src ,size ,0 );
251
- if (n < 0 ) {
252
- Assert (errno != EINTR );/* should not happen in non-blocking call */
250
+ while ((rc = send (sd ,src ,size ,0 ))< 0 && errno == EINTR );
251
+ if (rc < 0 ) {
253
252
busy_socket = -1 ;
254
253
return false;
255
254
}
256
- size -= n ;
257
- src += n ;
255
+ size -= rc ;
256
+ src += rc ;
258
257
}else if (rc < 0 ) {
259
258
busy_socket = -1 ;
260
259
return false;
@@ -266,15 +265,12 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266
265
267
266
static int MtmReadSocket (int sd ,void * buf ,int buf_size )
268
267
{
269
- int rc = recv (sd ,buf ,buf_size ,0 );
268
+ int rc ;
269
+ while ((rc = recv (sd ,buf ,buf_size ,0 ))< 0 && errno == EINTR );
270
270
if (rc < 0 && errno == EAGAIN ) {
271
271
rc = MtmWaitSocket (sd , false,MtmHeartbeatSendTimeout );
272
272
if (rc == 1 ) {
273
- rc = recv (sd ,buf ,buf_size ,0 );
274
- if (rc < 0 ) {
275
- Assert (errno != EINTR );/* should not happen in non-blocking call */
276
- return -1 ;
277
- }
273
+ while ((rc = recv (sd ,buf ,buf_size ,0 ))< 0 && errno == EINTR );
278
274
}else {
279
275
return 0 ;
280
276
}
@@ -370,12 +366,13 @@ static void MtmSendHeartbeat()
370
366
for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
371
367
{
372
368
if (i + 1 != MtmNodeId && sockets [i ]!= busy_socket
373
- && ((sockets [i ] >=0 && !BIT_CHECK (Mtm -> disabledNodeMask ,i ))|| BIT_CHECK (Mtm -> reconnectMask ,i )))
369
+ && (Mtm -> status != MTM_ONLINE
370
+ || (sockets [i ] >=0 && !BIT_CHECK (Mtm -> disabledNodeMask ,i )&& !BIT_CHECK (Mtm -> reconnectMask ,i ))))
374
371
{
375
372
if (!MtmSendToNode (i ,& msg ,sizeof (msg ))) {
376
373
elog (LOG ,"Arbiter failed to send heartbeat to node %d" ,i + 1 );
377
374
}else {
378
- MTM_LOG1 ("Send heartbeat to node %d with timestamp %ld" ,i + 1 ,now );
375
+ MTM_LOG2 ("Send heartbeat to node %d with timestamp %ld" ,i + 1 ,now );
379
376
}
380
377
}
381
378
}
@@ -593,8 +590,9 @@ static void MtmAcceptOneConnection()
593
590
}else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
594
591
elog (WARNING ,"Arbiter get unexpected handshake message %d" ,req .hdr .code );
595
592
close (fd );
596
- }else {
597
- Assert (req .hdr .node > 0 && req .hdr .node <=Mtm -> nAllNodes && req .hdr .node != MtmNodeId );
593
+ }else {
594
+ int node = req .hdr .node - 1 ;
595
+ Assert (node >=0 && node < Mtm -> nAllNodes && node + 1 != MtmNodeId );
598
596
599
597
MtmLock (LW_EXCLUSIVE );
600
598
MtmCheckResponse (& req .hdr );
@@ -606,15 +604,18 @@ static void MtmAcceptOneConnection()
606
604
resp .sxid = ShmemVariableCache -> nextXid ;
607
605
resp .csn = MtmGetCurrentTime ();
608
606
resp .node = MtmNodeId ;
609
- MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req . hdr . node - 1 ].con ,req .connStr );
607
+ MtmUpdateNodeConnectionInfo (& Mtm -> nodes [node ].con ,req .connStr );
610
608
if (!MtmWriteSocket (fd ,& resp ,sizeof resp )) {
611
- elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,resp . node );
609
+ elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,node + 1 );
612
610
close (fd );
613
611
}else {
614
- MTM_LOG1 ("Arbiter established connection with node %d" ,req .hdr .node );
615
- MtmRegisterSocket (fd ,req .hdr .node - 1 );
616
- sockets [req .hdr .node - 1 ]= fd ;
617
- MtmOnNodeConnect (req .hdr .node );
612
+ MTM_LOG1 ("Arbiter established connection with node %d" ,node + 1 );
613
+ if (sockets [node ] >=0 ) {
614
+ MtmUnregisterSocket (sockets [node ]);
615
+ }
616
+ sockets [node ]= fd ;
617
+ MtmRegisterSocket (fd ,node );
618
+ MtmOnNodeConnect (node + 1 );
618
619
}
619
620
}
620
621
}
@@ -889,7 +890,7 @@ static void MtmTransReceiver(Datum arg)
889
890
Mtm -> nodes [msg -> node - 1 ].lastHeartbeat = MtmGetSystemTime ();
890
891
891
892
if (msg -> code == MSG_HEARTBEAT ) {
892
- MTM_LOG1 ("Receive HEARTBEAT from node %d with timestamp %ld delay %ld" ,
893
+ MTM_LOG2 ("Receive HEARTBEAT from node %d with timestamp %ld delay %ld" ,
893
894
msg -> node ,msg -> csn ,USEC_TO_MSEC (MtmGetSystemTime ()- msg -> csn ));
894
895
continue ;
895
896
}
@@ -1002,21 +1003,23 @@ static void MtmTransReceiver(Datum arg)
1002
1003
}
1003
1004
}
1004
1005
}
1005
- now = MtmGetSystemTime ();
1006
- if (now > lastHeartbeatCheck + MSEC_TO_USEC (MtmHeartbeatRecvTimeout )) {
1007
- if (!MtmWatchdog (stopPolling )) {
1008
- for (i = 0 ;i < nNodes ;i ++ ) {
1009
- if (Mtm -> nodes [i ].lastHeartbeat != 0 && sockets [i ] >=0 ) {
1010
- MTM_LOG1 ("Last hearbeat from node %d received %ld microseconds ago" ,i + 1 ,now - Mtm -> nodes [i ].lastHeartbeat );
1006
+ if (Mtm -> status != MTM_RECOVERY ) {
1007
+ now = MtmGetSystemTime ();
1008
+ if (now > lastHeartbeatCheck + MSEC_TO_USEC (MtmHeartbeatRecvTimeout )) {
1009
+ if (!MtmWatchdog (stopPolling )) {
1010
+ for (i = 0 ;i < nNodes ;i ++ ) {
1011
+ if (Mtm -> nodes [i ].lastHeartbeat != 0 && sockets [i ] >=0 ) {
1012
+ MTM_LOG1 ("Last hearbeat from node %d received %ld microseconds ago" ,i + 1 ,now - Mtm -> nodes [i ].lastHeartbeat );
1013
+ }
1011
1014
}
1015
+ MTM_LOG1 ("epoll started %ld and finished %ld microseconds ago" ,now - startPolling ,now - stopPolling );
1012
1016
}
1013
- MTM_LOG1 ("epoll started %ld and finished %ld microseconds ago" ,now - startPolling ,now - stopPolling );
1017
+ lastHeartbeatCheck = now ;
1018
+ }
1019
+ if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
1020
+ /* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1021
+ MtmRefreshClusterStatus (false);
1014
1022
}
1015
- lastHeartbeatCheck = now ;
1016
- }
1017
- if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
1018
- /* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1019
- MtmRefreshClusterStatus (false);
1020
1023
}
1021
1024
}
1022
1025
proc_exit (1 );/* force restart of this bgwroker */