Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit75624cd

Browse files
committed
Merge RDMA branch
2 parents440165f +c761fc3 commit75624cd

File tree

58 files changed

+3660
-395
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3660
-395
lines changed

‎configure

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ with_perl
723723
with_tcl
724724
enable_thread_safety
725725
INCLUDES
726+
with_rsocket
726727
autodepend
727728
TAS
728729
GCC
@@ -826,6 +827,7 @@ with_wal_segsize
826827
with_CC
827828
enable_depend
828829
enable_cassert
830+
with_rsocket
829831
enable_thread_safety
830832
with_tcl
831833
with_tclconfig
@@ -1518,6 +1520,7 @@ Optional Packages:
15181520
--with-wal-segsize=SEGSIZE
15191521
set WAL segment size in MB [16]
15201522
--with-CC=CMD set compiler (deprecated)
1523+
--with-rsocket replace socket with rsocket (RDMA socket API)
15211524
--with-tcl build Tcl modules (PL/Tcl)
15221525
--with-tclconfig=DIR tclConfig.sh is in DIR
15231526
--with-perl build Perl modules (PL/Perl)
@@ -5301,6 +5304,41 @@ fi
53015304

53025305

53035306

5307+
#
5308+
# Replace socket with rsocket
5309+
#
5310+
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with rsocket support" >&5
5311+
$as_echo_n "checking whether to build with rsocket support... " >&6; }
5312+
5313+
5314+
5315+
# Check whether --with-rsocket was given.
5316+
if test "${with_rsocket+set}" = set; then :
5317+
withval=$with_rsocket;
5318+
case $withval in
5319+
yes)
5320+
5321+
$as_echo "#define WITH_RSOCKET 1" >>confdefs.h
5322+
5323+
;;
5324+
no)
5325+
:
5326+
;;
5327+
*)
5328+
as_fn_error $? "no argument expected for --with-rsocket option" "$LINENO" 5
5329+
;;
5330+
esac
5331+
5332+
else
5333+
with_rsocket=no
5334+
5335+
fi
5336+
5337+
5338+
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_rsocket" >&5
5339+
$as_echo "$with_rsocket" >&6; }
5340+
5341+
53045342
#
53055343
# Include directories
53065344
#

‎configure.in

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,16 @@ PGAC_ARG_BOOL(enable, cassert, no, [enable assertion checks (for debugging)],
574574
[Define to 1 to build with assertion checks. (--enable-cassert)])])
575575

576576

577+
#
578+
# Replace socket with rsocket
579+
#
580+
AC_MSG_CHECKING([whether to build with rsocket support])
581+
PGAC_ARG_BOOL(with, rsocket, no, [replace socket with rsocket (RDMA socket API)],
582+
[AC_DEFINE([WITH_RSOCKET], 1,
583+
[Define to 1 to build with rsocket instead socket. (--with-rsocket)])])
584+
AC_MSG_RESULT([$with_rsocket])
585+
AC_SUBST(with_rsocket)
586+
577587
#
578588
# Include directories
579589
#

‎contrib/mmts/arbiter.c

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include"postgres.h"
2222
#include"fmgr.h"
2323
#include"miscadmin.h"
24+
#include"pg_socket.h"
2425
#include"postmaster/postmaster.h"
2526
#include"postmaster/bgworker.h"
2627
#include"storage/s_lock.h"
@@ -58,6 +59,7 @@
5859
#include"tcop/utility.h"
5960
#include"libpq/ip.h"
6061

62+
6163
#ifndefUSE_EPOLL
6264
#ifdef__linux__
6365
#defineUSE_EPOLL 0
@@ -185,7 +187,7 @@ static void MtmUnregisterSocket(int fd)
185187
staticvoidMtmDisconnect(intnode)
186188
{
187189
MtmUnregisterSocket(sockets[node]);
188-
close(sockets[node]);
190+
pg_closesocket(sockets[node],MtmUseRDMA);
189191
sockets[node]=-1;
190192
MtmOnNodeDisconnect(node+1);
191193
}
@@ -208,7 +210,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208210
FD_SET(sd,&set);
209211
tv.tv_sec= (deadline-now)/USECS_PER_SEC;
210212
tv.tv_usec= (deadline-now)%USECS_PER_SEC;
211-
}while ((rc=select(sd+1,forWrite ?NULL :&set,forWrite ?&set :NULL,NULL,&tv))<0&&errno==EINTR);
213+
}while ((rc=pg_select(sd+1,forWrite ?NULL :&set,forWrite ?&set :NULL,NULL,&tv,MtmUseRDMA))<0&&errno==EINTR);
212214

