21
21
#include "postgres.h"
22
22
#include "fmgr.h"
23
23
#include "miscadmin.h"
24
+ #include "pg_socket.h"
24
25
#include "postmaster/postmaster.h"
25
26
#include "postmaster/bgworker.h"
26
27
#include "storage/s_lock.h"
58
59
#include "tcop/utility.h"
59
60
#include "libpq/ip.h"
60
61
62
+
61
63
#ifndef USE_EPOLL
62
64
#ifdef __linux__
63
65
#define USE_EPOLL 0
@@ -185,7 +187,7 @@ static void MtmUnregisterSocket(int fd)
185
187
static void MtmDisconnect (int node )
186
188
{
187
189
MtmUnregisterSocket (sockets [node ]);
188
- close (sockets [node ]);
190
+ pg_closesocket (sockets [node ], MtmUseRDMA );
189
191
sockets [node ]= -1 ;
190
192
MtmOnNodeDisconnect (node + 1 );
191
193
}
@@ -208,7 +210,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208
210
FD_SET (sd ,& set );
209
211
tv .tv_sec = (deadline - now )/USECS_PER_SEC ;
210
212
tv .tv_usec = (deadline - now )%USECS_PER_SEC ;
211
- }while ((rc = select (sd + 1 ,forWrite ?NULL :& set ,forWrite ?& set :NULL ,NULL ,& tv ))< 0 && errno == EINTR );
213
+ }while ((rc = pg_select (sd + 1 ,forWrite ?NULL :& set ,forWrite ?& set :NULL ,NULL ,& tv , MtmUseRDMA ))< 0 && errno == EINTR );
212
214
213
215
return rc ;
214
216
}
@@ -219,7 +221,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219
221
while (size != 0 ) {
220
222
int rc = MtmWaitSocket (sd , true,MtmHeartbeatSendTimeout );
221
223
if (rc == 1 ) {
222
- while ((rc = send (sd ,src ,size ,0 ))< 0 && errno == EINTR );
224
+ while ((rc = pg_send (sd ,src ,size ,0 , MtmUseRDMA ))< 0 && errno == EINTR );
223
225
if (rc < 0 ) {
224
226
if (errno == EINPROGRESS ) {
225
227
continue ;
@@ -238,11 +240,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238
240
static int MtmReadSocket (int sd ,void * buf ,int buf_size )
239
241
{
240
242
int rc ;
241
- while ((rc = recv (sd ,buf ,buf_size ,0 ))< 0 && errno == EINTR );
243
+ while ((rc = pg_recv (sd ,buf ,buf_size ,0 , MtmUseRDMA ))< 0 && errno == EINTR );
242
244
if (rc <=0 && (errno == EAGAIN || errno == EINPROGRESS )) {
243
245
rc = MtmWaitSocket (sd , false,MtmHeartbeatSendTimeout );
244
246
if (rc == 1 ) {
245
- while ((rc = recv (sd ,buf ,buf_size ,0 ))< 0 && errno == EINTR );
247
+ while ((rc = pg_recv (sd ,buf ,buf_size ,0 , MtmUseRDMA ))< 0 && errno == EINTR );
246
248
}
247
249
}
248
250
return rc ;
@@ -254,25 +256,25 @@ static void MtmSetSocketOptions(int sd)
254
256
{
255
257
#ifdef TCP_NODELAY
256
258
int on = 1 ;
257
- if (setsockopt (sd ,IPPROTO_TCP ,TCP_NODELAY , (char const * )& on ,sizeof (on ))< 0 ) {
259
+ if (pg_setsockopt (sd ,IPPROTO_TCP ,TCP_NODELAY , (char const * )& on ,sizeof (on ), MtmUseRDMA )< 0 ) {
258
260
MTM_ELOG (WARNING ,"Failed to set TCP_NODELAY: %m" );
259
261
}
260
262
#endif
261
- if (setsockopt (sd ,SOL_SOCKET ,SO_KEEPALIVE , (char const * )& on ,sizeof (on ))< 0 ) {
263
+ if (pg_setsockopt (sd ,SOL_SOCKET ,SO_KEEPALIVE , (char const * )& on ,sizeof (on ), MtmUseRDMA )< 0 ) {
262
264
MTM_ELOG (WARNING ,"Failed to set SO_KEEPALIVE: %m" );
263
265
}
264
266
265
267
if (tcp_keepalives_idle ) {
266
268
#ifdef TCP_KEEPIDLE
267
- if (setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPIDLE ,
268
- (char * )& tcp_keepalives_idle ,sizeof (tcp_keepalives_idle ))< 0 )
269
+ if (pg_setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPIDLE ,
270
+ (char * )& tcp_keepalives_idle ,sizeof (tcp_keepalives_idle ), MtmUseRDMA )< 0 )
269
271
{
270
272
MTM_ELOG (WARNING ,"Failed to set TCP_KEEPIDLE: %m" );
271
273
}
272
274
#else
273
275
#ifdef TCP_KEEPALIVE
274
- if (setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPALIVE ,
275
- (char * )& tcp_keepalives_idle ,sizeof (tcp_keepalives_idle ))< 0 )
276
+ if (pg_setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPALIVE ,
277
+ (char * )& tcp_keepalives_idle ,sizeof (tcp_keepalives_idle ), MtmUseRDMA )< 0 )
276
278
{
277
279
MTM_ELOG (WARNING ,"Failed to set TCP_KEEPALIVE: %m" );
278
280
}
@@ -281,17 +283,17 @@ static void MtmSetSocketOptions(int sd)
281
283
}
282
284
#ifdef TCP_KEEPINTVL
283
285
if (tcp_keepalives_interval ) {
284
- if (setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPINTVL ,
285
- (char * )& tcp_keepalives_interval ,sizeof (tcp_keepalives_interval ))< 0 )
286
+ if (pg_setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPINTVL ,
287
+ (char * )& tcp_keepalives_interval ,sizeof (tcp_keepalives_interval ), MtmUseRDMA )< 0 )
286
288
{
287
289
MTM_ELOG (WARNING ,"Failed to set TCP_KEEPINTVL: %m" );
288
290
}
289
291
}
290
292
#endif
291
293
#ifdef TCP_KEEPCNT
292
294
if (tcp_keepalives_count ) {
293
- if (setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPCNT ,
294
- (char * )& tcp_keepalives_count ,sizeof (tcp_keepalives_count ))< 0 )
295
+ if (pg_setsockopt (sd ,IPPROTO_TCP ,TCP_KEEPCNT ,
296
+ (char * )& tcp_keepalives_count ,sizeof (tcp_keepalives_count ), MtmUseRDMA )< 0 )
295
297
{
296
298
MTM_ELOG (WARNING ,"Failed to set TCP_KEEPCNT: %m" );
297
299
}
@@ -376,7 +378,7 @@ static void MtmSendHeartbeat()
376
378
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
377
379
if (BIT_CHECK (SELF_CONNECTIVITY_MASK ,i )) {
378
380
MTM_LOG1 ("Force reconnect to node %d" ,i + 1 );
379
- close (sockets [i ]);
381
+ pg_closesocket (sockets [i ], MtmUseRDMA );
380
382
sockets [i ]= -1 ;
381
383
MtmReconnectNode (i + 1 );/* set reconnect mask to force node reconnent */
382
384
}
@@ -431,22 +433,22 @@ static int MtmConnectSocket(int node, int port)
431
433
}
432
434
BIT_SET (busy_mask ,node );
433
435
434
- Retry :
436
+ Retry :
435
437
436
- sd = socket (AF_INET ,SOCK_STREAM ,0 );
438
+ sd = pg_socket (AF_INET ,SOCK_STREAM ,0 , MtmUseRDMA );
437
439
if (sd < 0 ) {
438
440
MTM_ELOG (LOG ,"Arbiter failed to create socket: %s" ,strerror (errno ));
439
441
gotoError ;
440
442
}
441
- rc = fcntl (sd ,F_SETFL ,O_NONBLOCK );
443
+ rc = pg_fcntl (sd ,F_SETFL ,O_NONBLOCK , MtmUseRDMA );
442
444
if (rc < 0 ) {
443
445
MTM_ELOG (LOG ,"Arbiter failed to switch socket to non-blocking mode: %s" ,strerror (errno ));
444
446
gotoError ;
445
447
}
446
448
for (addr = addrs ;addr != NULL ;addr = addr -> ai_next )
447
449
{
448
450
do {
449
- rc = connect (sd ,addr -> ai_addr ,addr -> ai_addrlen );
451
+ rc = pg_connect (sd ,addr -> ai_addr ,addr -> ai_addrlen , MtmUseRDMA );
450
452
}while (rc < 0 && errno == EINTR );
451
453
452
454
if (rc >=0 || errno == EINPROGRESS ) {
@@ -460,7 +462,7 @@ static int MtmConnectSocket(int node, int port)
460
462
socklen_t optlen = sizeof (int );
461
463
int errcode ;
462
464
463
- if (getsockopt (sd ,SOL_SOCKET ,SO_ERROR , (void * )& errcode ,& optlen )< 0 ) {
465
+ if (pg_getsockopt (sd ,SOL_SOCKET ,SO_ERROR , (void * )& errcode ,& optlen , MtmUseRDMA )< 0 ) {
464
466
MTM_ELOG (WARNING ,"Arbiter failed to getsockopt for %s:%d: %s" ,host ,port ,strerror (errcode ));
465
467
gotoError ;
466
468
}
@@ -490,17 +492,17 @@ static int MtmConnectSocket(int node, int port)
490
492
strcpy (req .connStr ,Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
491
493
if (!MtmWriteSocket (sd ,& req ,sizeof req )) {
492
494
MTM_ELOG (WARNING ,"Arbiter failed to send handshake message to %s:%d: %s" ,host ,port ,strerror (errno ));
493
- close (sd );
495
+ pg_closesocket (sd , MtmUseRDMA );
494
496
gotoRetry ;
495
497
}
496
498
if (MtmReadSocket (sd ,& resp ,sizeof resp )!= sizeof (resp )) {
497
499
MTM_ELOG (WARNING ,"Arbiter failed to receive response for handshake message from %s:%d: %s" ,host ,port ,strerror (errno ));
498
- close (sd );
500
+ pg_closesocket (sd , MtmUseRDMA );
499
501
gotoRetry ;
500
502
}
501
503
if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
502
504
MTM_ELOG (WARNING ,"Arbiter get unexpected response %d for handshake message from %s:%d" ,resp .code ,host ,port );
503
- close (sd );
505
+ pg_closesocket (sd , MtmUseRDMA );
504
506
gotoRetry ;
505
507
}
506
508
if (addrs )
@@ -519,7 +521,7 @@ static int MtmConnectSocket(int node, int port)
519
521
Error :
520
522
busy_mask = save_mask ;
521
523
if (sd >=0 ) {
522
- close (sd );
524
+ pg_closesocket (sd , MtmUseRDMA );
523
525
}
524
526
if (addrs ) {
525
527
pg_freeaddrinfo_all (hint .ai_family ,addrs );
@@ -567,7 +569,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
567
569
*/
568
570
if (sockets [node ] >=0 && BIT_CHECK (Mtm -> reconnectMask ,node )) {
569
571
MTM_ELOG (WARNING ,"Arbiter is forced to reconnect to node %d" ,node + 1 );
570
- close (sockets [node ]);
572
+ pg_closesocket (sockets [node ], MtmUseRDMA );
571
573
sockets [node ]= -1 ;
572
574
}
573
575
#endif
@@ -579,7 +581,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
579
581
if (sockets [node ]< 0 || !MtmWriteSocket (sockets [node ],buf ,size )) {
580
582
if (sockets [node ] >=0 ) {
581
583
MTM_ELOG (WARNING ,"Arbiter fail to write to node %d: %s" ,node + 1 ,strerror (errno ));
582
- close (sockets [node ]);
584
+ pg_closesocket (sockets [node ], MtmUseRDMA );
583
585
sockets [node ]= -1 ;
584
586
}
585
587
sockets [node ]= MtmConnectSocket (node ,Mtm -> nodes [node ].con .arbiterPort );
@@ -610,23 +612,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
610
612
611
613
static void MtmAcceptOneConnection ()
612
614
{
613
- int fd = accept (gateway ,NULL ,NULL );
615
+ int fd = pg_accept (gateway ,NULL ,NULL , MtmUseRDMA );
614
616
if (fd < 0 ) {
615
617
MTM_ELOG (WARNING ,"Arbiter failed to accept socket: %s" ,strerror (errno ));
616
618
}else {
617
619
MtmHandshakeMessage req ;
618
620
MtmArbiterMessage resp ;
619
- int rc = fcntl (fd ,F_SETFL ,O_NONBLOCK );
621
+ int rc = pg_fcntl (fd ,F_SETFL ,O_NONBLOCK , MtmUseRDMA );
620
622
if (rc < 0 ) {
621
623
MTM_ELOG (ERROR ,"Arbiter failed to switch socket to non-blocking mode: %s" ,strerror (errno ));
622
624
}
623
625
rc = MtmReadSocket (fd ,& req ,sizeof req );
624
626
if (rc < sizeof (req )) {
625
- MTM_ELOG (WARNING ,"Arbiter failed to handshake socket: %s " ,strerror ( errno ) );
626
- close (fd );
627
+ MTM_ELOG (WARNING ,"Arbiter failed to handshake socket: %d, errno=%d " ,rc , errno );
628
+ pg_closesocket (fd , MtmUseRDMA );
627
629
}else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
628
- MTM_ELOG (WARNING ,"Arbiterget unexpected handshakemessage %d " ,req . hdr . code );
629
- close (fd );
630
+ MTM_ELOG (WARNING ,"Arbiterfailed to handshakesocket: %s " ,strerror ( errno ) );
631
+ pg_closesocket (fd , MtmUseRDMA );
630
632
}else {
631
633
int node = req .hdr .node - 1 ;
632
634
Assert (node >=0 && node < Mtm -> nAllNodes && node + 1 != MtmNodeId );
@@ -643,7 +645,7 @@ static void MtmAcceptOneConnection()
643
645
MtmUpdateNodeConnectionInfo (& Mtm -> nodes [node ].con ,req .connStr );
644
646
if (!MtmWriteSocket (fd ,& resp ,sizeof resp )) {
645
647
MTM_ELOG (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,node + 1 );
646
- close (fd );
648
+ pg_closesocket (fd , MtmUseRDMA );
647
649
}else {
648
650
MTM_LOG1 ("Arbiter established connection with node %d" ,node + 1 );
649
651
if (sockets [node ] >=0 ) {
@@ -673,18 +675,18 @@ static void MtmAcceptIncomingConnections()
673
675
sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
674
676
sock_inet .sin_port = htons (MtmArbiterPort );
675
677
676
- gateway = socket (sock_inet .sin_family ,SOCK_STREAM ,0 );
678
+ gateway = pg_socket (sock_inet .sin_family ,SOCK_STREAM ,0 , MtmUseRDMA );
677
679
if (gateway < 0 ) {
678
680
MTM_ELOG (ERROR ,"Arbiter failed to create socket: %s" ,strerror (errno ));
679
681
}
680
- if (setsockopt (gateway ,SOL_SOCKET ,SO_REUSEADDR , (char * )& on ,sizeof on )< 0 ) {
682
+ if (pg_setsockopt (gateway ,SOL_SOCKET ,SO_REUSEADDR , (char * )& on ,sizeof on , MtmUseRDMA )< 0 ) {
681
683
MTM_ELOG (ERROR ,"Arbiter failed to set options for socket: %s" ,strerror (errno ));
682
684
}
683
685
684
- if (bind (gateway , (struct sockaddr * )& sock_inet ,sizeof (sock_inet ))< 0 ) {
686
+ if (pg_bind (gateway , (struct sockaddr * )& sock_inet ,sizeof (sock_inet ), MtmUseRDMA )< 0 ) {
685
687
MTM_ELOG (ERROR ,"Arbiter failed to bind socket: %s" ,strerror (errno ));
686
688
}
687
- if (listen (gateway ,nNodes )< 0 ) {
689
+ if (pg_listen (gateway ,nNodes , MtmUseRDMA )< 0 ) {
688
690
MTM_ELOG (ERROR ,"Arbiter failed to listen socket: %s" ,strerror (errno ));
689
691
}
690
692
@@ -787,7 +789,7 @@ static bool MtmRecovery()
787
789
fd_set tryset ;
788
790
FD_ZERO (& tryset );
789
791
FD_SET (sd ,& tryset );
790
- if (select (sd + 1 ,& tryset ,NULL ,NULL ,& tm )< 0 ) {
792
+ if (pg_select (sd + 1 ,& tryset ,NULL ,NULL ,& tm , MtmUseRDMA )< 0 ) {
791
793
MTM_ELOG (WARNING ,"Arbiter lost connection with node %d" ,i + 1 );
792
794
MtmDisconnect (i );
793
795
recovered = true;
@@ -884,7 +886,7 @@ static void MtmReceiver(Datum arg)
884
886
tv .tv_sec = selectTimeout /1000 ;
885
887
tv .tv_usec = selectTimeout %1000 * 1000 ;
886
888
do {
887
- n = select (max_fd + 1 ,& events ,NULL ,NULL ,& tv );
889
+ n = pg_select (max_fd + 1 ,& events ,NULL ,NULL ,& tv , MtmUseRDMA );
888
890
}while (n < 0 && errno == EINTR );
889
891
}while (n < 0 && MtmRecovery ());
890
892