@@ -195,7 +195,7 @@ static void MtmRegisterSocket(int fd, int node)
195
195
ev .events = EPOLLIN ;
196
196
ev .data .u32 = node ;
197
197
if (epoll_ctl (epollfd ,EPOLL_CTL_ADD ,fd ,& ev )< 0 ) {
198
- elog (ERROR ,"Arbiter failed to add socket to epoll set: %d" ,errno );
198
+ elog (LOG ,"Arbiter failed to add socket to epoll set: %d" ,errno );
199
199
}
200
200
#else
201
201
FD_SET (fd ,& inset );
@@ -209,7 +209,7 @@ static void MtmUnregisterSocket(int fd)
209
209
{
210
210
#if USE_EPOLL
211
211
if (epoll_ctl (epollfd ,EPOLL_CTL_DEL ,fd ,NULL )< 0 ) {
212
- elog (ERROR ,"Arbiter failed to unregister socket from epoll set: %d" ,errno );
212
+ elog (LOG ,"Arbiter failed to unregister socket from epoll set: %d" ,errno );
213
213
}
214
214
#else
215
215
FD_CLR (fd ,& inset );
@@ -266,12 +266,17 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266
266
267
267
static int MtmReadSocket (int sd ,void * buf ,int buf_size )
268
268
{
269
- int rc = recv (sd ,buf ,buf_size ,0 );
270
- if (rc <=0 ) {
271
- Assert (errno != EINTR );/* should not happen in non-blocking call */
272
- return -1 ;
269
+ int rc = MtmWaitSocket (sd , false,MtmHeartbeatSendTimeout );
270
+ if (rc == 1 ) {
271
+ int rc = recv (sd ,buf ,buf_size ,0 );
272
+ if (rc <=0 ) {
273
+ Assert (errno != EINTR );/* should not happen in non-blocking call */
274
+ return -1 ;
275
+ }
276
+ return rc ;
277
+ }else {
278
+ return 0 ;
273
279
}
274
- return rc ;
275
280
}
276
281
277
282
@@ -325,7 +330,16 @@ static void MtmSetSocketOptions(int sd)
325
330
#endif
326
331
}
327
332
328
-
333
+ static void MtmCheckResponse (MtmArbiterMessage * resp )
334
+ {
335
+ if (BIT_CHECK (resp -> disabledNodeMask ,MtmNodeId - 1 )&& !BIT_CHECK (Mtm -> disabledNodeMask ,resp -> node - 1 )) {
336
+ elog (WARNING ,"Node %d thinks that I was dead, while I am %s" ,resp -> node ,MtmNodeStatusMnem [Mtm -> status ]);
337
+ if (Mtm -> status != MTM_RECOVERY ) {
338
+ BIT_SET (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
339
+ MtmSwitchClusterMode (MTM_RECOVERY );
340
+ }
341
+ }
342
+ }
329
343
330
344
static void MtmScheduleHeartbeat ()
331
345
{
@@ -347,7 +361,8 @@ static void MtmSendHeartbeat()
347
361
348
362
for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
349
363
{
350
- if (sockets [i ] >=0 && sockets [i ]!= busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask ,i ))
364
+ if (i + 1 != MtmNodeId && sockets [i ]!= busy_socket
365
+ && ((sockets [i ] >=0 && !BIT_CHECK (Mtm -> disabledNodeMask ,i ))|| BIT_CHECK (Mtm -> reconnectMask ,i )))
351
366
{
352
367
if (!MtmSendToNode (i ,& msg ,sizeof (msg ))) {
353
368
elog (LOG ,"Arbiter failed to send heartbeat to node %d" ,i + 1 );
@@ -382,7 +397,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
382
397
sock_inet .sin_port = htons (port );
383
398
384
399
if (!MtmResolveHostByName (host ,addrs ,& n_addrs )) {
385
- elog (ERROR ,"Arbiter failed to resolve host '%s' by name" ,host );
400
+ elog (LOG ,"Arbiter failed to resolve host '%s' by name" ,host );
401
+ return -1 ;
386
402
}
387
403
388
404
Retry :
@@ -391,11 +407,13 @@ static int MtmConnectSocket(int node, int port, int timeout)
391
407
392
408
sd = socket (AF_INET ,SOCK_STREAM ,0 );
393
409
if (sd < 0 ) {
394
- elog (ERROR ,"Arbiter failed to create socket: %d" ,errno );
410
+ elog (LOG ,"Arbiter failed to create socket: %d" ,errno );
411
+ return -1 ;
395
412
}
396
413
rc = fcntl (sd ,F_SETFL ,O_NONBLOCK );
397
414
if (rc < 0 ) {
398
- elog (ERROR ,"Arbiter failed to switch socket to non-blocking mode: %d" ,errno );
415
+ elog (LOG ,"Arbiter failed to switch socket to non-blocking mode: %d" ,errno );
416
+ return -1 ;
399
417
}
400
418
busy_socket = sd ;
401
419
for (i = 0 ;i < n_addrs ;++ i ) {
@@ -463,14 +481,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
463
481
}
464
482
465
483
MtmLock (LW_EXCLUSIVE );
466
-
467
- /* Some node considered that I am dead, so switch to recovery mode */
468
- if (BIT_CHECK (resp .disabledNodeMask ,MtmNodeId - 1 )) {
469
- elog (WARNING ,"Node %d thinks that I was dead" ,resp .node );
470
- BIT_SET (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
471
- MtmRollbackAllPreparedTransactions ();
472
- MtmSwitchClusterMode (MTM_RECOVERY );
473
- }
484
+ MtmCheckResponse (& resp );
474
485
MtmUnlock ();
475
486
476
487
return sd ;
@@ -493,7 +504,7 @@ static void MtmOpenConnections()
493
504
char const * arbiterPortStr = strstr (Mtm -> nodes [i ].con .connStr ,"arbiterport=" );
494
505
if (arbiterPortStr != NULL ) {
495
506
if (sscanf (arbiterPortStr + 12 ,"%d" ,& arbiterPort )!= 1 ) {
496
- elog (ERROR ,"Invalid arbiter port: %s" ,arbiterPortStr + 12 );
507
+ elog (ERROR ,"Invalid arbiter port: %s" ,arbiterPortStr + 12 );
497
508
}
498
509
}else {
499
510
arbiterPort = MtmArbiterPort + i + 1 ;
@@ -518,11 +529,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
518
529
while (true) {
519
530
if (sockets [node ] >=0 && BIT_CHECK (Mtm -> reconnectMask ,node )) {
520
531
elog (WARNING ,"Arbiter is forced to reconnect to node %d" ,node + 1 );
532
+ close (sockets [node ]);
533
+ sockets [node ]= -1 ;
534
+ }
535
+ if (BIT_CHECK (Mtm -> reconnectMask ,node )) {
521
536
MtmLock (LW_EXCLUSIVE );
522
537
BIT_CLEAR (Mtm -> reconnectMask ,node );
523
538
MtmUnlock ();
524
- close (sockets [node ]);
525
- sockets [node ]= -1 ;
526
539
}
527
540
if (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
528
541
if (sockets [node ] >=0 ) {
@@ -535,9 +548,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
535
548
MtmOnNodeDisconnect (node + 1 );
536
549
return false;
537
550
}
538
- MtmLock (LW_EXCLUSIVE );
539
- BIT_CLEAR (Mtm -> reconnectMask ,node );
540
- MtmUnlock ();
541
551
MTM_LOG3 ("Arbiter restablished connection with node %d" ,node + 1 );
542
552
}else {
543
553
return true;
@@ -563,14 +573,23 @@ static void MtmAcceptOneConnection()
563
573
}else {
564
574
MtmHandshakeMessage req ;
565
575
MtmArbiterMessage resp ;
566
- int rc = MtmReadSocket (fd ,& req ,sizeof req );
576
+ int rc = fcntl (fd ,F_SETFL ,O_NONBLOCK );
577
+ if (rc < 0 ) {
578
+ elog (ERROR ,"Arbiter failed to switch socket to non-blocking mode: %d" ,errno );
579
+ }
580
+ rc = MtmReadSocket (fd ,& req ,sizeof req );
567
581
if (rc < sizeof (req )) {
568
582
elog (WARNING ,"Arbiter failed to handshake socket: %d, errno=%d" ,rc ,errno );
569
583
}else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
570
584
elog (WARNING ,"Arbiter get unexpected handshake message %d" ,req .hdr .code );
571
585
close (fd );
572
586
}else {
573
587
Assert (req .hdr .node > 0 && req .hdr .node <=Mtm -> nAllNodes && req .hdr .node != MtmNodeId );
588
+
589
+ MtmLock (LW_EXCLUSIVE );
590
+ MtmCheckResponse (& req .hdr );
591
+ MtmUnlock ();
592
+
574
593
resp .code = MSG_STATUS ;
575
594
resp .disabledNodeMask = Mtm -> disabledNodeMask ;
576
595
resp .dxid = HANDSHAKE_MAGIC ;
@@ -726,6 +745,7 @@ static void MtmTransSender(Datum arg)
726
745
CHECK_FOR_INTERRUPTS ();
727
746
}
728
747
elog (LOG ,"Stop arbiter sender %d" ,MyProcPid );
748
+ proc_exit (1 );/* force restart of this bgwroker */
729
749
}
730
750
731
751
@@ -863,9 +883,7 @@ static void MtmTransReceiver(Datum arg)
863
883
elog (WARNING ,"Ignore response for unexisted transaction %d from node %d" ,msg -> dxid ,msg -> node );
864
884
continue ;
865
885
}
866
- if (BIT_CHECK (msg -> disabledNodeMask ,MtmNodeId - 1 )&& Mtm -> status != MTM_RECOVERY ) {
867
- elog (PANIC ,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie" ,msg -> node );
868
- }
886
+ MtmCheckResponse (msg );
869
887
870
888
if (MtmIsCoordinator (ts )) {
871
889
switch (msg -> code ) {
@@ -975,5 +993,6 @@ static void MtmTransReceiver(Datum arg)
975
993
MtmRefreshClusterStatus (false);
976
994
}
977
995
}
996
+ proc_exit (1 );/* force restart of this bgwroker */
978
997
}
979
998