213215
returnrc;
214216
}
@@ -219,7 +221,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219221
while (size!=0) {
220222
intrc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
221223
if (rc==1) {
222-
while ((rc=send(sd,src,size,0))<0&&errno==EINTR);
224+
while ((rc=pg_send(sd,src,size,0,MtmUseRDMA))<0&&errno==EINTR);
223225
if (rc<0) {
224226
if (errno==EINPROGRESS) {
225227
continue;
@@ -238,11 +240,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238240
staticintMtmReadSocket(intsd,void*buf,intbuf_size)
239241
{
240242
intrc;
241-
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
243+
while ((rc=pg_recv(sd,buf,buf_size,0,MtmUseRDMA))<0&&errno==EINTR);
242244
if (rc <=0&& (errno==EAGAIN||errno==EINPROGRESS)) {
243245
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
244246
if (rc==1) {
245-
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
247+
while ((rc=pg_recv(sd,buf,buf_size,0,MtmUseRDMA))<0&&errno==EINTR);
246248
}
247249
}
248250
returnrc;
@@ -254,25 +256,25 @@ static void MtmSetSocketOptions(int sd)
254256
{
255257
#ifdefTCP_NODELAY
256258
inton=1;
257-
if (setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&on,sizeof(on))<0) {
259+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&on,sizeof(on),MtmUseRDMA)<0) {
258260
MTM_ELOG(WARNING,"Failed to set TCP_NODELAY: %m");
259261
}
260262
#endif
261-
if (setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&on,sizeof(on))<0) {
263+
if (pg_setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&on,sizeof(on),MtmUseRDMA)<0) {
262264
MTM_ELOG(WARNING,"Failed to set SO_KEEPALIVE: %m");
263265
}
264266

265267
if (tcp_keepalives_idle) {
266268
#ifdefTCP_KEEPIDLE
267-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPIDLE,
268-
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle))<0)
269+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPIDLE,
270+
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle),MtmUseRDMA)<0)
269271
{
270272
MTM_ELOG(WARNING,"Failed to set TCP_KEEPIDLE: %m");
271273
}
272274
#else
273275
#ifdefTCP_KEEPALIVE
274-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPALIVE,
275-
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle))<0)
276+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPALIVE,
277+
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle),MtmUseRDMA)<0)
276278
{
277279
MTM_ELOG(WARNING,"Failed to set TCP_KEEPALIVE: %m");
278280
}
@@ -281,17 +283,17 @@ static void MtmSetSocketOptions(int sd)
281283
}
282284
#ifdefTCP_KEEPINTVL
283285
if (tcp_keepalives_interval) {
284-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPINTVL,
285-
(char*)&tcp_keepalives_interval,sizeof(tcp_keepalives_interval))<0)
286+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPINTVL,
287+
(char*)&tcp_keepalives_interval,sizeof(tcp_keepalives_interval),MtmUseRDMA)<0)
286288
{
287289
MTM_ELOG(WARNING,"Failed to set TCP_KEEPINTVL: %m");
288290
}
289291
}
290292
#endif
291293
#ifdefTCP_KEEPCNT
292294
if (tcp_keepalives_count) {
293-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPCNT,
294-
(char*)&tcp_keepalives_count,sizeof(tcp_keepalives_count))<0)
295+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPCNT,
296+
(char*)&tcp_keepalives_count,sizeof(tcp_keepalives_count),MtmUseRDMA)<0)
295297
{
296298
MTM_ELOG(WARNING,"Failed to set TCP_KEEPCNT: %m");
297299
}
@@ -376,7 +378,7 @@ static void MtmSendHeartbeat()
376378
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
377379
if (BIT_CHECK(SELF_CONNECTIVITY_MASK,i)) {
378380
MTM_LOG1("Force reconnect to node %d",i+1);
379-
close(sockets[i]);
381+
pg_closesocket(sockets[i],MtmUseRDMA);
380382
sockets[i]=-1;
381383
MtmReconnectNode(i+1);/* set reconnect mask to force node reconnent */
382384
}
@@ -431,22 +433,22 @@ static int MtmConnectSocket(int node, int port)
431433
}
432434
BIT_SET(busy_mask,node);
433435

