@@ -429,29 +429,29 @@ static int MtmConnectSocket(int node, int port)
429
429
snprintf (portstr ,sizeof (portstr ),"%d" ,port );
430
430
431
431
rc = pg_getaddrinfo_all (host ,portstr ,& hint ,& addrs );
432
- if (rc != 0 )
432
+ if (rc != 0 )
433
433
{
434
434
MTM_ELOG (LOG ,"Arbiter failed to resolve host '%s' by name: %s" ,host ,gai_strerror (rc ));
435
435
return -1 ;
436
436
}
437
437
BIT_SET (busy_mask ,node );
438
438
439
- Retry :
439
+ Retry :
440
440
441
- sd = socket (AF_INET ,SOCK_STREAM ,0 );
441
+ sd = pg_socket (AF_INET ,SOCK_STREAM ,0 , MtmUseRDMA );
442
442
if (sd < 0 ) {
443
443
MTM_ELOG (LOG ,"Arbiter failed to create socket: %s" ,strerror (errno ));
444
444
gotoError ;
445
445
}
446
- rc = fcntl (sd ,F_SETFL ,O_NONBLOCK );
446
+ rc = pg_fcntl (sd ,F_SETFL ,O_NONBLOCK , MtmUseRDMA );
447
447
if (rc < 0 ) {
448
448
MTM_ELOG (LOG ,"Arbiter failed to switch socket to non-blocking mode: %s" ,strerror (errno ));
449
449
gotoError ;
450
450
}
451
451
for (addr = addrs ;addr != NULL ;addr = addr -> ai_next )
452
452
{
453
453
do {
454
- rc = connect (sd ,addr -> ai_addr ,addr -> ai_addrlen );
454
+ rc = pg_connect (sd ,addr -> ai_addr ,addr -> ai_addrlen , MtmUseRDMA );
455
455
}while (rc < 0 && errno == EINTR );
456
456
457
457
if (rc >=0 || errno == EINPROGRESS ) {
@@ -491,12 +491,12 @@ static int MtmConnectSocket(int node, int port)
491
491
strcpy (req .connStr ,Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
492
492
if (!MtmWriteSocket (sd ,& req ,sizeof req )) {
493
493
MTM_ELOG (WARNING ,"Arbiter failed to send handshake message to %s:%d: %s" ,host ,port ,strerror (errno ));
494
- close (sd );
494
+ pg_closesocket (sd , MtmUseRDMA );
495
495
gotoRetry ;
496
496
}
497
497
if (MtmReadSocket (sd ,& resp ,sizeof resp )!= sizeof (resp )) {
498
498
MTM_ELOG (WARNING ,"Arbiter failed to receive response for handshake message from %s:%d: %s" ,host ,port ,strerror (errno ));
499
- close (sd );
499
+ pg_closesocket (sd , MtmUseRDMA );
500
500
gotoRetry ;
501
501
}
502
502
if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
@@ -580,7 +580,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
580
580
if (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
581
581
if (sockets [node ] >=0 ) {
582
582
MTM_ELOG (WARNING ,"Arbiter fail to write to node %d: %s" ,node + 1 ,strerror (errno ));
583
- close (sockets [node ]);
583
+ pg_closesocket (sockets [node ], MtmUseRDMA );
584
584
sockets [node ]= -1 ;
585
585
}
586
586
sockets [node ]= MtmConnectSocket (node ,Mtm -> nodes [node ].con .arbiterPort );
@@ -626,7 +626,7 @@ static void MtmAcceptOneConnection()
626
626
MTM_ELOG (WARNING ,"Arbiter failed to handshake socket: %s" ,strerror (errno ));
627
627
pg_closesocket (fd ,MtmUseRDMA );
628
628
}else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
629
- MTM_ELOG (WARNING ,"Arbiterget unexpected handshakemessage %d " ,req . hdr . code );
629
+ MTM_ELOG (WARNING ,"Arbiterfailed to handshakesocket: %s " ,strerror ( errno ) );
630
630
pg_closesocket (fd ,MtmUseRDMA );
631
631
}else {
632
632
int node = req .hdr .node - 1 ;