434-
Retry:
436+
Retry:
435437

436-
sd=socket(AF_INET,SOCK_STREAM,0);
438+
sd=pg_socket(AF_INET,SOCK_STREAM,0,MtmUseRDMA);
437439
if (sd<0) {
438440
MTM_ELOG(LOG,"Arbiter failed to create socket: %s",strerror(errno));
439441
gotoError;
440442
}
441-
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
443+
rc=pg_fcntl(sd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
442444
if (rc<0) {
443445
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
444446
gotoError;
445447
}
446448
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
447449
{
448450
do {
449-
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
451+
rc=pg_connect(sd,addr->ai_addr,addr->ai_addrlen,MtmUseRDMA);
450452
}while (rc<0&&errno==EINTR);
451453

452454
if (rc >=0||errno==EINPROGRESS) {
@@ -460,7 +462,7 @@ static int MtmConnectSocket(int node, int port)
460462
socklen_toptlen=sizeof(int);
461463
interrcode;
462464

463-
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&errcode,&optlen)<0) {
465+
if (pg_getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&errcode,&optlen,MtmUseRDMA)<0) {
464466
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: %s",host,port,strerror(errcode));
465467
gotoError;
466468
}
@@ -490,17 +492,17 @@ static int MtmConnectSocket(int node, int port)
490492
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
491493
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
492494
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %s",host,port,strerror(errno));
493-
close(sd);
495+
pg_closesocket(sd,MtmUseRDMA);
494496
gotoRetry;
495497
}
496498
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
497499
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: %s",host,port,strerror(errno));
498-
close(sd);
500+
pg_closesocket(sd,MtmUseRDMA);
499501
gotoRetry;
500502
}
501503
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
502504
MTM_ELOG(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",resp.code,host,port);
503-
close(sd);
505+
pg_closesocket(sd,MtmUseRDMA);
504506
gotoRetry;
505507
}
506508
if (addrs)
@@ -519,7 +521,7 @@ static int MtmConnectSocket(int node, int port)
519521
Error:
520522
busy_mask=save_mask;
521523
if (sd >=0) {
522-
close(sd);
524+
pg_closesocket(sd,MtmUseRDMA);
523525
}
524526
if (addrs) {
525527
pg_freeaddrinfo_all(hint.ai_family,addrs);
@@ -567,7 +569,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
567569
*/
568570
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
569571
MTM_ELOG(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
570-
close(sockets[node]);
572+
pg_closesocket(sockets[node],MtmUseRDMA);
571573
sockets[node]=-1;
572574
}
573575
#endif
@@ -579,7 +581,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
579581
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
580582
if (sockets[node] >=0) {
581583
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %s",node+1,strerror(errno));
582-
close(sockets[node]);
584+
pg_closesocket(sockets[node],MtmUseRDMA);
583585
sockets[node]=-1;
584586
}
585587
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort);
@@ -610,23 +612,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
610612

611613
staticvoidMtmAcceptOneConnection()
612614
{
613-
intfd=accept(gateway,NULL,NULL);
615+
intfd=pg_accept(gateway,NULL,NULL,MtmUseRDMA);
614616
if (fd<0) {
615617
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %s",strerror(errno));
616618
}else {
617619
MtmHandshakeMessagereq;
618620
MtmArbiterMessageresp;
619-
intrc=fcntl(fd,F_SETFL,O_NONBLOCK);
621+
intrc=pg_fcntl(fd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
620622
if (rc<0) {
621623
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
622624
}
623625
rc=MtmReadSocket(fd,&req,sizeofreq);
624626
if (rc<sizeof(req)) {
625-
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %s",strerror(errno));
626-
close(fd);
627+
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
628+
pg_closesocket(fd,MtmUseRDMA);
627629
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
628-
MTM_ELOG(WARNING,"Arbiterget unexpected handshakemessage %d",req.hdr.code);
629-
close(fd);
630+
MTM_ELOG(WARNING,"Arbiterfailed to handshakesocket: %s",strerror(errno));
631+
pg_closesocket(fd,MtmUseRDMA);
630632
}else {
631633
intnode=req.hdr.node-1;
632634
Assert(node >=0&&node<Mtm->nAllNodes&&node+1!=MtmNodeId);
@@ -643,7 +645,7 @@ static void MtmAcceptOneConnection()
643645
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con,req.connStr);
644646
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
645647
MTM_ELOG(WARNING,"Arbiter failed to write response for handshake message to node %d",node+1);
646-
close(fd);
648+
pg_closesocket(fd,MtmUseRDMA);
647649
}else {
648650
MTM_LOG1("Arbiter established connection with node %d",node+1);
649651
if (sockets[node] >=0) {
@@ -673,18 +675,18 @@ static void MtmAcceptIncomingConnections()
673675
sock_inet.sin_addr.s_addr=htonl(INADDR_ANY);
674676
sock_inet.sin_port=htons(MtmArbiterPort);
675677

676-
gateway=socket(sock_inet.sin_family,SOCK_STREAM,0);
678+
gateway=pg_socket(sock_inet.sin_family,SOCK_STREAM,0,MtmUseRDMA);
677679
if (gateway<0) {
678680
MTM_ELOG(ERROR,"Arbiter failed to create socket: %s",strerror(errno));
679681
}
680-
if (setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon)<0) {
682+
if (pg_setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon,MtmUseRDMA)<0) {
681683
MTM_ELOG(ERROR,"Arbiter failed to set options for socket: %s",strerror(errno));
682684
}
683685

684-
if (bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet))<0) {
686+
if (pg_bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet),MtmUseRDMA)<0) {
685687
MTM_ELOG(ERROR,"Arbiter failed to bind socket: %s",strerror(errno));
686688
}
687-
if (listen(gateway,nNodes)<0) {
689+
if (pg_listen(gateway,nNodes,MtmUseRDMA)<0) {
688690
MTM_ELOG(ERROR,"Arbiter failed to listen socket: %s",strerror(errno));
689691
}
690692

@@ -787,7 +789,7 @@ static bool MtmRecovery()
787789
fd_settryset;
788790
FD_ZERO(&tryset);
789791
FD_SET(sd,&tryset);
790-
if (select(sd+1,&tryset,NULL,NULL,&tm)<0) {
792+
if (pg_select(sd+1,&tryset,NULL,NULL,&tm,MtmUseRDMA)<0) {
791793
MTM_ELOG(WARNING,"Arbiter lost connection with node %d",i+1);
792794
MtmDisconnect(i);
793795
recovered= true;
@@ -884,7 +886,7 @@ static void MtmReceiver(Datum arg)
884886
tv.tv_sec=selectTimeout/1000;
885887
tv.tv_usec=selectTimeout%1000*1000;
886888
do {
887-
n=select(max_fd+1,&events,NULL,NULL,&tv);
889+
n=pg_select(max_fd+1,&events,NULL,NULL,&tv,MtmUseRDMA);
888890
}while (n<0&&errno==EINTR);
889891
}while (n<0&&MtmRecovery());
890892

‎contrib/mmts/multimaster.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ int MtmHeartbeatRecvTimeout;
252252
intMtmMin2PCTimeout;
253253
intMtmMax2PCRatio;
254254
boolMtmUseDtm;
255+
boolMtmUseRDMA;
255256
boolMtmPreserveCommitOrder;
256257
boolMtmVolksWagenMode;/* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257258

@@ -3295,6 +3296,19 @@ _PG_init(void)
32953296
NULL
32963297
);
32973298

3299+
DefineCustomBoolVariable(
3300+
"multimaster.use_rdma",
3301+
"Use RDMA sockets",
3302+
NULL,
3303+
&MtmUseRDMA,
3304+
false,
3305+
PGC_POSTMASTER,
3306+
0,
3307+
NULL,
3308+
NULL,
3309+
NULL
3310+
);
3311+
32983312
DefineCustomBoolVariable(
32993313
"multimaster.preserve_commit_order",
33003314
"Transactions from one node will be committed in same order al all nodes",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp